2019-10-07 12:44:45 +00:00
|
|
|
"""Global settings object definition."""
|
|
|
|
|
2019-10-07 06:52:00 +00:00
|
|
|
import os
|
2019-10-07 12:44:45 +00:00
|
|
|
import sys
|
2019-10-07 06:52:00 +00:00
|
|
|
|
2019-10-07 12:44:45 +00:00
|
|
|
import anyconfig
|
2019-10-07 06:52:00 +00:00
|
|
|
import yaml
|
2019-10-07 12:44:45 +00:00
|
|
|
from appdirs import AppDirs
|
|
|
|
from jsonschema._utils import format_as_index
|
|
|
|
from pkg_resources import resource_filename
|
2019-10-07 06:52:00 +00:00
|
|
|
|
|
|
|
from ansibledoctor.Utils import Singleton
|
|
|
|
|
2019-10-07 12:44:45 +00:00
|
|
|
# Per-user configuration directory that appdirs reports for the "ansible-doctor" application.
config_dir = AppDirs("ansible-doctor").user_config_dir
|
|
|
|
# Path of the "config.yml" file inside config_dir.
default_config_file = os.path.join(config_dir, "config.yml")
|
|
|
|
|
|
|
|
|
|
|
|
class Config:
    """
    Create an object with all necessary settings.

    Settings are loaded from multiple locations in defined order (last wins):
    - default settings defined by `self._get_defaults()`
    - yaml config file, defaults to OS specific user config dir (https://pypi.org/project/appdirs/)
    - `.ansibledoctor.yml` in the current working directory
    - provided cli parameters
    """

    # Level names ordered so that a negative adjustment moves towards "DEBUG"
    # and a positive adjustment moves towards "CRITICAL".
    _LOG_LEVELS = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

    def __init__(self, args=None, config_file=None):
        """
        Initialize a new settings class.

        :param args: An optional dict of options, arguments and commands from the CLI.
                     The dict is copied, the caller's object is never modified.
        :param config_file: An optional path to a yaml config file. A `config_file`
                            entry in `args` takes precedence over this value.
        :returns: None
        """
        # Keep the explicit path as a fallback; _set_args() resolves the final value.
        self.config_file = config_file
        self.schema = None
        self.dry_run = False
        self.args = self._set_args({} if args is None else args)
        self.config = self._get_config()
        self.is_role = bool(self._set_is_role())
        self._annotations = self._set_annotations()

    def _set_args(self, args):
        """
        Convert flat CLI options into a nested settings dict.

        Keys are split on "." to build the nesting, options whose value is None are
        dropped, and `self.config_file` is resolved as a side effect.

        :param args: dict of CLI options, e.g. {"logging.level": [-1], "config_file": None}
        :returns: nested dict containing only the options that were actually set
        """
        defaults = self._get_defaults()

        # Work on a copy so popping "config_file" does not leak into the caller's dict.
        args = dict(args or {})
        cli_config_file = args.pop("config_file", None)
        self.config_file = (
            cli_config_file or getattr(self, "config_file", None) or default_config_file
        )

        tmp_args = {key: value for key, value in args.items() if value is not None}

        tmp_dict = {}
        for key, value in tmp_args.items():
            tmp_dict = self._add_dict_branch(tmp_dict, key.split("."), value)

        # Override correct log level from argparse.
        # Only touch the level when one was passed; other logging options
        # (e.g. "logging.json") may be present without it.
        logging_args = tmp_dict.get("logging")
        if isinstance(logging_args, dict) and logging_args.get("level") is not None:
            logging_args["level"] = self._adjust_log_level(
                defaults["logging"]["level"], logging_args["level"]
            )

        return tmp_dict

    @classmethod
    def _adjust_log_level(cls, base_level, adjustments):
        """
        Resolve the effective log level name.

        :param base_level: level name to start from, must be one of `_LOG_LEVELS`
        :param adjustments: iterable of integer steps applied in order (negative is more
                            verbose), or a level name which is returned upper-cased
        :returns: str level name, clamped to the first/last entry of `_LOG_LEVELS`
        """
        if isinstance(adjustments, str):
            return adjustments.upper()

        levels = cls._LOG_LEVELS
        index = levels.index(base_level)
        for adjustment in adjustments:
            index = min(len(levels) - 1, max(index + adjustment, 0))
        return levels[index]

    def _get_defaults(self):
        """
        Build the default settings and derive the validation schema from them.

        Side effect: `self.schema` is regenerated from the defaults on every call.

        :returns: dict of default settings
        """
        default_output = os.getcwd()
        default_template = os.path.join(os.path.dirname(os.path.realpath(__file__)), "templates")
        defaults = {
            "logging": {
                "level": "WARNING",
                "json": False
            },
            "output_dir": default_output,
            "template_dir": default_template,
            "template": "readme",
            "force_overwrite": False,
            "exclude_files": [],
        }

        self.schema = anyconfig.gen_schema(defaults)
        return defaults

    def _get_config(self):
        """
        Merge defaults, config files and CLI options into the final settings.

        Files are read in order (user config file, then `.ansibledoctor.yml` in the
        current working directory); CLI options are merged last and therefore win.

        :returns: dict of merged settings
        """
        defaults = self._get_defaults()
        source_files = [
            self.config_file,
            # TODO: support multiple filename formats e.g. .yaml or .ansibledoctor
            os.path.join(os.getcwd(), ".ansibledoctor.yml"),
        ]
        cli_options = self.args

        for config in source_files:
            if not (config and os.path.exists(config)):
                continue

            with open(config, "r", encoding="utf8") as stream:
                s = stream.read()

            # TODO: catch malformed files
            sdict = yaml.safe_load(s)
            if sdict is None:
                # An empty file parses to None; there is nothing to validate or merge.
                continue

            if self._validate(sdict):
                anyconfig.merge(defaults, sdict, ac_merge=anyconfig.MS_DICTS)
                defaults["logging"]["level"] = defaults["logging"]["level"].upper()

        if cli_options and self._validate(cli_options):
            anyconfig.merge(defaults, cli_options, ac_merge=anyconfig.MS_DICTS)

        return defaults

    def _set_annotations(self):
        """
        Return the definitions of all supported annotations.

        :returns: dict mapping the annotation key to its definition
        """
        annotations = {
            "meta": {
                "name": "meta",
                "automatic": True
            },
            "todo": {
                "name": "todo",
                "automatic": True,
            },
            "var": {
                "name": "var",
                "automatic": True,
            },
            "example": {
                "name": "example",
                "regex": r"(\#\ *\@example\ *\: *.*)"
            },
            "tag": {
                "name": "tag",
                "automatic": True,
            },
        }
        return annotations

    def _set_is_role(self):
        """
        Check whether the current working directory contains a `tasks` directory.

        :returns: bool
        """
        return os.path.isdir(os.path.join(os.getcwd(), "tasks"))

    def _validate(self, config):
        """
        Validate settings against the schema generated from the defaults.

        On a schema violation the error is printed and the process exits. Exceptions
        that do not carry schema validation details are re-raised unchanged.

        :param config: dict of settings to validate
        :returns: True if the settings are valid
        """
        try:
            anyconfig.validate(config, self.schema, ac_schema_safe=False)
        except Exception as e:
            # Only validation errors expose these attributes; anything else is an
            # unrelated failure and must not be hidden behind an AttributeError.
            if not (hasattr(e, "validator") and hasattr(e, "relative_schema_path")):
                raise

            schema_error = "Failed validating '{validator}' in schema{schema}".format(
                validator=e.validator,
                schema=format_as_index(list(e.relative_schema_path)[:-1])
            )

            # TODO: raise exception
            print("{schema}: {msg}".format(schema=schema_error, msg=e.message))
            # NOTE(review): 999 is outside the 0-255 range POSIX reports for an exit
            # status, so the observed code will likely differ - confirm before relying on it.
            sys.exit(999)

        return True

    def _add_dict_branch(self, tree, vector, value):
        """
        Insert `value` into the nested dict `tree` at the path given by `vector`.

        Intermediate dicts are created when missing; existing ones are extended.

        :param tree: dict to modify (modified in place)
        :param vector: non-empty list of keys, outermost first
        :param value: value to store at the innermost key
        :returns: the modified `tree`
        """
        key = vector[0]
        if len(vector) == 1:
            tree[key] = value
        else:
            tree[key] = self._add_dict_branch(tree.get(key, {}), vector[1:], value)
        return tree

    def get_annotations_definition(self, automatic=True):
        """
        Get the definitions of the annotations flagged as automatic.

        :param automatic: when false nothing is selected and an empty dict is returned
        :returns: dict mapping the annotation key to its definition
        """
        if not automatic:
            return {}
        return {k: item for k, item in self._annotations.items() if item.get("automatic")}

    def get_annotations_names(self, automatic=True):
        """
        Get the keys of the annotations flagged as automatic.

        :param automatic: when false nothing is selected and an empty list is returned
        :returns: list of annotation keys
        """
        if not automatic:
            return []
        return [k for k, item in self._annotations.items() if item.get("automatic")]

    def get_template(self):
        """
        Get the base dir for the template to use.

        :return: str abs path
        """
        template_dir = self.config.get("template_dir")
        template = self.config.get("template")
        return os.path.realpath(os.path.join(template_dir, template))
|
2019-10-07 06:52:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
class SingleConfig(Config, metaclass=Singleton):
    """
    Config variant whose instantiation is controlled by the `Singleton` metaclass.

    NOTE(review): presumably every call returns the same shared instance - confirm
    against `ansibledoctor.Utils.Singleton`.
    """
|