set configuration through env variable

This commit is contained in:
Robert Kaussow 2019-10-09 00:56:39 +02:00
parent 5f2215457c
commit f23aa552c1
6 changed files with 179 additions and 109 deletions

View File

@ -32,17 +32,17 @@ class AnsibleDoctor:
:return: args objec :return: args objec
""" """
# TODO: add function to print to stdout instead of file
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="Generate documentation from annotated Ansible roles using templates") description="Generate documentation from annotated Ansible roles using templates")
parser.add_argument("base_dir", nargs="?", help="role directory, (default: current working dir)") parser.add_argument("base_dir", nargs="?", help="role directory, (default: current working dir)")
parser.add_argument("-c", "--config", nargs="?", dest="config_file", help="location of configuration file") parser.add_argument("-c", "--config", nargs="?", dest="config_file", help="location of configuration file")
parser.add_argument("-o", "--output", action="store", dest="output_dir", type=str, parser.add_argument("-o", "--output", action="store", dest="output_dir", type=str,
help="output base dir") help="output base dir")
parser.add_argument("-f", "--force", action="store_true", dest="force_overwrite", parser.add_argument("-f", "--force", action="store_true", default=None, dest="force_overwrite",
help="force overwrite output file") help="force overwrite output file")
parser.add_argument("-d", "--dry-run", action="store_true", help="dry run without writing") parser.add_argument("-d", "--dry-run", action="store_true", default=None, dest="dry_run",
# parser.add_argument("-p", "--print", action="store_true", help="dry run without writing")
# help="print to stdout instead of file")
parser.add_argument("-v", dest="logging.level", action="append_const", const=-1, parser.add_argument("-v", dest="logging.level", action="append_const", const=-1,
help="increase log level") help="increase log level")
parser.add_argument("-q", dest="logging.level", action="append_const", parser.add_argument("-q", dest="logging.level", action="append_const",

View File

@ -8,6 +8,7 @@ import anyconfig
import jsonschema.exceptions import jsonschema.exceptions
import yaml import yaml
from appdirs import AppDirs from appdirs import AppDirs
import environs
from jsonschema._utils import format_as_index from jsonschema._utils import format_as_index
from pkg_resources import resource_filename from pkg_resources import resource_filename
@ -28,7 +29,97 @@ class Config():
- provides cli parameters - provides cli parameters
""" """
def __init__(self, args={}, config_file=None): SETTINGS = {
"config_file": {
"default": "",
"env": "CONFIG_FILE",
"type": environs.Env().str
},
"base_dir": {
"default": "",
"env": "BASE_DIR",
"type": environs.Env().str
},
"dry_run": {
"default": False,
"env": "DRY_RUN",
"file": True,
"type": environs.Env().bool
},
"logging.level": {
"default": "WARNING",
"env": "LOG_LEVEL",
"file": True,
"type": environs.Env().str
},
"logging.json": {
"default": False,
"env": "LOG_JSON",
"file": True,
"type": environs.Env().bool
},
"output_dir": {
"default": os.getcwd(),
"env": "OUTPUT_DIR",
"file": True,
"type": environs.Env().str
},
"template_dir": {
"default": os.path.join(os.path.dirname(os.path.realpath(__file__)), "templates"),
"env": "TEMPLATE_DIR",
"file": True,
"type": environs.Env().str
},
"template": {
"default": "readme",
"env": "TEMPLATE",
"file": True,
"type": environs.Env().str
},
"force_overwrite": {
"default": False,
"env": "FORCE_OVERWRITE",
"file": True,
"type": environs.Env().bool
},
"custom_header": {
"default": "",
"env": "CUSTOM_HEADER",
"file": True,
"type": environs.Env().str
},
"exclude_files": {
"default": [],
"env": "EXCLUDE_FILES",
"file": True,
"type": environs.Env().list
},
}
ANNOTATIONS = {
"meta": {
"name": "meta",
"automatic": True
},
"todo": {
"name": "todo",
"automatic": True,
},
"var": {
"name": "var",
"automatic": True,
},
"example": {
"name": "example",
"regex": r"(\#\ *\@example\ *\: *.*)"
},
"tag": {
"name": "tag",
"automatic": True,
},
}
def __init__(self, args={}):
""" """
Initialize a new settings class. Initialize a new settings class.
@ -37,135 +128,118 @@ class Config():
:returns: None :returns: None
""" """
self.config_file = None self._args = args
self.schema = None self._schema = None
self.args = self._set_args(args) self.config_file = default_config_file
self.base_dir = self._set_base_dir() self.base_dir = os.getcwd()
self.config = None
self._set_config()
self.is_role = self._set_is_role() or False self.is_role = self._set_is_role() or False
self.dry_run = self._set_dry_run() or False
self.config = self._get_config()
self._annotations = self._set_annotations()
self._post_processing()
def _set_args(self, args): def _get_args(self, args):
defaults = self._get_defaults() cleaned = dict(filter(lambda item: item[1] is not None, args.items()))
if args.get("config_file"):
self.config_file = os.path.abspath(os.path.expanduser(os.path.expandvars(args.get("config_file"))))
else:
self.config_file = default_config_file
args.pop("config_file", None) normalized = {}
tmp_args = dict(filter(lambda item: item[1] is not None, args.items())) for key, value in cleaned.items():
normalized = self._add_dict_branch(normalized, key.split("."), value)
tmp_dict = {}
for key, value in tmp_args.items():
tmp_dict = self._add_dict_branch(tmp_dict, key.split("."), value)
# Override correct log level from argparse # Override correct log level from argparse
levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
log_level = levels.index(defaults["logging"]["level"]) log_level = levels.index(self.SETTINGS["logging.level"]["default"])
if tmp_dict.get("logging"): if normalized.get("logging"):
for adjustment in tmp_dict["logging"]["level"]: for adjustment in normalized["logging"]["level"]:
log_level = min(len(levels) - 1, max(log_level + adjustment, 0)) log_level = min(len(levels) - 1, max(log_level + adjustment, 0))
tmp_dict["logging"]["level"] = levels[log_level] normalized["logging"]["level"] = levels[log_level]
return tmp_dict return normalized
def _get_defaults(self): def _get_defaults(self):
default_template = os.path.join(os.path.dirname(os.path.realpath(__file__)), "templates") normalized = {}
defaults = { for key, item in self.SETTINGS.items():
"logging": { normalized = self._add_dict_branch(normalized, key.split("."), item["default"])
"level": "WARNING",
"json": False
},
"output_dir": os.getcwd(),
"template_dir": default_template,
"template": "readme",
"force_overwrite": False,
"appent_to_file": "",
"exclude_files": [],
}
self.schema = anyconfig.gen_schema(defaults) self.schema = anyconfig.gen_schema(normalized)
return defaults return normalized
def _get_config(self): def _get_envs(self):
normalized = {}
for key, item in self.SETTINGS.items():
if item.get("env"):
prefix = "ANSIBLE_DOCTOR_"
envname = prefix + item["env"]
try:
value = item["type"](envname)
normalized = self._add_dict_branch(normalized, key.split("."), value)
except environs.EnvError as e:
if '"{}" not set'.format(envname) in str(e):
pass
else:
raise ansibledoctor.Exception.ConfigError('Unable to read environment variable', str(e))
return normalized
def _set_config(self):
args = self._get_args(self._args)
envs = self._get_envs()
defaults = self._get_defaults() defaults = self._get_defaults()
# preset config file path
if envs.get("config_file"):
self.config_file = self._normalize_path(envs.get("config_file"))
if envs.get("base_dir"):
self.base_dir = self._normalize_path(envs.get("base_dir"))
if args.get("config_file"):
self.config_file = self._normalize_path(args.get("config_file"))
if args.get("base_dir"):
self.base_dir = self._normalize_path(args.get("base_dir"))
source_files = [] source_files = []
source_files.append(self.config_file) source_files.append(self.config_file)
source_files.append(os.path.join(self.base_dir, ".ansibledoctor")) source_files.append(os.path.join(self.base_dir, ".ansibledoctor"))
source_files.append(os.path.join(self.base_dir, ".ansibledoctor.yml")) source_files.append(os.path.join(self.base_dir, ".ansibledoctor.yml"))
source_files.append(os.path.join(self.base_dir, ".ansibledoctor.yaml")) source_files.append(os.path.join(self.base_dir, ".ansibledoctor.yaml"))
cli_options = self.args
for config in source_files: for config in source_files:
if config and os.path.exists(config): if config and os.path.exists(config):
with open(config, "r", encoding="utf8") as stream: with open(config, "r", encoding="utf8") as stream:
s = stream.read() s = stream.read()
try: try:
sdict = yaml.safe_load(s) file_dict = yaml.safe_load(s)
except yaml.parser.ParserError as e: except yaml.parser.ParserError as e:
message = "{}\n{}".format(e.problem, str(e.problem_mark)) message = "{}\n{}".format(e.problem, str(e.problem_mark))
raise ansibledoctor.Exception.ConfigError("Unable to read file", message) raise ansibledoctor.Exception.ConfigError("Unable to read file", message)
if self._validate(sdict): if self._validate(file_dict):
anyconfig.merge(defaults, sdict, ac_merge=anyconfig.MS_DICTS) anyconfig.merge(defaults, file_dict, ac_merge=anyconfig.MS_DICTS)
defaults["logging"]["level"] = defaults["logging"]["level"].upper() defaults["logging"]["level"] = defaults["logging"]["level"].upper()
if cli_options and self._validate(cli_options): if self._validate(envs):
anyconfig.merge(defaults, cli_options, ac_merge=anyconfig.MS_DICTS) anyconfig.merge(defaults, envs, ac_merge=anyconfig.MS_DICTS)
return defaults if self._validate(args):
anyconfig.merge(defaults, args, ac_merge=anyconfig.MS_DICTS)
def _set_annotations(self): defaults["output_dir"] = self._normalize_path(defaults["output_dir"])
annotations = { defaults["template_dir"] = self._normalize_path(defaults["template_dir"])
"meta": { defaults["custom_header"] = self._normalize_path(defaults["custom_header"])
"name": "meta",
"automatic": True
},
"todo": {
"name": "todo",
"automatic": True,
},
"var": {
"name": "var",
"automatic": True,
},
"example": {
"name": "example",
"regex": r"(\#\ *\@example\ *\: *.*)"
},
"tag": {
"name": "tag",
"automatic": True,
},
}
return annotations
def _set_base_dir(self): if defaults.get("config_file"):
if self.args.get("base_dir"): defaults.pop("config_file")
real = os.path.abspath(os.path.expanduser(os.path.expandvars(self.args.get("base_dir")))) if defaults.get("base_dir"):
defaults.pop("base_dir")
self.config = defaults
def _normalize_path(self, path):
if not os.path.isabs(path):
return os.path.abspath(os.path.expanduser(os.path.expandvars(path)))
else: else:
real = os.getcwd() return path
return real
def _set_is_role(self): def _set_is_role(self):
if os.path.isdir(os.path.join(self.base_dir, "tasks")): if os.path.isdir(os.path.join(self.base_dir, "tasks")):
return True return True
def _set_dry_run(self):
if self.args.get("dry_run"):
return True
def _post_processing(self):
# Override append file path
append_file = self.config.get("append_to_file")
if append_file:
if not os.path.isabs(os.path.expanduser(os.path.expandvars(append_file))):
append_file = os.path.join(self.base_dir, append_file)
self.config["append_to_file"] = os.path.abspath(os.path.expanduser(os.path.expandvars(append_file)))
def _validate(self, config): def _validate(self, config):
try: try:
anyconfig.validate(config, self.schema, ac_schema_safe=False) anyconfig.validate(config, self.schema, ac_schema_safe=False)
@ -183,14 +257,13 @@ class Config():
key = vector[0] key = vector[0]
tree[key] = value \ tree[key] = value \
if len(vector) == 1 \ if len(vector) == 1 \
else self._add_dict_branch(tree[key] if key in tree else {}, else self._add_dict_branch(tree[key] if key in tree else {}, vector[1:], value)
vector[1:], value)
return tree return tree
def get_annotations_definition(self, automatic=True): def get_annotations_definition(self, automatic=True):
annotations = {} annotations = {}
if automatic: if automatic:
for k, item in self._annotations.items(): for k, item in self.ANNOTATIONS.items():
if "automatic" in item.keys() and item["automatic"]: if "automatic" in item.keys() and item["automatic"]:
annotations[k] = item annotations[k] = item
return annotations return annotations
@ -198,7 +271,7 @@ class Config():
def get_annotations_names(self, automatic=True): def get_annotations_names(self, automatic=True):
annotations = [] annotations = []
if automatic: if automatic:
for k, item in self._annotations.items(): for k, item in self.ANNOTATIONS.items():
if "automatic" in item.keys() and item["automatic"]: if "automatic" in item.keys() and item["automatic"]:
annotations.append(k) annotations.append(k)
return annotations return annotations

View File

@ -44,7 +44,6 @@ class Generator:
base_dir = self.config.get_template() base_dir = self.config.get_template()
for file in glob.iglob(base_dir + "/**/*." + self.extension, recursive=True): for file in glob.iglob(base_dir + "/**/*." + self.extension, recursive=True):
relative_file = file[len(base_dir) + 1:] relative_file = file[len(base_dir) + 1:]
if ntpath.basename(file)[:1] != "_": if ntpath.basename(file)[:1] != "_":
self.logger.debug("Found template file: " + relative_file) self.logger.debug("Found template file: " + relative_file)
@ -53,7 +52,7 @@ class Generator:
self.logger.debug("Ignoring template file: " + relative_file) self.logger.debug("Ignoring template file: " + relative_file)
def _create_dir(self, directory): def _create_dir(self, directory):
if not self.config.dry_run: if not self.config.config["dry_run"]:
os.makedirs(directory, exist_ok=True) os.makedirs(directory, exist_ok=True)
else: else:
self.logger.info("Creating dir: " + directory) self.logger.info("Creating dir: " + directory)
@ -67,7 +66,7 @@ class Generator:
files_to_overwite.append(doc_file) files_to_overwite.append(doc_file)
if len(files_to_overwite) > 0 and self.config.config.get("force_overwrite") is False: if len(files_to_overwite) > 0 and self.config.config.get("force_overwrite") is False:
if not self.config.dry_run: if not self.config.config["dry_run"]:
self.logger.warn("This files will be overwritten:") self.logger.warn("This files will be overwritten:")
print(*files_to_overwite, sep="\n") print(*files_to_overwite, sep="\n")
@ -103,7 +102,7 @@ class Generator:
jenv.filters["to_nice_yaml"] = self._to_nice_yaml jenv.filters["to_nice_yaml"] = self._to_nice_yaml
jenv.filters["deep_get"] = self._deep_get jenv.filters["deep_get"] = self._deep_get
data = jenv.from_string(data).render(role_data, role=role_data) data = jenv.from_string(data).render(role_data, role=role_data)
if not self.config.dry_run: if not self.config.config["dry_run"]:
with open(doc_file, "wb") as outfile: with open(doc_file, "wb") as outfile:
outfile.write(custom_header.encode("utf-8")) outfile.write(custom_header.encode("utf-8"))
outfile.write(data.encode("utf-8")) outfile.write(data.encode("utf-8"))

View File

@ -1,7 +1,4 @@
# demo-role-custom-header # demo-role
[![Build Status](https://cloud.drone.io/api/badges/xoxys/ansible-later/status.svg)](https://cloud.drone.io/xoxys/ansible-later)
![License](https://img.shields.io/pypi/l/ansible-later)
Role to demonstrate ansible-doctor Role to demonstrate ansible-doctor

View File

@ -1,2 +1,2 @@
--- ---
append_to_file: HEADER.md custom_header: HEADER.md

View File

@ -63,7 +63,8 @@ setup(
"pathspec", "pathspec",
"python-json-logger", "python-json-logger",
"jsonschema", "jsonschema",
"jinja2" "jinja2",
"environs"
], ],
entry_points={ entry_points={
"console_scripts": [ "console_scripts": [