ansible-later/ansiblelater/rule.py

351 lines
12 KiB
Python

"""Rule definition."""
import copy
import importlib
import inspect
import os
import pathlib
import re
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from urllib.parse import urlparse
import toolz
import yaml
from yamllint import linter
from yamllint.config import YamlLintConfig
from ansiblelater.exceptions import LaterAnsibleError, LaterError
from ansiblelater.utils import Singleton, sysexit_with_message
from ansiblelater.utils.yamlhelper import (
UnsafeTag,
VaultTag,
action_tasks,
normalize_task,
normalized_yaml,
parse_yaml_linenumbers,
)
class RuleMeta(type):
def __call__(cls, *args):
mcls = type.__call__(cls, *args)
mcls.rid = cls.rid
mcls.description = getattr(cls, "description", "__unknown__")
mcls.helptext = getattr(cls, "helptext", "")
mcls.types = getattr(cls, "types", [])
return mcls
class RuleExtendedMeta(RuleMeta, ABCMeta):
pass
class RuleBase(metaclass=RuleExtendedMeta):
SHELL_PIPE_CHARS = "&|<>;$\n*[]{}?"
@property
@abstractmethod
def rid(self):
pass
@abstractmethod
def check(self, candidate, settings):
pass
def __repr__(self):
return f"Rule: {self.description} (types: {self.types})"
@staticmethod
def get_tasks(candidate, settings): # noqa
errors = []
yamllines = []
if not candidate.faulty:
try:
with open(candidate.path, encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
except LaterError as ex:
e = ex.original
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(RuleBase.Error(e.line, f"syntax error: {e.message}"))
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_action_tasks(candidate, settings): # noqa
tasks = []
errors = []
if not candidate.faulty:
try:
with open(candidate.path, encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
except LaterError as ex:
e = ex.original
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(RuleBase.Error(e.line, f"syntax error: {e.message}"))
candidate.faulty = True
return tasks, errors
@staticmethod
def get_normalized_task(task, candidate, settings):
normalized = None
errors = []
if not candidate.faulty:
try:
normalized = normalize_task(
copy.copy(task), candidate.path, settings["ansible"]["custom_modules"]
)
except LaterError as ex:
e = ex.original
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(RuleBase.Error(e.line, f"syntax error: {e.message}"))
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_tasks(candidate, settings, full=False):
normalized = []
errors = []
if not candidate.faulty:
try:
with open(candidate.path, encoding="utf-8") as f:
yamllines = parse_yaml_linenumbers(f, candidate.path)
if yamllines:
tasks = action_tasks(yamllines, candidate)
for task in tasks:
# An empty `tags` block causes `None` to be returned if
# the `or []` is not present - `task.get("tags", [])`
# does not suffice.
# Deprecated.
if "skip_ansible_lint" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
if "skip_ansible_later" in (task.get("tags") or []) and not full:
# No need to normalize_task if we are skipping it.
continue
normalized_task = normalize_task(
task, candidate.path, settings["ansible"]["custom_modules"]
)
normalized_task["__raw_task__"] = task
normalized.append(normalized_task)
except LaterError as ex:
e = ex.original
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(RuleBase.Error(e.line, f"syntax error: {e.message}"))
candidate.faulty = True
return normalized, errors
@staticmethod
def get_normalized_yaml(candidate, settings, options=None): # noqa
errors = []
yamllines = []
if not candidate.faulty:
if not options:
options = defaultdict(dict)
options.update(remove_empty=True)
options.update(remove_markers=True)
try:
yamllines = normalized_yaml(candidate.path, options)
except LaterError as ex:
e = ex.original
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except LaterAnsibleError as e:
errors.append(RuleBase.Error(e.line, f"syntax error: {e.message}"))
candidate.faulty = True
return yamllines, errors
@staticmethod
def get_raw_yaml(candidate, settings): # noqa
content = None
errors = []
if not candidate.faulty:
try:
with open(candidate.path, encoding="utf-8") as f:
yaml.add_constructor(
UnsafeTag.yaml_tag, UnsafeTag.yaml_constructor, Loader=yaml.SafeLoader
)
yaml.add_constructor(
VaultTag.yaml_tag, VaultTag.yaml_constructor, Loader=yaml.SafeLoader
)
content = yaml.safe_load(f)
except yaml.YAMLError as e:
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
return content, errors
@staticmethod
def run_yamllint(candidate, options="extends: default"):
errors = []
if not candidate.faulty:
try:
with open(candidate.path, encoding="utf-8") as f:
for problem in linter.run(f, YamlLintConfig(options)):
errors.append(RuleBase.Error(problem.line, problem.desc))
except yaml.YAMLError as e:
errors.append(
RuleBase.Error(e.problem_mark.line + 1, f"syntax error: {e.problem}")
)
candidate.faulty = True
except (TypeError, ValueError) as e:
errors.append(RuleBase.Error(None, f"yamllint error: {e}"))
candidate.faulty = True
return errors
@staticmethod
def get_first_cmd_arg(task):
if "cmd" in task["action"]:
first_cmd_arg = task["action"]["cmd"].split()[0]
elif "argv" in task["action"]:
first_cmd_arg = task["action"]["argv"][0]
else:
first_cmd_arg = task["action"]["__ansible_arguments__"][0]
return first_cmd_arg
@staticmethod
def get_safe_cmd(task):
if "cmd" in task["action"]:
cmd = task["action"].get("cmd", "")
else:
cmd = " ".join(task["action"].get("__ansible_arguments__", []))
cmd = re.sub(r"{{.+?}}", "JINJA_EXPRESSION", cmd)
cmd = re.sub(r"{%.+?%}", "JINJA_STATEMENT", cmd)
cmd = re.sub(r"{#.+?#}", "JINJA_COMMENT", cmd)
parts = cmd.split()
parts = [p if not urlparse(p.strip('"').strip("'")).scheme else "URL" for p in parts]
return " ".join(parts)
class Error:
"""Default error object created if a rule failed."""
def __init__(self, lineno, message, **kwargs):
"""
Initialize a new error object and returns None.
:param lineno: Line number where the error from de rule occures
:param message: Detailed error description provided by the rule
"""
self.lineno = lineno
self.message = message
self.kwargs = kwargs
for key, value in kwargs.items():
setattr(self, key, value)
def __repr__(self):
if self.lineno:
return f"{self.lineno}: {self.message}"
return f" {self.message}"
def to_dict(self):
result = {"lineno": self.lineno, "message": self.message}
for key, value in self.kwargs.items():
result[key] = value
return result
class Result:
"""Generic result object."""
def __init__(self, candidate, errors=None):
self.candidate = candidate
self.errors = errors or []
def message(self):
return "\n".join([f"{self.candidate}:{error}" for error in self.errors])
class RulesLoader:
def __init__(self, source):
self.rules = []
for s in source:
for p in pathlib.Path(s).glob("*.py"):
filename = os.path.splitext(os.path.basename(p))[0]
if not re.match(r"^[A-Za-z]+$", filename):
continue
spec = importlib.util.spec_from_file_location(filename, p)
module = importlib.util.module_from_spec(spec)
try:
spec.loader.exec_module(module)
except (ImportError, NameError) as e:
sysexit_with_message(f"Failed to load roles file {filename}: \n {e!s}")
try:
for _name, obj in inspect.getmembers(module):
if self._is_plugin(obj):
self.rules.append(obj())
except TypeError as e:
sysexit_with_message(f"Failed to load roles file: \n {e!s}")
self.validate()
def _is_plugin(self, obj):
return (
inspect.isclass(obj) and issubclass(obj, RuleBase) and obj is not RuleBase and not None
)
def validate(self):
normalize_rule = list(toolz.remove(lambda x: x.rid == "", self.rules))
unique_rule = len(list(toolz.unique(normalize_rule, key=lambda x: x.rid)))
all_rules = len(normalize_rule)
if all_rules != unique_rule:
sysexit_with_message(
"Found duplicate tags in rules definition. Please use unique tags only."
)
class SingleRules(RulesLoader, metaclass=Singleton):
"""Singleton config class."""
pass