ansible-later/ansiblelater/standard.py

363 lines
12 KiB
Python
Raw Normal View History

2019-04-04 14:06:18 +00:00
"""Standard definition."""
import codecs
import copy
import importlib
import inspect
import os
import pathlib
import re
from abc import ABCMeta
from abc import abstractmethod
from collections import defaultdict
2019-04-04 14:06:18 +00:00
import toolz
import yaml
from yamllint import linter
from yamllint.config import YamlLintConfig
2019-04-02 14:34:03 +00:00
from ansiblelater.exceptions import LaterAnsibleError
from ansiblelater.exceptions import LaterError
from ansiblelater.utils import Singleton
from ansiblelater.utils import sysexit_with_message
from ansiblelater.utils.yamlhelper import UnsafeTag
2021-04-13 20:27:26 +00:00
from ansiblelater.utils.yamlhelper import VaultTag
from ansiblelater.utils.yamlhelper import action_tasks
from ansiblelater.utils.yamlhelper import normalize_task
from ansiblelater.utils.yamlhelper import normalized_yaml
from ansiblelater.utils.yamlhelper import parse_yaml_linenumbers
2019-04-02 14:34:03 +00:00
class StandardMeta(type):
    """Metaclass that copies rule metadata from the class onto every new instance."""

    def __call__(cls, *args, **kwargs):
        """
        Create an instance of ``cls`` and attach the rule metadata to it.

        :param args: Positional arguments forwarded to the class constructor
        :param kwargs: Keyword arguments forwarded to the class constructor
        :returns: The new instance with ``sid``, ``description``, ``helptext``,
            ``version`` and ``types`` set as instance attributes
        """
        # Forward keyword arguments as well; they used to be accepted here but
        # were silently dropped before reaching the constructor.
        instance = type.__call__(cls, *args, **kwargs)

        # ``sid`` is mandatory on the class, the remaining fields have defaults.
        instance.sid = cls.sid
        instance.description = getattr(cls, "description", "__unknown__")
        instance.helptext = getattr(cls, "helptext", "")
        instance.version = getattr(cls, "version", None)
        instance.types = getattr(cls, "types", [])
        return instance
class StandardExtendedMeta(StandardMeta, ABCMeta):
    """Metaclass combining ``StandardMeta`` with ``ABCMeta``."""
class StandardBase(metaclass=StandardExtendedMeta):
    """Abstract base for standards; subclasses must provide ``sid`` and ``check``."""

    @property
    @abstractmethod
    def sid(self):
        """Identifier of the standard. Subclasses must override this."""

    @abstractmethod
    def check(self, candidate, settings):
        """Check ``candidate`` using ``settings``. Subclasses must override this."""

    def __repr__(self):  # noqa
        template = "Standard: {description} (version: {version}, types: {types})"
        return template.format(
            description=self.description,
            version=self.version,
            types=self.types,
        )
@staticmethod
def get_tasks(candidate, settings):
errors = []
yamllines = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_action_tasks(candidate, settings):
tasks = []
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(StandardBase.Error(e.line, "syntax error: {}".format(e.message)))
candidate.faulty = True
return tasks, errors
@staticmethod
def get_normalized_task(task, candidate, settings):
normalized = None
errors = []
if not candidate.faulty:
try:
normalized = normalize_task(
copy.copy(task), candidate.path, settings["ansible"]["custom_modules"]
)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_tasks(candidate, settings, full=False):
normalized = []
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
for task in tasks:
# An empty `tags` block causes `None` to be returned if
# the `or []` is not present - `task.get("tags", [])`
# does not suffice.
# Deprecated.
if "skip_ansible_lint" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
if "skip_ansible_later" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
normalized.append(
normalize_task(
task, candidate.path, settings["ansible"]["custom_modules"]
)
)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_yaml(candidate, settings, options=None):
errors = []
yamllines = []
if not candidate.faulty:
if not options:
options = defaultdict(dict)
options.update(remove_empty=True)
options.update(remove_markers=True)
try:
yamllines = normalized_yaml(candidate.path, options)
except LaterError as ex:
e = ex.original
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(
StandardBase.Error(e.line, "syntax error: {msg}".format(msg=e.message))
)
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_raw_yaml(candidate, settings):
content = None
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
yaml.add_constructor(
UnsafeTag.yaml_tag, UnsafeTag.yaml_constructor, Loader=yaml.SafeLoader
)
2021-04-13 20:27:26 +00:00
yaml.add_constructor(
VaultTag.yaml_tag, VaultTag.yaml_constructor, Loader=yaml.SafeLoader
)
content = yaml.safe_load(f)
except yaml.YAMLError as e:
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
return content, errors
@staticmethod
def run_yamllint(candidate, options="extends: default"):
errors = []
if not candidate.faulty:
try:
with codecs.open(candidate.path, mode="rb", encoding="utf-8") as f:
for problem in linter.run(f, YamlLintConfig(options)):
errors.append(StandardBase.Error(problem.line, problem.desc))
except yaml.YAMLError as e:
errors.append(
StandardBase.Error(
e.problem_mark.line + 1, "syntax error: {msg}".format(msg=e.problem)
)
)
candidate.faulty = True
return errors
@staticmethod
def get_first_cmd_arg(task):
if "cmd" in task["action"]:
first_cmd_arg = task["action"]["cmd"].split()[0]
elif "argv" in task["action"]:
first_cmd_arg = task["action"]["argv"][0]
else:
first_cmd_arg = task["action"]["__ansible_arguments__"][0]
return first_cmd_arg
class Error(object):
    """Default error object created if a rule failed."""

    def __init__(self, lineno, message, error_type=None, **kwargs):
        """
        Initialize a new error object and returns None.

        :param lineno: Line number where the error reported by the rule occurs
        :param message: Detailed error description provided by the rule
        :param error_type: Optional error type, kept as ``self.error_type``
        :param kwargs: Extra fields; stored in ``self.kwargs`` and also set as
            attributes on the object
        """
        self.lineno = lineno
        self.message = message
        # Keep the value instead of silently discarding the argument.
        self.error_type = error_type
        self.kwargs = kwargs
        for key, value in kwargs.items():
            setattr(self, key, value)

    def __repr__(self):  # noqa
        if self.lineno:
            return "{no}: {msg}".format(no=self.lineno, msg=self.message)
        return " {msg}".format(msg=self.message)

    def to_dict(self):
        """Return ``lineno``, ``message`` and all extra fields as a dict."""
        result = dict(lineno=self.lineno, message=self.message)
        result.update(self.kwargs)
        return result
class Result(object):
    """Generic result object."""

    def __init__(self, candidate, errors=None):
        """Store the candidate and its errors (an empty list if none are given)."""
        self.candidate = candidate
        self.errors = errors if errors else []

    def message(self):
        """Return one ``candidate:error`` line per error, joined by newlines."""
        lines = []
        for error in self.errors:
            lines.append("{0}:{1}".format(self.candidate, error))
        return "\n".join(lines)
class StandardLoader:
    """Load rule classes from directories of Python files and instantiate them."""

    def __init__(self, source):
        """
        Import every matching ``*.py`` file and collect the rules it defines.

        Only files whose base name consists solely of ASCII letters are loaded.

        :param source: Iterable of directory paths to search for rule files
        """
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly before using it.
        import importlib.util

        self.rules = []

        for s in source:
            for p in pathlib.Path(s).glob("*.py"):
                filename = p.stem
                if not re.match(r"^[A-Za-z]+$", filename):
                    continue

                spec = importlib.util.spec_from_file_location(filename, p)
                module = importlib.util.module_from_spec(spec)

                try:
                    spec.loader.exec_module(module)
                except (ImportError, NameError) as e:
                    sysexit_with_message(
                        "Failed to load roles file {module}: \n {msg}".format(
                            msg=str(e), module=filename
                        )
                    )

                try:
                    for _name, obj in inspect.getmembers(module):
                        if self._is_plugin(obj):
                            self.rules.append(obj())
                except TypeError as e:
                    sysexit_with_message("Failed to load roles file: \n {msg}".format(msg=str(e)))

        self.validate()

    def _is_plugin(self, obj):
        """Return True if ``obj`` is a class derived from, but not equal to, ``StandardBase``."""
        return (
            inspect.isclass(obj) and issubclass(obj, StandardBase) and obj is not StandardBase
        )

    def validate(self):
        """Exit with a message if two loaded rules share the same non-empty ``sid``."""
        # Rules with an empty sid are ignored by the uniqueness check.
        normalized_std = list(toolz.remove(lambda x: x.sid == "", self.rules))
        unique_std = len(list(toolz.unique(normalized_std, key=lambda x: x.sid)))
        all_std = len(normalized_std)

        if all_std != unique_std:
            sysexit_with_message(
                "Detect duplicate ID's in standards definition. Please use unique ID's only."
            )
class SingleStandards(StandardLoader, metaclass=Singleton):
    """Singleton config class."""