ansible-doctor/ansibledoctor/config.py

354 lines
11 KiB
Python
Raw Normal View History

2019-10-08 09:39:27 +00:00
#!/usr/bin/env python3
"""Global settings definition."""
2019-10-07 06:52:00 +00:00
import os
import anyconfig
2019-10-08 22:59:40 +00:00
import environs
2019-10-08 09:30:31 +00:00
import jsonschema.exceptions
import ruamel.yaml
from appdirs import AppDirs
from jsonschema._utils import format_as_index
2019-10-07 06:52:00 +00:00
import ansibledoctor.exception
from ansibledoctor.utils import Singleton
2019-10-07 06:52:00 +00:00
# Per-user configuration directory for the "ansible-doctor" application, as reported by appdirs.
config_dir = AppDirs("ansible-doctor").user_config_dir
# Path of the config file ("config.yml" inside config_dir) that Config uses when no other
# config file is given.
default_config_file = os.path.join(config_dir, "config.yml")
class Config():
    """
    Create an object with all necessary settings.

    Settings are loaded from multiple locations in defined order (last wins):
    - default settings defined by `self._get_defaults()`
    - yaml config files: the user config file (defaults to the OS specific user config dir,
      https://pypi.org/project/appdirs/) followed by `.ansibledoctor`, `.ansibledoctor.yml`
      and `.ansibledoctor.yaml` in the current working directory
    - environment variables prefixed with `ANSIBLE_DOCTOR_`
    - provided cli parameters
    """

    # Directory of the bundled "readme" template, located next to this module.
    TEMPLATE_SRC = os.path.join(
        os.path.dirname(os.path.realpath(__file__)), *["templates", "readme"]
    )

    # Known settings, keyed by dotted option name. Each entry carries:
    #   default - value used when no other source provides the option
    #   env     - suffix appended to "ANSIBLE_DOCTOR_" to form the environment variable name
    #   type    - environs reader used to parse the environment variable
    #   file    - flag present on most entries; it is not evaluated by the code in this class
    SETTINGS = {
        "config_file": {
            "default": "",
            "env": "CONFIG_FILE",
            "type": environs.Env().str
        },
        "role_dir": {
            "default": "",
            "env": "ROLE_DIR",
            "type": environs.Env().str
        },
        "role_name": {
            "default": "",
            "env": "ROLE_NAME",
            "type": environs.Env().str
        },
        "dry_run": {
            "default": False,
            "env": "DRY_RUN",
            "file": True,
            "type": environs.Env().bool
        },
        "logging.level": {
            "default": "WARNING",
            "env": "LOG_LEVEL",
            "file": True,
            "type": environs.Env().str
        },
        "logging.json": {
            "default": False,
            "env": "LOG_JSON",
            "file": True,
            "type": environs.Env().bool
        },
        "output_dir": {
            "default": os.getcwd(),
            "env": "OUTPUT_DIR",
            "file": True,
            "type": environs.Env().str
        },
        "template_src": {
            "default": f"local>{TEMPLATE_SRC}",
            # NOTE(review): every other setting uses an upper-case suffix; this one resolves
            # to "ANSIBLE_DOCTOR_template_src". Left unchanged to stay backward-compatible -
            # confirm whether "TEMPLATE_SRC" was intended.
            "env": "template_src",
            "file": True,
            "type": environs.Env().str
        },
        "force_overwrite": {
            "default": False,
            "env": "FORCE_OVERWRITE",
            "file": True,
            "type": environs.Env().bool
        },
        "custom_header": {
            "default": "",
            "env": "CUSTOM_HEADER",
            "file": True,
            "type": environs.Env().str
        },
        "exclude_files": {
            "default": [],
            "env": "EXCLUDE_FILES",
            "file": True,
            "type": environs.Env().list
        },
        "exclude_tags": {
            "default": [],
            "env": "EXCLUDE_TAGS",
            "file": True,
            "type": environs.Env().list
        },
        "role_detection": {
            "default": True,
            "env": "ROLE_DETECTION",
            "file": True,
            "type": environs.Env().bool
        },
    }

    # Annotation definitions, keyed by annotation name. The getters below only inspect the
    # "automatic" flag; the remaining fields are passed through untouched.
    ANNOTATIONS = {
        "meta": {
            "name": "meta",
            "automatic": True,
            "subtypes": ["value"],
            "allow_multiple": False
        },
        "todo": {
            "name": "todo",
            "automatic": True,
            "subtypes": ["value"],
            "allow_multiple": True
        },
        "var": {
            "name": "var",
            "automatic": True,
            "subtypes": ["value", "example", "description"],
            "allow_multiple": False
        },
        "example": {
            "name": "example",
            "automatic": True,
            "subtypes": [],
            "allow_multiple": False
        },
        "tag": {
            "name": "tag",
            "automatic": True,
            "subtypes": ["value", "description"],
            "allow_multiple": False
        },
    }

    def __init__(self, args=None):
        """
        Initialize a new settings class.

        :param args: An optional dict of options, arguments and commands from the CLI.
        :returns: None
        """
        self._args = {} if args is None else args
        self._schema = None
        # `schema` is generated by `_get_defaults()` and read by `_validate()`; initialise it
        # so the attribute always exists.
        self.schema = None
        self.config_file = default_config_file
        self.role_dir = os.getcwd()
        self.config = None
        self._set_config()
        self.is_role = self._set_is_role()
        self.template = self._set_template()

    def _get_args(self, args):
        """
        Normalize the CLI arguments into a nested dict.

        Entries whose value is None are dropped and dotted keys are expanded into nested
        dicts. If `logging.level` is present it is iterated as a sequence of integer
        offsets that are added to the index of the default log level (clamped to the
        known levels) and then replaced by the resulting level name.

        :param args: Dict of CLI arguments keyed by dotted option name.
        :returns: Nested dict of the provided arguments.
        """
        normalized = {}
        for key, value in args.items():
            if value is not None:
                normalized = self._add_dict_branch(normalized, key.split("."), value)

        # Override correct log level from argparse
        levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
        logging_args = normalized.get("logging")
        # Only translate the level when it was actually passed; other `logging.*` arguments
        # alone must not fail or force a level.
        if logging_args and "level" in logging_args:
            log_level = levels.index(self.SETTINGS["logging.level"]["default"])
            for adjustment in logging_args["level"]:
                log_level = min(len(levels) - 1, max(log_level + adjustment, 0))
            logging_args["level"] = levels[log_level]

        return normalized

    def _get_defaults(self):
        """
        Build the nested dict of default values and derive the validation schema from it.

        The `role_name` default is the basename of `self.role_dir`. As a side effect
        `self.schema` is set to the schema generated from the defaults.

        :returns: Nested dict of default settings.
        """
        normalized = {}
        for key, item in self.SETTINGS.items():
            normalized = self._add_dict_branch(normalized, key.split("."), item["default"])

        # compute role_name default
        normalized["role_name"] = os.path.basename(self.role_dir)

        self.schema = anyconfig.gen_schema(normalized)
        return normalized

    def _get_envs(self):
        """
        Read all settings that are provided through environment variables.

        The variable name is `ANSIBLE_DOCTOR_` followed by the setting's `env` value.
        Variables that are not set are skipped.

        :returns: Nested dict of the settings found in the environment.
        :raises ansibledoctor.exception.ConfigError: If a set variable cannot be parsed.
        """
        prefix = "ANSIBLE_DOCTOR_"
        normalized = {}
        for key, item in self.SETTINGS.items():
            if not item.get("env"):
                continue

            envname = prefix + item["env"]
            # Check the environment directly instead of matching the wording of the
            # environs error message to detect unset variables.
            if envname not in os.environ:
                continue

            try:
                value = item["type"](envname)
            except environs.EnvError as e:
                raise ansibledoctor.exception.ConfigError(
                    "Unable to read environment variable", str(e)
                ) from e
            normalized = self._add_dict_branch(normalized, key.split("."), value)

        return normalized

    def _read_config_file(self, path):
        """
        Load a yaml config file.

        :param path: Path of an existing config file.
        :returns: The parsed document; None when the file contains no document.
        :raises ansibledoctor.exception.ConfigError: If the yaml cannot be parsed.
        """
        with open(path, encoding="utf8") as stream:
            raw = stream.read()

        try:
            # NOTE(review): `ruamel.yaml.safe_load` is believed to be deprecated in newer
            # ruamel.yaml releases - verify against the pinned version.
            return ruamel.yaml.safe_load(raw)
        except (ruamel.yaml.composer.ComposerError, ruamel.yaml.scanner.ScannerError) as e:
            message = f"{e.context} {e.problem}"
            raise ansibledoctor.exception.ConfigError(
                f"Unable to read config file {path}", message
            ) from e

    def _set_config(self):
        """
        Assemble `self.config` from defaults, config files, environment and CLI arguments.

        Later sources override earlier ones. `config_file` and `role_dir` are resolved
        first (CLI wins over environment), stored on the instance and removed from the
        resulting config dict.

        :returns: None
        :raises ansibledoctor.exception.ConfigError: If a source cannot be read or fails
            schema validation.
        """
        args = self._get_args(self._args)
        envs = self._get_envs()

        # preset config file path
        for source in (envs, args):
            if source.get("config_file"):
                self.config_file = self._normalize_path(source.get("config_file"))
            if source.get("role_dir"):
                self.role_dir = self._normalize_path(source.get("role_dir"))

        # Build the defaults only after role_dir is final: the role_name default is
        # derived from it.
        defaults = self._get_defaults()

        cwd = os.getcwd()
        source_files = [
            self.config_file,
            os.path.join(cwd, ".ansibledoctor"),
            os.path.join(cwd, ".ansibledoctor.yml"),
            os.path.join(cwd, ".ansibledoctor.yaml"),
        ]

        for config in source_files:
            if not (config and os.path.exists(config)):
                continue

            file_dict = self._read_config_file(config)
            if file_dict is None:
                # An empty file contributes no settings.
                continue

            if self._validate(file_dict):
                anyconfig.merge(defaults, file_dict, ac_merge=anyconfig.MS_DICTS)
                defaults["logging"]["level"] = defaults["logging"]["level"].upper()

        if self._validate(envs):
            anyconfig.merge(defaults, envs, ac_merge=anyconfig.MS_DICTS)

        if self._validate(args):
            anyconfig.merge(defaults, args, ac_merge=anyconfig.MS_DICTS)

        # Path-like options are stored as absolute paths.
        for option in ("output_dir", "custom_header"):
            if defaults[option]:
                defaults[option] = self._normalize_path(defaults[option])

        # These two only bootstrap the loading and live on the instance instead.
        defaults.pop("config_file", None)
        defaults.pop("role_dir", None)

        defaults["logging"]["level"] = defaults["logging"]["level"].upper()

        self.config = defaults

    def _normalize_path(self, path):
        """
        Return `path` as an absolute path.

        Environment variables and a leading `~` are expanded before the path is checked,
        so values like `~/roles` or `$HOME/roles` resolve to the intended location.
        Relative paths are resolved against the current working directory; absolute
        paths are returned without further normalization.

        :param path: Path string to normalize.
        :returns: Absolute path string.
        """
        expanded = os.path.expanduser(os.path.expandvars(path))
        if os.path.isabs(expanded):
            return expanded
        return os.path.abspath(os.path.join(os.getcwd(), expanded))

    def _set_is_role(self):
        """
        Tell whether `self.role_dir` contains a `tasks` directory.

        :returns: True if `<role_dir>/tasks` is a directory, False otherwise.
        """
        return os.path.isdir(os.path.join(self.role_dir, "tasks"))

    def _validate(self, config):
        """
        Validate a config dict against the schema generated from the defaults.

        :param config: Nested dict to validate.
        :returns: True if the validation passed.
        :raises ansibledoctor.exception.ConfigError: If the validation failed.
        """
        try:
            anyconfig.validate(config, self.schema, ac_schema_safe=False)
        except jsonschema.exceptions.ValidationError as e:
            # NOTE(review): `format_as_index` is a private jsonschema helper; its signature
            # may differ between jsonschema releases - verify against the pinned version.
            schema_index = format_as_index(list(e.relative_schema_path)[:-1])
            schema_error = (
                f"Failed validating '{e.validator}' in schema{schema_index}\n{e.message}"
            )
            raise ansibledoctor.exception.ConfigError("Configuration error", schema_error) from e

        return True

    def _add_dict_branch(self, tree, vector, value):
        """
        Set `value` in the nested dict `tree` at the key path given by `vector`.

        Missing intermediate dicts are created. `tree` is modified in place.

        :param tree: Dict to insert into.
        :param vector: Non-empty list of keys describing the path.
        :param value: Value stored at the end of the path.
        :returns: The same `tree` object.
        """
        node = tree
        for key in vector[:-1]:
            node = node.setdefault(key, {})
        node[vector[-1]] = value
        return tree

    def get_annotations_definition(self, automatic=True):
        """
        Return the definitions of all annotations flagged as automatic.

        :param automatic: When false, nothing is selected and an empty dict is returned.
        :returns: Dict mapping annotation name to its definition.
        """
        if not automatic:
            return {}
        return {name: item for name, item in self.ANNOTATIONS.items() if item.get("automatic")}

    def get_annotations_names(self, automatic=True):
        """
        Return the names of all annotations flagged as automatic.

        :param automatic: When false, nothing is selected and an empty list is returned.
        :returns: List of annotation names.
        """
        if not automatic:
            return []
        return [name for name, item in self.ANNOTATIONS.items() if item.get("automatic")]

    def _set_template(self):
        """
        Split the `template_src` option into provider and path.

        The option has the form `<provider>><path>`; the provider is compared
        case-insensitively. For the `local` provider the path is expanded and resolved
        to a canonical absolute path; for `git` it is returned as given.

        :returns: Tuple of `(provider, path)`.
        :raises ansibledoctor.exception.ConfigError: If the option has no `>` separator
            or names an unknown provider.
        """
        allowed_provider = ["local", "git"]

        try:
            provider, path = self.config.get("template_src").split(">", 1)
        except ValueError as e:
            raise ansibledoctor.exception.ConfigError(
                "Malformed option 'template_src'", str(e)
            ) from e

        provider = provider.strip().lower()
        path = path.strip()

        if provider not in allowed_provider:
            raise ansibledoctor.exception.ConfigError(
                f"Unknown template provider '{provider}'",
                f"the template provider has to be one of [{', '.join(allowed_provider)}]"
            )

        if provider == "local":
            # Expand `~` and variables first; realpath alone would treat them literally.
            path = os.path.realpath(self._normalize_path(path))

        # The "git" provider needs no local path handling here.
        return provider, path
2019-10-07 06:52:00 +00:00
class SingleConfig(Config, metaclass=Singleton):
    """Config subclass created through the ``Singleton`` metaclass; adds no members itself."""