From 09055c7f732dba9ad489b69a7681559ab7dac726 Mon Sep 17 00:00:00 2001 From: Robert Kaussow Date: Mon, 2 Dec 2019 09:53:35 +0100 Subject: [PATCH] modularize --- Dockerfile | 2 +- gitbatch/Cli.py | 102 +++++++++++++++++++++++++ gitbatch/Logging.py | 177 +++++++++++++++++++++++++++++++++++++++++++ gitbatch/Utils.py | 24 ++++++ gitbatch/__main__.py | 93 +---------------------- setup.py | 2 + 6 files changed, 308 insertions(+), 92 deletions(-) create mode 100644 gitbatch/Cli.py create mode 100644 gitbatch/Logging.py create mode 100644 gitbatch/Utils.py diff --git a/Dockerfile b/Dockerfile index 6831c82..f982baa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,5 +1,5 @@ FROM python:3.7-alpine - + LABEL maintainer="Robert Kaussow " \ org.label-schema.name="git-batch" \ org.label-schema.vcs-url="https://github.com/xoxys/git-batch" \ diff --git a/gitbatch/Cli.py b/gitbatch/Cli.py new file mode 100644 index 0000000..3f17fbd --- /dev/null +++ b/gitbatch/Cli.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3 +"""Main program.""" + +import argparse +import logging +import os +import sys + +from collections import defaultdict +from urllib.parse import urlparse + +import git + +from gitbatch import __version__ +from gitbatch.Logging import SingleLog +from gitbatch.Utils import normalize_path, to_bool + + +class GitBatch: + + def __init__(self): + self.log = SingleLog() + self.logger = self.log.logger + self.args = self._cli_args() + self.config = self._config() + self.run() + + def _cli_args(self): + parser = argparse.ArgumentParser( + description=("Clone single branch from all repositories listed in a file")) + parser.add_argument("--version", action="version", version="%(prog)s {}".format(__version__)) + + return parser.parse_args() + + def _config(self): + config = defaultdict(dict) + input_file_raw = os.environ.get("GIT_BATCH_INPUT_FILE", "./batchfile") + config["input_file"] = normalize_path(input_file_raw) + + config["ignore_existing"] = to_bool(os.environ.get("GIT_BATCH_IGNORE_EXISTING_REPO", True)) + config["ignore_missing"] = to_bool(os.environ.get("GIT_BATCH_IGNORE_MISSING_REMOTE", True)) + + return config + + def _repos_from_file(self, src): + repos = [] + with open(src, "r") as f: + for num, line in enumerate(f, start=1): + repo = {} + line = line.strip() + if line and not line.startswith("#"): + try: + url, branch, dest = [x.strip() for x in line.split(";")] + except ValueError as e: + self.log.sysexit_with_message("Wrong numer of delimiters in line {line_num}: {exp}".format( + line_num=num, exp=e)) + + if url: + url_parts = urlparse(url) + + repo["url"] = url + repo["branch"] = branch or "master" + repo["name"] = os.path.basename(url_parts.path) + repo["dest"] = normalize_path(dest) or normalize_path("./{}".format(repo["name"])) + + repos.append(repo) + else: + self.log.sysexit_with_message("Repository Url is not set on line {line_num}".format( + line_num=num)) + return repos + + def _repos_clone(self, repos, ignore_existing): + for repo in repos: + try: + options = [ + "--branch={}".format(repo["branch"]), + "--single-branch" + ] + git.Repo.clone_from(repo["url"], repo["dest"], multi_options=options) + except git.exc.GitCommandError as e: + passed = False + err_raw = e.stderr.strip().splitlines()[:-1] + err = [x.split(":", 1)[1].strip() for x in err_raw] + + if any(["already exists and is not an empty directory" in item for item in err]): + if self.config["ignore_existing"]: + passed = True + + if any(["Could not find remote branch" in item for item in err]): + if self.config["ignore_missing"]: + passed = True + + if not passed: + self.log.sysexit_with_message("Git error: " + "\n".join(err)) + + def run(self): + if os.path.isfile(self.config["input_file"]): + repos = self._repos_from_file(self.config["input_file"]) + self._repos_clone(repos, self.config["ignore_existing"]) + else: + self.log.sysexit_with_message("The given batch file at '{}' does not exist".format( + os.path.relpath(os.path.join("./", self.config["input_file"])))) diff --git a/gitbatch/Logging.py b/gitbatch/Logging.py new file mode 100644 index 0000000..a563cfb --- /dev/null +++ b/gitbatch/Logging.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3 +"""Global utility methods and classes.""" + +import os +import logging +import gitbatch.Utils +import sys + + +from gitbatch.Utils import Singleton, to_bool +import colorama +from pythonjsonlogger import jsonlogger + +CONSOLE_FORMAT = "{}[%(levelname)s]{} %(message)s" +JSON_FORMAT = "(asctime) (levelname) (message)" + + +def _should_do_markup(): + py_colors = os.environ.get("PY_COLORS", None) + if py_colors is not None: + return to_bool(py_colors) + + return sys.stdout.isatty() and os.environ.get("TERM") != "dumb" + + +colorama.init(autoreset=True, strip=not _should_do_markup()) + + +class LogFilter(object): + """A custom log filter which excludes log messages above the logged level.""" + + def __init__(self, level): + """ + Initialize a new custom log filter. + + :param level: Log level limit + :returns: None + + """ + self.__level = level + + def filter(self, logRecord): # noqa + # https://docs.python.org/3/library/logging.html#logrecord-attributes + return logRecord.levelno <= self.__level + + +class MultilineFormatter(logging.Formatter): + """Logging Formatter to reset color after newline characters.""" + + def format(self, record): # noqa + record.msg = record.msg.replace("\n", "\n{}... ".format(colorama.Style.RESET_ALL)) + return logging.Formatter.format(self, record) + + +class MultilineJsonFormatter(jsonlogger.JsonFormatter): + """Logging Formatter to remove newline characters.""" + + def format(self, record): # noqa + record.msg = record.msg.replace("\n", " ") + return jsonlogger.JsonFormatter.format(self, record) + + +class Log: + def __init__(self, level=logging.WARN, name="ansibledoctor", json=False): + self.logger = logging.getLogger(name) + self.logger.setLevel(level) + self.logger.addHandler(self._get_error_handler(json=json)) + self.logger.addHandler(self._get_warn_handler(json=json)) + self.logger.addHandler(self._get_info_handler(json=json)) + self.logger.addHandler(self._get_critical_handler(json=json)) + self.logger.addHandler(self._get_debug_handler(json=json)) + self.logger.propagate = False + + def _get_error_handler(self, json=False): + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.ERROR) + handler.addFilter(LogFilter(logging.ERROR)) + handler.setFormatter(MultilineFormatter( + self.error(CONSOLE_FORMAT.format(colorama.Fore.RED, colorama.Style.RESET_ALL)))) + + if json: + handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT)) + + return handler + + def _get_warn_handler(self, json=False): + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.WARN) + handler.addFilter(LogFilter(logging.WARN)) + handler.setFormatter(MultilineFormatter( + self.warn(CONSOLE_FORMAT.format(colorama.Fore.YELLOW, colorama.Style.RESET_ALL)))) + + if json: + handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT)) + + return handler + + def _get_info_handler(self, json=False): + handler = logging.StreamHandler(sys.stdout) + handler.setLevel(logging.INFO) + handler.addFilter(LogFilter(logging.INFO)) + handler.setFormatter(MultilineFormatter( + self.info(CONSOLE_FORMAT.format(colorama.Fore.CYAN, colorama.Style.RESET_ALL)))) + + if json: + handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT)) + + return handler + + def _get_critical_handler(self, json=False): + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.CRITICAL) + handler.addFilter(LogFilter(logging.CRITICAL)) + handler.setFormatter(MultilineFormatter( + self.critical(CONSOLE_FORMAT.format(colorama.Fore.RED, colorama.Style.RESET_ALL)))) + + if json: + handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT)) + + return handler + + def _get_debug_handler(self, json=False): + handler = logging.StreamHandler(sys.stderr) + handler.setLevel(logging.DEBUG) + handler.addFilter(LogFilter(logging.DEBUG)) + handler.setFormatter(MultilineFormatter( + self.critical(CONSOLE_FORMAT.format(colorama.Fore.BLUE, colorama.Style.RESET_ALL)))) + + if json: + handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT)) + + return handler + + def set_level(self, s): + self.logger.setLevel(s) + + def debug(self, msg): + """Format info messages and return string.""" + return msg + + def critical(self, msg): + """Format critical messages and return string.""" + return msg + + def error(self, msg): + """Format error messages and return string.""" + return msg + + def warn(self, msg): + """Format warn messages and return string.""" + return msg + + def info(self, msg): + """Format info messages and return string.""" + return msg + + def _color_text(self, color, msg): + """ + Colorize strings. + + :param color: colorama color settings + :param msg: string to colorize + :returns: string + + """ + return "{}{}{}".format(color, msg, colorama.Style.RESET_ALL) + + def sysexit(self, code=1): + sys.exit(code) + + def sysexit_with_message(self, msg, code=1): + self.logger.critical(str(msg)) + self.sysexit(code) + + +class SingleLog(Log, metaclass=Singleton): + pass diff --git a/gitbatch/Utils.py b/gitbatch/Utils.py new file mode 100644 index 0000000..ef83d59 --- /dev/null +++ b/gitbatch/Utils.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 +"""Global utility methods and classes.""" + +import os +import sys +from distutils.util import strtobool + + +def normalize_path(path): + if path: + return os.path.abspath(os.path.expanduser(os.path.expandvars(path))) + + +def to_bool(string): + return bool(strtobool(str(string))) + + +class Singleton(type): + _instances = {} + + def __call__(cls, *args, **kwargs): + if cls not in cls._instances: + cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) + return cls._instances[cls] diff --git a/gitbatch/__main__.py b/gitbatch/__main__.py index 9e60d32..c28c00c 100644 --- a/gitbatch/__main__.py +++ b/gitbatch/__main__.py @@ -1,100 +1,11 @@ #!/usr/bin/env python3 """Main program.""" -import argparse -import logging -import os -import sys -from urllib.parse import urlparse - -import git - -from gitbatch import __version__ - -logger = logging.getLogger("gitbatch") -formatter = logging.Formatter("[%(levelname)s] %(message)s") - -cmdlog = logging.StreamHandler() -cmdlog.setLevel(logging.ERROR) -cmdlog.setFormatter(formatter) -logger.addHandler(cmdlog) - - -def sysexit(self, code=1): - sys.exit(code) - - -def sysexit_with_message(msg, code=1): - logger.error(str(msg)) - sysexit(code) - - -def normalize_path(path): - if path: - return os.path.abspath(os.path.expanduser(os.path.expandvars(path))) - - -def repos_from_file(src): - repos = [] - with open(src, "r") as f: - for num, line in enumerate(f, start=1): - repo = {} - line = line.strip() - if line and not line.startswith("#"): - try: - url, branch, dest = [x.strip() for x in line.split(";")] - except ValueError as e: - sysexit_with_message("Wrong numer of delimiters in line {line_num}: {exp}".format( - line_num=num, exp=e)) - - if url: - url_parts = urlparse(url) - - repo["url"] = url - repo["branch"] = branch or "master" - repo["name"] = os.path.basename(url_parts.path) - repo["dest"] = normalize_path(dest) or normalize_path("./{}".format(repo["name"])) - - repos.append(repo) - else: - sysexit_with_message("Repository Url is not set on line {line_num}".format( - line_num=num)) - return repos - - -def repos_clone(repos, ignore_existing): - for repo in repos: - print(repo) - try: - git.Repo.clone_from(repo["url"], repo["dest"], multi_options=["--branch=docs", "--single-branch"]) - except git.exc.GitCommandError as e: - if not ignore_existing: - err_raw = [x.strip() for x in e.stderr.split(":")][2] - err = err_raw.splitlines()[0].split(".")[0] - sysexit_with_message("Git error: {}".format(err)) - else: - pass +from gitbatch.Cli import GitBatch def main(): - """Run main program.""" - parser = argparse.ArgumentParser( - description=("Clone single branch from all repositories listed in a file")) - parser.add_argument("--version", action="version", version="%(prog)s {}".format(__version__)) - - parser.parse_args() - - input_file_raw = os.environ.get("GIT_BATCH_INPUT_FILE", "./batchfile") - input_file = normalize_path(input_file_raw) - - ignore_existing = os.environ.get("GIT_BATCH_IGNORE_EXISTING", True) - - if os.path.isfile(input_file): - repos = repos_from_file(input_file) - repos_clone(repos, ignore_existing) - else: - sysexit_with_message("The given batch file at '{}' does not exist".format( - os.path.relpath(os.path.join("./", input_file)))) + GitBatch() if __name__ == "__main__": diff --git a/setup.py b/setup.py index f2f5b94..18a00d8 100644 --- a/setup.py +++ b/setup.py @@ -58,6 +58,8 @@ setup( ], install_requires=[ "gitpython", + "colorama", + "python-json-logger", ], entry_points={ "console_scripts": [