ansible-later/ansiblelater/standard.py

370 lines
13 KiB
Python

"""Standard definition."""
import codecs
import importlib
import inspect
import os
import pathlib
import re
from abc import ABCMeta
from abc import abstractmethod
from collections import defaultdict
import toolz
import yaml
from yamllint import linter
from yamllint.config import YamlLintConfig
from ansiblelater.exceptions import LaterAnsibleError
from ansiblelater.exceptions import LaterError
from ansiblelater.utils import Singleton
from ansiblelater.utils import sysexit_with_message
from ansiblelater.utils.yamlhelper import UnsafeTag
from ansiblelater.utils.yamlhelper import VaultTag
from ansiblelater.utils.yamlhelper import action_tasks
from ansiblelater.utils.yamlhelper import normalize_task
from ansiblelater.utils.yamlhelper import normalized_yaml
from ansiblelater.utils.yamlhelper import parse_yaml_linenumbers
class StandardMeta(type):
def __call__(cls, *args, **kwargs):
mcls = type.__call__(cls, *args)
setattr(mcls, "sid", cls.sid)
setattr(mcls, "description", getattr(cls, "description", "__unknown__"))
setattr(mcls, "helptext", getattr(cls, "helptext", ""))
setattr(mcls, "version", getattr(cls, "version", None))
setattr(mcls, "types", getattr(cls, "types", []))
return mcls
class StandardExtendedMeta(StandardMeta, ABCMeta):
pass
class StandardBase(object, metaclass=StandardExtendedMeta):
@property
@abstractmethod
def sid(self):
pass
@abstractmethod
def check(self, candidate, settings):
pass
def __repr__(self): # noqa
return "Standard: {description} (version: {version}, types: {types})".format(
description=self.description, version=self.version, types=self.types
)
@staticmethod
def get_tasks(candidate, settings):
errors = []
yamllines = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_action_tasks(candidate, settings):
tasks = []
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(StandardBase.Error(e.line, "syntax error: {}".format(e.message)))
candidate.faulty = True
return tasks, errors
@staticmethod
def get_normalized_task(task, candidate, settings):
normalized = None
errors = []
if not candidate.faulty:
try:
normalized = normalize_task(
task, candidate.path, settings["ansible"]["custom_modules"]
)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_tasks(candidate, settings, full=False):
normalized = []
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
for task in tasks:
# An empty `tags` block causes `None` to be returned if
# the `or []` is not present - `task.get("tags", [])`
# does not suffice.
# Deprecated.
if "skip_ansible_lint" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
if "skip_ansible_later" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
normalized.append(
normalize_task(
task, candidate.path, settings["ansible"]["custom_modules"]
)
)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_yaml(candidate, settings, options=None):
errors = []
yamllines = []
if not candidate.faulty:
if not options:
options = defaultdict(dict)
options.update(remove_empty=True)
options.update(remove_markers=True)
try:
yamllines = normalized_yaml(candidate.path, options)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_raw_yaml(candidate, settings):
content = None
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yaml.add_constructor(
UnsafeTag.yaml_tag, UnsafeTag.yaml_constructor, Loader=yaml.SafeLoader
)
yaml.add_constructor(
VaultTag.yaml_tag, VaultTag.yaml_constructor, Loader=yaml.SafeLoader
)
content = yaml.safe_load(f)
except yaml.YAMLError as e:
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
return content, errors
@staticmethod
def run_yamllint(candidate, options="extends: default"):
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yaml.add_constructor(
UnsafeTag.yaml_tag, UnsafeTag.yaml_constructor, Loader=yaml.SafeLoader
)
yaml.add_constructor(
VaultTag.yaml_tag, VaultTag.yaml_constructor, Loader=yaml.SafeLoader
)
yaml.safe_load(f)
for problem in linter.run(f, YamlLintConfig(options)):
errors.append(StandardBase.Error(problem.line, problem.desc))
except yaml.YAMLError as e:
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
return errors
@staticmethod
def get_first_cmd_arg(task):
if "cmd" in task["action"]:
first_cmd_arg = task["action"]["cmd"].split()[0]
elif "argv" in task["action"]:
first_cmd_arg = task["action"]["argv"][0]
else:
first_cmd_arg = task["action"]["__ansible_arguments__"][0]
return first_cmd_arg
class Error(object):
"""Default error object created if a rule failed."""
def __init__(self, lineno, message, error_type=None, **kwargs):
"""
Initialize a new error object and returns None.
:param lineno: Line number where the error from de rule occures
:param message: Detailed error description provided by the rule
"""
self.lineno = lineno
self.message = message
self.kwargs = kwargs
for (key, value) in kwargs.items():
setattr(self, key, value)
def __repr__(self): # noqa
if self.lineno:
return "{no}: {msg}".format(no=self.lineno, msg=self.message)
else:
return " {msg}".format(msg=self.message)
def to_dict(self):
result = dict(lineno=self.lineno, message=self.message)
for (key, value) in self.kwargs.items():
result[key] = value
return result
class Result(object):
"""Generic result object."""
def __init__(self, candidate, errors=None):
self.candidate = candidate
self.errors = errors or []
def message(self):
return "\n".join(["{0}:{1}".format(self.candidate, error) for error in self.errors])
class StandardLoader():
def __init__(self, source):
self.rules = []
for s in source:
for p in pathlib.Path(s).glob("*.py"):
filename = os.path.splitext(os.path.basename(p))[0]
if not re.match(r"^[A-Za-z]+$", filename):
continue
spec = importlib.util.spec_from_file_location(filename, p)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
except (ImportError, NameError) as e:
sysexit_with_message(
"Failed to load roles file {module}: \n {msg}".format(
msg=str(e), module=filename
)
)
try:
for name, obj in inspect.getmembers(module):
if self._is_plugin(obj):
self.rules.append(obj())
except TypeError as e:
sysexit_with_message("Failed to load roles file: \n {msg}".format(msg=str(e)))
self.validate()
def _is_plugin(self, obj):
return inspect.isclass(obj) and issubclass(
obj, StandardBase
) and obj is not StandardBase and not None
def validate(self):
normalized_std = (list(toolz.remove(lambda x: x.sid == "", self.rules)))
unique_std = len(list(toolz.unique(normalized_std, key=lambda x: x.sid)))
all_std = len(normalized_std)
if not all_std == unique_std:
sysexit_with_message(
"Detect duplicate ID's in standards definition. Please use unique ID's only."
)
class SingleStandards(StandardLoader, metaclass=Singleton):
"""Singleton config class."""
pass