ansible-later/ansiblelater/candidate.py
2023-06-28 08:19:08 +02:00

322 lines
9.6 KiB
Python

"""Review candidates."""
import codecs
import copy
import os
import re
from distutils.version import LooseVersion
from ansible.plugins.loader import module_loader
from ansiblelater import LOG, utils
from ansiblelater.logger import flag_extra
from ansiblelater.standard import SingleStandards, StandardBase
class Candidate:
"""
Meta object for all files which later has to process.
Each file passed to later will be classified by type and
bundled with necessary meta informations for rule processing.
"""
def __init__(self, filename, settings={}, standards=[]): # noqa
self.path = filename
self.binary = False
self.vault = False
self.filetype = type(self).__name__.lower()
self.faulty = False
self.config = settings.config
self.settings = settings
try:
with codecs.open(filename, mode="rb", encoding="utf-8") as f:
if f.readline().startswith("$ANSIBLE_VAULT"):
self.vault = True
except UnicodeDecodeError:
self.binary = True
def _get_version(self):
name = type(self).__name__
path = self.path
version = None
config_version = self.config["rules"]["version"].strip()
if config_version:
version_config_re = re.compile(r"([\d.]+)")
match = version_config_re.match(config_version)
if match:
version = match.group(1)
if not self.binary:
if isinstance(self, RoleFile):
parentdir = os.path.dirname(os.path.abspath(self.path))
while parentdir != os.path.dirname(parentdir):
meta_file = os.path.join(parentdir, "meta", "main.yml")
if os.path.exists(meta_file):
path = meta_file
break
parentdir = os.path.dirname(parentdir)
version_file_re = re.compile(r"^# Standards:\s*([\d.]+)")
with codecs.open(path, mode="rb", encoding="utf-8") as f:
for line in f:
match = version_file_re.match(line)
if match:
version = match.group(1)
if version:
LOG.info(f"{name} {path} declares standards version {version}")
return version
def _filter_standards(self):
target_standards = []
includes = self.config["rules"]["filter"]
excludes = self.config["rules"]["exclude_filter"]
if len(includes) == 0:
includes = [s.sid for s in self.standards]
for standard in self.standards:
if standard.sid in includes and standard.sid not in excludes:
target_standards.append(standard)
return target_standards
def review(self):
errors = 0
self.standards = SingleStandards(self.config["rules"]["standards"]).rules
self.version_config = self._get_version()
self.version = self.version_config or utils.standards_latest(self.standards)
for standard in self._filter_standards():
if type(self).__name__.lower() not in standard.types:
continue
result = standard.check(self, self.config)
if not result:
LOG.error(
f"Standard '{standard.sid}' returns an empty result object. Check failed!"
)
continue
labels = {
"tag": "review",
"standard": standard.description,
"file": self.path,
"passed": True
}
if standard.sid and standard.sid.strip():
labels["sid"] = standard.sid
for err in result.errors:
err_labels = copy.copy(labels)
err_labels["passed"] = False
sid = self._format_id(standard.sid)
path = self.path
description = standard.description
if isinstance(err, StandardBase.Error):
err_labels.update(err.to_dict())
if not standard.version:
LOG.warning(
f"{sid}Best practice '{description}' not met:\n{path}:{err}",
extra=flag_extra(err_labels)
)
elif LooseVersion(standard.version) > LooseVersion(self.version):
LOG.warning(
f"{sid}Future standard '{description}' not met:\n{path}:{err}",
extra=flag_extra(err_labels)
)
else:
msg = f"{sid}Standard '{description}' not met:\n{path}:{err}"
if standard.sid not in self.config["rules"]["warning_filter"]:
LOG.error(msg, extra=flag_extra(err_labels))
errors = errors + 1
else:
LOG.warning(msg, extra=flag_extra(err_labels))
return errors
@staticmethod
def classify(filename, settings={}, standards=[]): # noqa
parentdir = os.path.basename(os.path.dirname(filename))
basename = os.path.basename(filename)
ext = os.path.splitext(filename)[1][1:]
if parentdir in ["tasks"]:
return Task(filename, settings, standards)
if parentdir in ["handlers"]:
return Handler(filename, settings, standards)
if parentdir in ["vars", "defaults"]:
return RoleVars(filename, settings, standards)
if "group_vars" in filename.split(os.sep):
return GroupVars(filename, settings, standards)
if "host_vars" in filename.split(os.sep):
return HostVars(filename, settings, standards)
if parentdir in ["meta"] and "main" in basename:
return Meta(filename, settings, standards)
if parentdir in ["meta"] and "argument_specs" in basename:
return ArgumentSpecs(filename, settings, standards)
if (
parentdir in ["library", "lookup_plugins", "callback_plugins", "filter_plugins"]
or filename.endswith(".py")
):
return Code(filename, settings, standards)
if basename == "inventory" or basename == "hosts" or parentdir in ["inventories"]:
return Inventory(filename, settings, standards)
if ("rolesfile" in basename or ("requirements" in basename and ext in ["yaml", "yml"])):
return Rolesfile(filename, settings, standards)
if "Makefile" in basename:
return Makefile(filename, settings, standards)
if "templates" in filename.split(os.sep) or basename.endswith(".j2"):
return Template(filename, settings, standards)
if "files" in filename.split(os.sep):
return File(filename, settings, standards)
if basename.endswith(".yml") or basename.endswith(".yaml"):
return Playbook(filename, settings, standards)
if "README" in basename:
return Doc(filename, settings, standards)
return None
def _format_id(self, standard_id):
sid = standard_id.strip()
if sid:
standard_id = f"[{sid}] "
return standard_id
def __repr__(self):
return f"{type(self).__name__} ({self.path})"
def __getitem__(self, item):
return self.__dict__.get(item)
class RoleFile(Candidate):
"""Object classified as Ansible role file."""
def __init__(self, filename, settings={}, standards=[]): # noqa
super().__init__(filename, settings, standards)
parentdir = os.path.dirname(os.path.abspath(filename))
while parentdir != os.path.dirname(parentdir):
role_modules = os.path.join(parentdir, "library")
if os.path.exists(role_modules):
module_loader.add_directory(role_modules)
break
parentdir = os.path.dirname(parentdir)
class Playbook(Candidate):
"""Object classified as Ansible playbook."""
pass
class Task(RoleFile):
"""Object classified as Ansible task file."""
def __init__(self, filename, settings={}, standards=[]): # noqa
super().__init__(filename, settings, standards)
self.filetype = "tasks"
class Handler(RoleFile):
"""Object classified as Ansible handler file."""
def __init__(self, filename, settings={}, standards=[]): # noqa
super().__init__(filename, settings, standards)
self.filetype = "handlers"
class Vars(Candidate):
"""Object classified as Ansible vars file."""
pass
class InventoryVars(Candidate):
"""Object classified as Ansible inventory vars."""
pass
class HostVars(InventoryVars):
"""Object classified as Ansible host vars."""
pass
class GroupVars(InventoryVars):
"""Object classified as Ansible group vars."""
pass
class RoleVars(RoleFile):
"""Object classified as Ansible role vars."""
pass
class Meta(RoleFile):
"""Object classified as Ansible meta file."""
pass
class ArgumentSpecs(RoleFile):
"""Object classified as Ansible roles argument specs file."""
pass
class Inventory(Candidate):
"""Object classified as Ansible inventory file."""
pass
class Code(Candidate):
"""Object classified as code file."""
pass
class Template(RoleFile):
"""Object classified as Ansible template file."""
pass
class Doc(Candidate):
"""Object classified as documentation file."""
pass
class Makefile(Candidate):
"""Object classified as makefile."""
pass
class File(RoleFile):
"""Object classified as generic file."""
pass
class Rolesfile(Candidate):
"""Object classified as Ansible roles file."""
pass