ansible-later/ansiblelater/command/candidates.py

278 lines
9.0 KiB
Python
Raw Normal View History

2019-04-02 16:34:03 +02:00
"""Candidate module."""
import codecs
2019-04-03 17:42:46 +02:00
import copy
2019-04-02 16:34:03 +02:00
import os
import re
import sys
from distutils.version import LooseVersion
import ansible
from ansiblelater import LOG
2019-04-03 17:42:46 +02:00
from ansiblelater import utils
2019-04-03 23:39:27 +02:00
from ansiblelater.logger import flag_extra
2019-04-03 17:42:46 +02:00
from ansiblelater.command.review import Error
2019-04-02 16:34:03 +02:00
from ansiblelater.exceptions import ( # noqa
LaterError, LaterAnsibleError
)
try:
# Ansible 2.4 import of module loader
from ansible.plugins.loader import module_loader
except ImportError:
try:
from ansible.plugins import module_loader
except ImportError:
from ansible.utils import module_finder as module_loader
class Candidate(object):
"""
Meta object for all files which later has to process.
Each file passed to later will be classified by type and
bundled with necessary meta informations for rule processing.
"""
def __init__(self, filename, settings={}, standards=[]):
self.path = filename
self.binary = False
self.vault = False
self.filetype = type(self).__name__.lower()
self.expected_version = True
2019-04-03 17:42:46 +02:00
self.standards = self._get_standards(settings, standards)
self.version = self._get_version(settings)
2019-04-02 16:34:03 +02:00
try:
with codecs.open(filename, mode="rb", encoding="utf-8") as f:
if f.readline().startswith("$ANSIBLE_VAULT"):
self.vault = True
except UnicodeDecodeError:
self.binary = True
2019-04-03 17:42:46 +02:00
def _get_version(self, settings):
2019-04-02 16:34:03 +02:00
if isinstance(self, RoleFile):
parentdir = os.path.dirname(os.path.abspath(self.path))
while parentdir != os.path.dirname(parentdir):
meta_file = os.path.join(parentdir, "meta", "main.yml")
if os.path.exists(meta_file):
path = meta_file
2019-04-03 17:42:46 +02:00
break
2019-04-02 16:34:03 +02:00
parentdir = os.path.dirname(parentdir)
else:
path = self.path
version = None
version_re = re.compile(r"^# Standards:\s*([\d.]+)")
with codecs.open(path, mode="rb", encoding="utf-8") as f:
for line in f:
match = version_re.match(line)
if match:
version = match.group(1)
if not version:
2019-04-03 17:42:46 +02:00
version = utils.standards_latest(self.standards)
2019-04-02 16:34:03 +02:00
if self.expected_version:
if isinstance(self, RoleFile):
LOG.warn("%s %s is in a role that contains a meta/main.yml without a declared "
"standards version. "
"Using latest standards version %s" %
(type(self).__name__, self.path, version))
else:
LOG.warn("%s %s does not present standards version. "
"Using latest standards version %s" %
(type(self).__name__, self.path, version))
LOG.info("%s %s declares standards version %s" %
(type(self).__name__, self.path, version))
return version
2019-04-03 17:42:46 +02:00
def _get_standards(self, settings, standards):
target_standards = []
limits = settings.config["rules"]["filter"]
if limits:
for standard in standards:
if standard.id in limits:
target_standards.append(standard)
else:
target_standards = standards
# print(target_standards)
return target_standards
2019-04-02 16:34:03 +02:00
def review(self, settings, lines=None):
errors = 0
2019-04-03 17:42:46 +02:00
for standard in self.standards:
if type(self).__name__.lower() not in standard.types:
2019-04-02 16:34:03 +02:00
continue
2019-04-03 17:42:46 +02:00
result = standard.check(self, settings.config)
2019-04-02 16:34:03 +02:00
if not result:
2019-04-03 17:42:46 +02:00
utils.sysexit_with_message("Standard '%s' returns an empty result object." %
2019-04-02 16:34:03 +02:00
(standard.check.__name__))
2019-04-03 17:42:46 +02:00
labels = {"tag": "review", "standard": standard.name, "file": self.path, "passed": True}
2019-04-02 16:34:03 +02:00
for err in [err for err in result.errors
2019-04-03 17:42:46 +02:00
if not err.lineno or utils.is_line_in_ranges(err.lineno, utils.lines_ranges(lines))]:
err_labels = copy.copy(labels)
err_labels["passed"] = False
if isinstance(err, Error):
err_labels.update(err.to_dict())
2019-04-02 16:34:03 +02:00
if not standard.version:
2019-04-03 17:42:46 +02:00
LOG.warn("{id}Best practice '{name}' not met:\n{path}:{error}".format(
2019-04-03 23:39:27 +02:00
id=standard.id, name=standard.name, path=self.path, error=err), extra=flag_extra(err_labels))
2019-04-03 17:42:46 +02:00
elif LooseVersion(standard.version) > LooseVersion(self.version):
LOG.warn("{id}Future standard '{name}' not met:\n{path}:{error}".format(
2019-04-03 23:39:27 +02:00
id=standard.id, name=standard.name, path=self.path, error=err), extra=flag_extra(err_labels))
2019-04-02 16:34:03 +02:00
else:
2019-04-03 17:42:46 +02:00
LOG.error("{id}Standard '{name}' not met:\n{path}:{error}".format(
2019-04-03 23:39:27 +02:00
id=standard.id, name=standard.name, path=self.path, error=err), extra=flag_extra(err_labels))
2019-04-02 16:34:03 +02:00
errors = errors + 1
if not result.errors:
if not standard.version:
2019-04-03 23:39:27 +02:00
LOG.info("Best practice '%s' met" % standard.name, extra=flag_extra(labels))
2019-04-03 17:42:46 +02:00
elif LooseVersion(standard.version) > LooseVersion(self.version):
2019-04-03 23:39:27 +02:00
LOG.info("Future standard '%s' met" % standard.name, extra=flag_extra(labels))
2019-04-02 16:34:03 +02:00
else:
2019-04-03 17:42:46 +02:00
LOG.info("Standard '%s' met" % standard.name)
2019-04-02 16:34:03 +02:00
return errors
def __repr__(self): # noqa
return "%s (%s)" % (type(self).__name__, self.path)
def __getitem__(self, item): # noqa
return self.__dict__.get(item)
class RoleFile(Candidate):
def __init__(self, filename, settings={}, standards=[]):
super(RoleFile, self).__init__(filename, settings, standards)
2019-04-03 17:42:46 +02:00
parentdir = os.path.dirname(os.path.abspath(filename))
while parentdir != os.path.dirname(parentdir):
role_modules = os.path.join(parentdir, "library")
if os.path.exists(role_modules):
module_loader.add_directory(role_modules)
break
parentdir = os.path.dirname(parentdir)
2019-04-02 16:34:03 +02:00
class Playbook(Candidate):
pass
class Task(RoleFile):
def __init__(self, filename, settings={}, standards=[]):
super(Task, self).__init__(filename, settings, standards)
self.filetype = "tasks"
class Handler(RoleFile):
def __init__(self, filename, settings={}, standards=[]):
super(Handler, self).__init__(filename, settings, standards)
self.filetype = "handlers"
class Vars(Candidate):
pass
class Unversioned(Candidate):
def __init__(self, filename, settings={}, standards=[]):
super(Unversioned, self).__init__(filename, settings, standards)
self.expected_version = False
class InventoryVars(Unversioned):
pass
class HostVars(InventoryVars):
pass
class GroupVars(InventoryVars):
pass
class RoleVars(RoleFile):
pass
class Meta(RoleFile):
pass
class Inventory(Unversioned):
pass
class Code(Unversioned):
pass
class Template(RoleFile):
pass
class Doc(Unversioned):
pass
class Makefile(Unversioned):
pass
class File(RoleFile):
pass
class Rolesfile(Unversioned):
pass
def classify(filename, settings={}, standards=[]):
parentdir = os.path.basename(os.path.dirname(filename))
basename = os.path.basename(filename)
if parentdir in ["tasks"]:
return Task(filename, settings, standards)
if parentdir in ["handlers"]:
return Handler(filename, settings, standards)
if parentdir in ["vars", "defaults"]:
return RoleVars(filename, settings, standards)
if "group_vars" in filename.split(os.sep):
return GroupVars(filename, settings, standards)
if "host_vars" in filename.split(os.sep):
return HostVars(filename, settings, standards)
if parentdir in ["meta"]:
return Meta(filename, settings, standards)
if parentdir in ["library", "lookup_plugins", "callback_plugins",
"filter_plugins"] or filename.endswith(".py"):
return Code(filename, settings, standards)
if "inventory" in basename or "hosts" in basename or parentdir in ["inventory"]:
print("hosts" in filename)
return Inventory(filename, settings, standards)
if "rolesfile" in basename or "requirements" in basename:
return Rolesfile(filename, settings, standards)
if "Makefile" in basename:
return Makefile(filename, settings, standards)
if "templates" in filename.split(os.sep) or basename.endswith(".j2"):
return Template(filename, settings, standards)
if "files" in filename.split(os.sep):
return File(filename, settings, standards)
if basename.endswith(".yml") or basename.endswith(".yaml"):
return Playbook(filename, settings, standards)
if "README" in basename:
return Doc(filename, settings, standards)
return None