ansible-doctor/ansibledoctor/utils.py

276 lines
7.7 KiB
Python
Raw Normal View History

2019-10-08 09:39:27 +00:00
#!/usr/bin/env python3
"""Global utility methods and classes."""
import logging
2019-10-07 06:52:00 +00:00
import os
import sys
from distutils.util import strtobool
2019-10-07 06:52:00 +00:00
import colorama
from pythonjsonlogger import jsonlogger
2019-10-08 09:39:27 +00:00
import ansibledoctor.exception
2020-04-06 07:45:26 +00:00
# Console log line template: the three "{}" slots are filled via str.format()
# with colorama codes (color, style, reset); the %-style fields are left in
# place for the logging formatter.
CONSOLE_FORMAT = "{}{}[%(levelname)s]{} %(message)s"
# Field list handed to the JSON log formatter.
JSON_FORMAT = "(asctime) (levelname) (message)"
2019-10-08 13:55:24 +00:00
def to_bool(string):
    """
    Convert a truthy/falsy value to a real boolean.

    The value is stringified and compared case-insensitively, so booleans,
    integers and strings are all accepted. The accepted spellings are the
    ones of ``distutils.util.strtobool``; that helper is not called anymore
    because ``distutils`` was removed from the standard library in
    Python 3.12.

    :param string: value to interpret
    :returns: bool
    :raises ValueError: if the value is not a recognised yes/no spelling
    """
    value = str(string).lower()
    if value in ("y", "yes", "t", "true", "on", "1"):
        return True
    if value in ("n", "no", "f", "false", "off", "0"):
        return False
    raise ValueError("invalid truth value {!r}".format(value))
def _should_do_markup():
    """
    Decide whether colored console markup should be emitted.

    An explicit ``PY_COLORS`` environment variable takes precedence; without
    it, markup is enabled only if stdout is a TTY and ``TERM`` is not "dumb".

    :returns: truthy if markup should be used
    """
    override = os.environ.get("PY_COLORS")
    if override is None:
        return sys.stdout.isatty() and os.environ.get("TERM") != "dumb"

    return to_bool(override)
# Set up colorama once at import time; stripping is requested whenever
# _should_do_markup() reports that markup is not wanted.
colorama.init(autoreset=True, strip=not _should_do_markup())
2019-10-07 06:52:00 +00:00
class Singleton(type):
    """Metaclass that hands out a single shared instance per class."""

    # Maps each class using this metaclass to its one created instance.
    _instances = {}

    def __call__(cls, *args, **kwargs):
        """Return the stored instance of ``cls``, creating it on first use."""
        registry = cls._instances
        if cls in registry:
            return registry[cls]

        # First instantiation: the arguments are only honoured this one time.
        instance = super().__call__(*args, **kwargs)
        registry[cls] = instance
        return instance
class LogFilter:
    """Log filter that only lets records at or below a given level pass."""

    def __init__(self, level):
        """
        Create the filter.

        :param level: highest numeric log level that is still accepted
        :returns: None
        """
        self.__level = level

    def filter(self, logRecord):  # noqa
        """Return True if the record's level does not exceed the limit."""
        # See https://docs.python.org/3/library/logging.html#logrecord-attributes
        ceiling = self.__level
        return logRecord.levelno <= ceiling
class MultilineFormatter(logging.Formatter):
    """Logging Formatter to reset color after newline characters."""

    def format(self, record):  # noqa
        """
        Format a record, marking continuation lines.

        Every newline in the message is followed by a style reset and a
        "... " prefix.

        :param record: log record to format
        :returns: string
        """
        # record.msg may be any object (e.g. an exception instance), so it is
        # stringified first instead of assuming it offers str.replace().
        record.msg = str(record.msg).replace(
            "\n", "\n{}... ".format(colorama.Style.RESET_ALL)
        )
        return super().format(record)
class MultilineJsonFormatter(jsonlogger.JsonFormatter):
    """Logging Formatter to remove newline characters."""

    def format(self, record):  # noqa
        """
        Format a record as JSON with newlines in the message flattened.

        :param record: log record to format
        :returns: string
        """
        # Only string messages can be rewritten; any other object is handed
        # to the base formatter untouched instead of failing on .replace().
        if isinstance(record.msg, str):
            record.msg = record.msg.replace("\n", " ")
        return super().format(record)
2019-10-07 06:52:00 +00:00
class Log:
    """Handle logging."""

    def __init__(self, level=logging.WARN, name="ansibledoctor", json=False):
        """
        Configure the named logger with one handler per log level.

        :param level: threshold applied to the logger itself
        :param name: name of the logger to configure
        :param json: emit JSON formatted records instead of colored text
        :returns: None
        """
        # NOTE(review): handlers are added on every instantiation, so creating
        # several Log objects for the same name stacks handlers on one logger.
        self.logger = logging.getLogger(name)
        self.logger.setLevel(level)
        self.logger.addHandler(self._get_error_handler(json=json))
        self.logger.addHandler(self._get_warn_handler(json=json))
        self.logger.addHandler(self._get_info_handler(json=json))
        self.logger.addHandler(self._get_critical_handler(json=json))
        self.logger.addHandler(self._get_debug_handler(json=json))
        self.logger.propagate = False

    def _build_handler(self, stream, level, color, decorate, json=False):
        """
        Create a stream handler that emits records of exactly one level.

        :param stream: stream the handler writes to
        :param level: log level the handler is responsible for
        :param color: colorama foreground color of the level tag
        :param decorate: per-level hook applied to the console format string
        :param json: use the JSON formatter instead of the console formatter
        :returns: logging.StreamHandler
        """
        handler = logging.StreamHandler(stream)
        # setLevel() drops everything below `level`, the filter drops
        # everything above it, so only records of this level get through.
        handler.setLevel(level)
        handler.addFilter(LogFilter(level))

        if json:
            formatter = MultilineJsonFormatter(JSON_FORMAT)
        else:
            formatter = MultilineFormatter(
                decorate(
                    CONSOLE_FORMAT.format(
                        color, colorama.Style.BRIGHT, colorama.Style.RESET_ALL
                    )
                )
            )
        handler.setFormatter(formatter)
        return handler

    def _get_error_handler(self, json=False):
        """Return the handler for ERROR records (stderr, red)."""
        return self._build_handler(
            sys.stderr, logging.ERROR, colorama.Fore.RED, self.error, json=json
        )

    def _get_warn_handler(self, json=False):
        """Return the handler for WARN records (stdout, yellow)."""
        return self._build_handler(
            sys.stdout, logging.WARN, colorama.Fore.YELLOW, self.warn, json=json
        )

    def _get_info_handler(self, json=False):
        """Return the handler for INFO records (stdout, cyan)."""
        return self._build_handler(
            sys.stdout, logging.INFO, colorama.Fore.CYAN, self.info, json=json
        )

    def _get_critical_handler(self, json=False):
        """Return the handler for CRITICAL records (stderr, red)."""
        return self._build_handler(
            sys.stderr, logging.CRITICAL, colorama.Fore.RED, self.critical, json=json
        )

    def _get_debug_handler(self, json=False):
        """Return the handler for DEBUG records (stderr, blue)."""
        return self._build_handler(
            sys.stderr, logging.DEBUG, colorama.Fore.BLUE, self.debug, json=json
        )

    def set_level(self, s):
        """
        Change the threshold of the underlying logger.

        :param s: new log level
        :returns: None
        """
        self.logger.setLevel(s)

    def debug(self, msg):
        """Format debug messages and return string."""
        return msg

    def critical(self, msg):
        """Format critical messages and return string."""
        return msg

    def error(self, msg):
        """Format error messages and return string."""
        return msg

    def warn(self, msg):
        """Format warn messages and return string."""
        return msg

    def info(self, msg):
        """Format info messages and return string."""
        return msg

    def _color_text(self, color, msg):
        """
        Colorize strings.

        :param color: colorama color settings
        :param msg: string to colorize
        :returns: string
        """
        return "{}{}{}".format(color, msg, colorama.Style.RESET_ALL)

    def sysexit(self, code=1):
        """
        Terminate the program.

        :param code: exit status passed to sys.exit()
        """
        sys.exit(code)

    def sysexit_with_message(self, msg, code=1):
        """
        Log a critical message and terminate the program.

        :param msg: object to log; it is converted to a string first
        :param code: exit status passed to sys.exit()
        """
        self.logger.critical(str(msg))
        self.sysexit(code)
2019-10-07 06:52:00 +00:00
class SingleLog(Log, metaclass=Singleton):
    """Log variant built through the Singleton metaclass (one shared instance)."""
class UnsafeTag:
    """Support for the custom "!unsafe" yaml tag."""

    # Tag name under which this class is known to yaml.
    yaml_tag = u"!unsafe"

    def __init__(self, value):
        """Keep the tagged value on the instance."""
        self.unsafe = value

    @staticmethod
    def yaml_constructor(loader, node):
        """Build the tagged node as a plain scalar using the given loader."""
        scalar = loader.construct_scalar(node)
        return scalar
2019-10-07 06:52:00 +00:00
class FileUtils:
    """Misc static methods for file handling."""

    @staticmethod
    def create_path(path):
        """
        Create a directory including missing parents.

        An already existing directory is not an error.

        :param path: directory to create
        :returns: None
        """
        os.makedirs(path, exist_ok=True)

    @staticmethod
    def query_yes_no(question, default=True):
        """
        Ask a yes/no question via input() and return the answer.

        :param question: string that is presented to the user
        :param default: boolean answer assumed if the user just hits <Enter>
        :returns: bool
        :raises ansibledoctor.exception.InputError: if reading is interrupted,
            stdin is exhausted, or the answer is not a valid yes/no spelling
        """
        prompt = "[Y/n]" if default else "[N/y]"

        try:
            # input method is safe in python3
            choice = input("{} {} ".format(question, prompt)) or default  # nosec
            return to_bool(choice)
        except (KeyboardInterrupt, EOFError, ValueError) as e:
            # EOFError is raised by input() when stdin is closed or exhausted
            # (e.g. non-interactive runs); report it like any other read error.
            raise ansibledoctor.exception.InputError("Error while reading input", e) from e