ansible-doctor/ansibledoctor/utils/__init__.py

341 lines
9.7 KiB
Python

#!/usr/bin/env python3
"""Global utility methods and classes."""
import logging
import os
import sys
from collections.abc import Iterable
import colorama
from pythonjsonlogger import jsonlogger
# Console layout: the three "{}" slots are filled via str.format() with a colour, a
# style and a reset sequence, which leaves a %-style logging format string behind.
CONSOLE_FORMAT = "{}{}[%(levelname)s]{} %(message)s"
# Field layout handed to the JSON formatter.
JSON_FORMAT = "%(asctime)s %(levelname)s %(message)s"
def strtobool(value):
    """
    Translate a textual truth value into a boolean.

    The value is passed through ``str()`` and compared case-insensitively.

    :param value: value to interpret
    :returns: True for y/yes/t/true/on/1, False for n/no/f/false/off/0
    :raises ValueError: if the value is none of the recognised words
    """
    truthy_words = ("y", "yes", "t", "true", "on", "1")
    falsy_words = ("n", "no", "f", "false", "off", "0")

    lookup = {word: True for word in truthy_words}
    lookup.update({word: False for word in falsy_words})

    try:
        normalized = str(value).lower()
        return lookup[normalized]
    except KeyError as err:
        raise ValueError(f'"{value}" is not a valid bool value') from err
def to_bool(string):
    """
    Coerce ``string`` to text and return the boolean it spells out.

    :param string: value to interpret; it is converted with ``str()`` first
    :returns: bool
    :raises ValueError: propagated from ``strtobool`` for unrecognised values
    """
    text = str(string)
    return bool(strtobool(text))
def flatten(items):
    """
    Lazily yield the leaf elements of an arbitrarily nested iterable.

    ``str`` and ``bytes`` elements are treated as leaves and are not split up.

    :param items: iterable that may contain further iterables
    :returns: generator over the non-iterable (or str/bytes) elements, depth first
    """
    for element in items:
        # Leaves: text-like values and anything that cannot be iterated.
        if isinstance(element, (str, bytes)) or not isinstance(element, Iterable):
            yield element
            continue
        yield from flatten(element)
def _should_do_markup():
py_colors = os.environ.get("PY_COLORS", None)
if py_colors is not None:
return to_bool(py_colors)
return sys.stdout.isatty() and os.environ.get("TERM") != "dumb"
def _split_string(string, delimiter, escape, maxsplit=None):
result = []
current_element = []
iterator = iter(string)
count_split = 0
skip_split = False
for character in iterator:
if maxsplit and count_split >= maxsplit:
skip_split = True
if character == escape and not skip_split:
try:
next_character = next(iterator)
if next_character != delimiter and next_character != escape:
# Do not copy the escape character if it is intended to escape either the
# delimiter or the escape character itself. Copy the escape character
# if it is not used to escape either of these characters.
current_element.append(escape)
current_element.append(next_character)
count_split += 1
except StopIteration:
current_element.append(escape)
elif character == delimiter and not skip_split:
result.append("".join(current_element))
current_element = []
count_split += 1
else:
current_element.append(character)
result.append("".join(current_element))
return result
# Initialise colorama once at import time; ANSI sequences are stripped from the
# output whenever _should_do_markup() reports that markup is not wanted.
colorama.init(autoreset=True, strip=not _should_do_markup())
class Singleton(type):
    """Metaclass that hands out a single shared instance per class."""

    # Maps each class using this metaclass to its one instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """
        Return the existing instance of ``cls`` or create it on first use.

        Constructor arguments are only used by the call that creates the instance.
        """
        registry = cls._instances
        if cls in registry:
            return registry[cls]

        instance = super().__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
class LogFilter:
    """Let through only records whose level does not exceed a fixed limit."""

    def __init__(self, level):
        """
        Create the filter.

        :param level: highest numeric log level that is still accepted
        :returns: None
        """
        self.__level = level

    def filter(self, logRecord):  # noqa
        """Return True when the record's numeric level is at or below the limit."""
        # ``levelno`` is the numeric level of the record, see
        # https://docs.python.org/3/library/logging.html#logrecord-attributes
        limit = self.__level
        return logRecord.levelno <= limit
class MultilineFormatter(logging.Formatter):
    """Reset color after newline characters."""

    def format(self, record):
        """
        Format ``record`` for console output.

        Surrounding whitespace is removed from the message and every embedded newline
        is followed by a colour reset and a ``... `` continuation marker.

        :param record: log record; its ``msg`` attribute is rewritten in place
        :returns: formatted string
        """
        # ``record.msg`` may be any object (e.g. an exception instance). The logging
        # module stringifies it itself when building the message, so converting it
        # here avoids an AttributeError without changing the output for strings.
        message = str(record.msg).strip()
        record.msg = message.replace("\n", f"\n{colorama.Style.RESET_ALL}... ")
        return super().format(record)
class MultilineJsonFormatter(jsonlogger.JsonFormatter):
    """Remove newline characters."""

    def format(self, record):
        """
        Format ``record`` as JSON with newlines in the message replaced by spaces.

        :param record: log record; a string ``msg`` is rewritten in place
        :returns: formatted string
        """
        # Only text can contain newlines. Non-string messages (for example dicts
        # passed for structured logging) are handed to the base class untouched
        # instead of failing on a missing ``replace`` method.
        if isinstance(record.msg, str):
            record.msg = record.msg.replace("\n", " ")
        return super().format(record)
class Log:
    """
    Handle logging.

    One stream handler is registered per log level. Each handler combines a minimum
    level with a ``LogFilter`` of the same level, so it only emits records of exactly
    that level. Warning and info records go to stdout; error, critical and debug
    records go to stderr.
    """

    def __init__(self, level=logging.WARNING, name="ansibledoctor", json=False):
        """
        Set up the named logger and register its handlers.

        :param level: initial log level of the logger
        :param name: name of the logger to configure
        :param json: True to emit JSON formatted records instead of coloured text
        """
        self.ctx = os.getcwd()
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        self.register_hanlers(json=json)
        self.logger.propagate = False

    def _build_handler(self, stream, level, color, decorate, json=False):
        """
        Create a stream handler that emits only records of exactly ``level``.

        :param stream: target stream of the handler
        :param level: numeric log level handled by this handler
        :param color: colorama foreground colour used for the level prefix
        :param decorate: callable applied to the console format string
        :param json: True to use the JSON formatter instead of the console formatter
        :returns: configured ``logging.StreamHandler``
        """
        handler = logging.StreamHandler(stream)
        handler.setLevel(level)
        handler.addFilter(LogFilter(level))

        if json:
            formatter = MultilineJsonFormatter(JSON_FORMAT)
        else:
            console_format = CONSOLE_FORMAT.format(
                color, colorama.Style.BRIGHT, colorama.Style.RESET_ALL
            )
            formatter = MultilineFormatter(decorate(console_format))

        handler.setFormatter(formatter)
        return handler

    def _get_error_handler(self, json=False):
        """Return the handler for error records (stderr, red)."""
        return self._build_handler(
            sys.stderr, logging.ERROR, colorama.Fore.RED, self.error, json=json
        )

    def _get_warning_handler(self, json=False):
        """Return the handler for warning records (stdout, yellow)."""
        return self._build_handler(
            sys.stdout, logging.WARNING, colorama.Fore.YELLOW, self.warning, json=json
        )

    def _get_info_handler(self, json=False):
        """Return the handler for info records (stdout, cyan)."""
        return self._build_handler(
            sys.stdout, logging.INFO, colorama.Fore.CYAN, self.info, json=json
        )

    def _get_critical_handler(self, json=False):
        """Return the handler for critical records (stderr, red)."""
        return self._build_handler(
            sys.stderr, logging.CRITICAL, colorama.Fore.RED, self.critical, json=json
        )

    def _get_debug_handler(self, json=False):
        """Return the handler for debug records (stderr, blue)."""
        return self._build_handler(
            sys.stderr, logging.DEBUG, colorama.Fore.BLUE, self.debug, json=json
        )

    def register_hanlers(self, json=False):
        """
        Replace all handlers of the logger with a fresh set.

        The misspelled name is kept because existing callers use it.

        :param json: True to enable JSON logging, False to use console formatting
        """
        # Remove all existing handlers; iterate over a copy because the list is
        # modified while looping.
        for handler in self.logger.handlers[:]:
            self.logger.removeHandler(handler)

        self.logger.addHandler(self._get_error_handler(json=json))
        self.logger.addHandler(self._get_warning_handler(json=json))
        self.logger.addHandler(self._get_info_handler(json=json))
        self.logger.addHandler(self._get_critical_handler(json=json))
        self.logger.addHandler(self._get_debug_handler(json=json))

    def register_handlers(self, json=False):
        """
        Correctly spelled alias of ``register_hanlers``.

        :param json: True to enable JSON logging, False to use console formatting
        """
        self.register_hanlers(json=json)

    def set_level(self, s):
        """
        Set the log level of the logger.

        :param s: level name in any letter case (e.g. ``"debug"``) or a numeric level
        """
        # Level names must be upper case for ``setLevel``; numeric levels pass through.
        self.logger.setLevel(s.upper() if isinstance(s, str) else s)

    def debug(self, msg):
        """Format debug messages and return string."""
        return msg

    def critical(self, msg):
        """Format critical messages and return string."""
        return msg

    def error(self, msg):
        """Format error messages and return string."""
        return msg

    def warning(self, msg):
        """Format warning messages and return string."""
        return msg

    def info(self, msg):
        """Format info messages and return string."""
        return msg

    def _color_text(self, color, msg):
        """
        Colorize strings.

        :param color: colorama color settings
        :param msg: string to colorize
        :returns: string
        """
        return f"{color}{msg}{colorama.Style.RESET_ALL}"

    def sysexit(self, code=1):
        """Exit the interpreter with the given exit code."""
        sys.exit(code)

    def sysexit_with_message(self, msg, code=1):
        """
        Log ``msg`` as a critical record and exit.

        :param msg: message to log; converted to ``str`` and stripped
        :param code: process exit code
        """
        self.logger.critical(str(msg).strip())
        self.sysexit(code)
class SingleLog(Log, metaclass=Singleton):
    """``Log`` variant created through the ``Singleton`` metaclass (one shared instance)."""
class FileUtils:
    """Misc static methods for file handling."""

    @staticmethod
    def create_path(path):
        """
        Create the directory ``path`` including missing parent directories.

        An already existing directory is not treated as an error.

        :param path: directory to create
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def query_yes_no(question, default=True):
        """
        Ask a yes/no question via input() and return the answer as a boolean.

        The question is repeated until a valid answer is given.

        :param question: string that is presented to the user
        :param default: answer assumed when the user just hits <Enter>: True for
            yes (the default), False for no, or None if an explicit answer is required
        :returns: True for a positive answer, False for a negative one
        """
        if default is None:
            # No preselected answer, so neither option is highlighted.
            prompt = "[y/n]"
        elif default:
            prompt = "[Y/n]"
        else:
            prompt = "[N/y]"

        while True:
            try:
                # input method is safe in python3
                choice = input(f"{question} {prompt} ").strip() or default  # nosec
                # An empty answer combined with ``default=None`` is rejected by
                # to_bool() as well, which triggers another round of the loop.
                return to_bool(choice)
            except ValueError:
                print("Invalid input. Please enter 'y' or 'n'.")  # noqa: T201