mirror of
https://github.com/thegeeklab/git-batch.git
synced 2024-11-24 12:00:42 +00:00
modularize
This commit is contained in:
parent
111c08b69e
commit
09055c7f73
102
gitbatch/Cli.py
Normal file
102
gitbatch/Cli.py
Normal file
@ -0,0 +1,102 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Main program."""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
|
||||
from collections import defaultdict
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import git
|
||||
|
||||
from gitbatch import __version__
|
||||
from gitbatch.Logging import SingleLog
|
||||
from gitbatch.Utils import normalize_path, to_bool
|
||||
|
||||
|
||||
class GitBatch:
|
||||
|
||||
def __init__(self):
|
||||
self.log = SingleLog()
|
||||
self.logger = self.log.logger
|
||||
self.args = self._cli_args()
|
||||
self.config = self._config()
|
||||
self.run()
|
||||
|
||||
def _cli_args(self):
|
||||
parser = argparse.ArgumentParser(
|
||||
description=("Clone single branch from all repositories listed in a file"))
|
||||
parser.add_argument("--version", action="version", version="%(prog)s {}".format(__version__))
|
||||
|
||||
return parser.parse_args()
|
||||
|
||||
def _config(self):
|
||||
config = defaultdict(dict)
|
||||
input_file_raw = os.environ.get("GIT_BATCH_INPUT_FILE", "./batchfile")
|
||||
config["input_file"] = normalize_path(input_file_raw)
|
||||
|
||||
config["ignore_existing"] = to_bool(os.environ.get("GIT_BATCH_IGNORE_EXISTING_REPO", True))
|
||||
config["ignore_missing"] = to_bool(os.environ.get("GIT_BATCH_IGNORE_MISSING_REMOTE", True))
|
||||
|
||||
return config
|
||||
|
||||
def _repos_from_file(self, src):
|
||||
repos = []
|
||||
with open(src, "r") as f:
|
||||
for num, line in enumerate(f, start=1):
|
||||
repo = {}
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"):
|
||||
try:
|
||||
url, branch, dest = [x.strip() for x in line.split(";")]
|
||||
except ValueError as e:
|
||||
self.log.sysexit_with_message("Wrong numer of delimiters in line {line_num}: {exp}".format(
|
||||
line_num=num, exp=e))
|
||||
|
||||
if url:
|
||||
url_parts = urlparse(url)
|
||||
|
||||
repo["url"] = url
|
||||
repo["branch"] = branch or "master"
|
||||
repo["name"] = os.path.basename(url_parts.path)
|
||||
repo["dest"] = normalize_path(dest) or normalize_path("./{}".format(repo["name"]))
|
||||
|
||||
repos.append(repo)
|
||||
else:
|
||||
self.log.sysexit_with_message("Repository Url is not set on line {line_num}".format(
|
||||
line_num=num))
|
||||
return repos
|
||||
|
||||
def _repos_clone(self, repos, ignore_existing):
|
||||
for repo in repos:
|
||||
try:
|
||||
options = [
|
||||
"--branch={}".format(repo["branch"]),
|
||||
"--single-branch"
|
||||
]
|
||||
git.Repo.clone_from(repo["url"], repo["dest"], multi_options=options)
|
||||
except git.exc.GitCommandError as e:
|
||||
passed = False
|
||||
err_raw = e.stderr.strip().splitlines()[:-1]
|
||||
err = [x.split(":", 1)[1].strip() for x in err_raw]
|
||||
|
||||
if any(["already exists and is not an empty directory" in item for item in err]):
|
||||
if self.config["ignore_existing"]:
|
||||
passed = True
|
||||
|
||||
if any(["Could not find remote branch" in item for item in err]):
|
||||
if self.config["ignore_missing"]:
|
||||
passed = True
|
||||
|
||||
if not passed:
|
||||
self.log.sysexit_with_message("Git error: " + "\n".join(err))
|
||||
|
||||
def run(self):
|
||||
if os.path.isfile(self.config["input_file"]):
|
||||
repos = self._repos_from_file(self.config["input_file"])
|
||||
self._repos_clone(repos, self.config["ignore_existing"])
|
||||
else:
|
||||
self.log.sysexit_with_message("The given batch file at '{}' does not exist".format(
|
||||
os.path.relpath(os.path.join("./", self.config["input_file"]))))
|
177
gitbatch/Logging.py
Normal file
177
gitbatch/Logging.py
Normal file
@ -0,0 +1,177 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Global utility methods and classes."""
|
||||
|
||||
import os
|
||||
import logging
|
||||
import gitbatch.Utils
|
||||
import sys
|
||||
|
||||
|
||||
from gitbatch.Utils import Singleton, to_bool
|
||||
import colorama
|
||||
from pythonjsonlogger import jsonlogger
|
||||
|
||||
CONSOLE_FORMAT = "{}[%(levelname)s]{} %(message)s"
|
||||
JSON_FORMAT = "(asctime) (levelname) (message)"
|
||||
|
||||
|
||||
def _should_do_markup():
|
||||
py_colors = os.environ.get("PY_COLORS", None)
|
||||
if py_colors is not None:
|
||||
return to_bool(py_colors)
|
||||
|
||||
return sys.stdout.isatty() and os.environ.get("TERM") != "dumb"
|
||||
|
||||
|
||||
colorama.init(autoreset=True, strip=not _should_do_markup())
|
||||
|
||||
|
||||
class LogFilter(object):
|
||||
"""A custom log filter which excludes log messages above the logged level."""
|
||||
|
||||
def __init__(self, level):
|
||||
"""
|
||||
Initialize a new custom log filter.
|
||||
|
||||
:param level: Log level limit
|
||||
:returns: None
|
||||
|
||||
"""
|
||||
self.__level = level
|
||||
|
||||
def filter(self, logRecord): # noqa
|
||||
# https://docs.python.org/3/library/logging.html#logrecord-attributes
|
||||
return logRecord.levelno <= self.__level
|
||||
|
||||
|
||||
class MultilineFormatter(logging.Formatter):
|
||||
"""Logging Formatter to reset color after newline characters."""
|
||||
|
||||
def format(self, record): # noqa
|
||||
record.msg = record.msg.replace("\n", "\n{}... ".format(colorama.Style.RESET_ALL))
|
||||
return logging.Formatter.format(self, record)
|
||||
|
||||
|
||||
class MultilineJsonFormatter(jsonlogger.JsonFormatter):
|
||||
"""Logging Formatter to remove newline characters."""
|
||||
|
||||
def format(self, record): # noqa
|
||||
record.msg = record.msg.replace("\n", " ")
|
||||
return jsonlogger.JsonFormatter.format(self, record)
|
||||
|
||||
|
||||
class Log:
|
||||
def __init__(self, level=logging.WARN, name="ansibledoctor", json=False):
|
||||
self.logger = logging.getLogger(name)
|
||||
self.logger.setLevel(level)
|
||||
self.logger.addHandler(self._get_error_handler(json=json))
|
||||
self.logger.addHandler(self._get_warn_handler(json=json))
|
||||
self.logger.addHandler(self._get_info_handler(json=json))
|
||||
self.logger.addHandler(self._get_critical_handler(json=json))
|
||||
self.logger.addHandler(self._get_debug_handler(json=json))
|
||||
self.logger.propagate = False
|
||||
|
||||
def _get_error_handler(self, json=False):
|
||||
handler = logging.StreamHandler(sys.stderr)
|
||||
handler.setLevel(logging.ERROR)
|
||||
handler.addFilter(LogFilter(logging.ERROR))
|
||||
handler.setFormatter(MultilineFormatter(
|
||||
self.error(CONSOLE_FORMAT.format(colorama.Fore.RED, colorama.Style.RESET_ALL))))
|
||||
|
||||
if json:
|
||||
handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT))
|
||||
|
||||
return handler
|
||||
|
||||
def _get_warn_handler(self, json=False):
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(logging.WARN)
|
||||
handler.addFilter(LogFilter(logging.WARN))
|
||||
handler.setFormatter(MultilineFormatter(
|
||||
self.warn(CONSOLE_FORMAT.format(colorama.Fore.YELLOW, colorama.Style.RESET_ALL))))
|
||||
|
||||
if json:
|
||||
handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT))
|
||||
|
||||
return handler
|
||||
|
||||
def _get_info_handler(self, json=False):
|
||||
handler = logging.StreamHandler(sys.stdout)
|
||||
handler.setLevel(logging.INFO)
|
||||
handler.addFilter(LogFilter(logging.INFO))
|
||||
handler.setFormatter(MultilineFormatter(
|
||||
self.info(CONSOLE_FORMAT.format(colorama.Fore.CYAN, colorama.Style.RESET_ALL))))
|
||||
|
||||
if json:
|
||||
handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT))
|
||||
|
||||
return handler
|
||||
|
||||
def _get_critical_handler(self, json=False):
|
||||
handler = logging.StreamHandler(sys.stderr)
|
||||
handler.setLevel(logging.CRITICAL)
|
||||
handler.addFilter(LogFilter(logging.CRITICAL))
|
||||
handler.setFormatter(MultilineFormatter(
|
||||
self.critical(CONSOLE_FORMAT.format(colorama.Fore.RED, colorama.Style.RESET_ALL))))
|
||||
|
||||
if json:
|
||||
handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT))
|
||||
|
||||
return handler
|
||||
|
||||
def _get_debug_handler(self, json=False):
|
||||
handler = logging.StreamHandler(sys.stderr)
|
||||
handler.setLevel(logging.DEBUG)
|
||||
handler.addFilter(LogFilter(logging.DEBUG))
|
||||
handler.setFormatter(MultilineFormatter(
|
||||
self.critical(CONSOLE_FORMAT.format(colorama.Fore.BLUE, colorama.Style.RESET_ALL))))
|
||||
|
||||
if json:
|
||||
handler.setFormatter(MultilineJsonFormatter(JSON_FORMAT))
|
||||
|
||||
return handler
|
||||
|
||||
def set_level(self, s):
|
||||
self.logger.setLevel(s)
|
||||
|
||||
def debug(self, msg):
|
||||
"""Format info messages and return string."""
|
||||
return msg
|
||||
|
||||
def critical(self, msg):
|
||||
"""Format critical messages and return string."""
|
||||
return msg
|
||||
|
||||
def error(self, msg):
|
||||
"""Format error messages and return string."""
|
||||
return msg
|
||||
|
||||
def warn(self, msg):
|
||||
"""Format warn messages and return string."""
|
||||
return msg
|
||||
|
||||
def info(self, msg):
|
||||
"""Format info messages and return string."""
|
||||
return msg
|
||||
|
||||
def _color_text(self, color, msg):
|
||||
"""
|
||||
Colorize strings.
|
||||
|
||||
:param color: colorama color settings
|
||||
:param msg: string to colorize
|
||||
:returns: string
|
||||
|
||||
"""
|
||||
return "{}{}{}".format(color, msg, colorama.Style.RESET_ALL)
|
||||
|
||||
def sysexit(self, code=1):
|
||||
sys.exit(code)
|
||||
|
||||
def sysexit_with_message(self, msg, code=1):
|
||||
self.logger.critical(str(msg))
|
||||
self.sysexit(code)
|
||||
|
||||
|
||||
class SingleLog(Log, metaclass=Singleton):
|
||||
pass
|
24
gitbatch/Utils.py
Normal file
24
gitbatch/Utils.py
Normal file
@ -0,0 +1,24 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Global utility methods and classes."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
from distutils.util import strtobool
|
||||
|
||||
|
||||
def normalize_path(path):
|
||||
if path:
|
||||
return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
|
||||
|
||||
|
||||
def to_bool(string):
|
||||
return bool(strtobool(str(string)))
|
||||
|
||||
|
||||
class Singleton(type):
|
||||
_instances = {}
|
||||
|
||||
def __call__(cls, *args, **kwargs):
|
||||
if cls not in cls._instances:
|
||||
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
|
||||
return cls._instances[cls]
|
@ -1,100 +1,11 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Main program."""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import git
|
||||
|
||||
from gitbatch import __version__
|
||||
|
||||
logger = logging.getLogger("gitbatch")
|
||||
formatter = logging.Formatter("[%(levelname)s] %(message)s")
|
||||
|
||||
cmdlog = logging.StreamHandler()
|
||||
cmdlog.setLevel(logging.ERROR)
|
||||
cmdlog.setFormatter(formatter)
|
||||
logger.addHandler(cmdlog)
|
||||
|
||||
|
||||
def sysexit(self, code=1):
|
||||
sys.exit(code)
|
||||
|
||||
|
||||
def sysexit_with_message(msg, code=1):
|
||||
logger.error(str(msg))
|
||||
sysexit(code)
|
||||
|
||||
|
||||
def normalize_path(path):
|
||||
if path:
|
||||
return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
|
||||
|
||||
|
||||
def repos_from_file(src):
|
||||
repos = []
|
||||
with open(src, "r") as f:
|
||||
for num, line in enumerate(f, start=1):
|
||||
repo = {}
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"):
|
||||
try:
|
||||
url, branch, dest = [x.strip() for x in line.split(";")]
|
||||
except ValueError as e:
|
||||
sysexit_with_message("Wrong numer of delimiters in line {line_num}: {exp}".format(
|
||||
line_num=num, exp=e))
|
||||
|
||||
if url:
|
||||
url_parts = urlparse(url)
|
||||
|
||||
repo["url"] = url
|
||||
repo["branch"] = branch or "master"
|
||||
repo["name"] = os.path.basename(url_parts.path)
|
||||
repo["dest"] = normalize_path(dest) or normalize_path("./{}".format(repo["name"]))
|
||||
|
||||
repos.append(repo)
|
||||
else:
|
||||
sysexit_with_message("Repository Url is not set on line {line_num}".format(
|
||||
line_num=num))
|
||||
return repos
|
||||
|
||||
|
||||
def repos_clone(repos, ignore_existing):
|
||||
for repo in repos:
|
||||
print(repo)
|
||||
try:
|
||||
git.Repo.clone_from(repo["url"], repo["dest"], multi_options=["--branch=docs", "--single-branch"])
|
||||
except git.exc.GitCommandError as e:
|
||||
if not ignore_existing:
|
||||
err_raw = [x.strip() for x in e.stderr.split(":")][2]
|
||||
err = err_raw.splitlines()[0].split(".")[0]
|
||||
sysexit_with_message("Git error: {}".format(err))
|
||||
else:
|
||||
pass
|
||||
from gitbatch.Cli import GitBatch
|
||||
|
||||
|
||||
def main():
|
||||
"""Run main program."""
|
||||
parser = argparse.ArgumentParser(
|
||||
description=("Clone single branch from all repositories listed in a file"))
|
||||
parser.add_argument("--version", action="version", version="%(prog)s {}".format(__version__))
|
||||
|
||||
parser.parse_args()
|
||||
|
||||
input_file_raw = os.environ.get("GIT_BATCH_INPUT_FILE", "./batchfile")
|
||||
input_file = normalize_path(input_file_raw)
|
||||
|
||||
ignore_existing = os.environ.get("GIT_BATCH_IGNORE_EXISTING", True)
|
||||
|
||||
if os.path.isfile(input_file):
|
||||
repos = repos_from_file(input_file)
|
||||
repos_clone(repos, ignore_existing)
|
||||
else:
|
||||
sysexit_with_message("The given batch file at '{}' does not exist".format(
|
||||
os.path.relpath(os.path.join("./", input_file))))
|
||||
GitBatch()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
Reference in New Issue
Block a user