mirror of
https://github.com/thegeeklab/ansible-later.git
synced 2024-12-01 09:10:36 +00:00
292 lines
11 KiB
Python
292 lines
11 KiB
Python
"""Checks related to ansible specific best practices."""
|
|
|
|
import os
|
|
import re
|
|
from collections import defaultdict
|
|
|
|
from ansiblelater.command.candidates import Error
|
|
from ansiblelater.command.candidates import Result
|
|
from ansiblelater.command.candidates import Template
|
|
from ansiblelater.utils import count_spaces
|
|
from ansiblelater.utils.rulehelper import get_normalized_tasks
|
|
from ansiblelater.utils.rulehelper import get_normalized_yaml
|
|
|
|
|
|
def check_braces_spaces(candidate, settings):
|
|
yamllines, errors = get_normalized_yaml(candidate, settings)
|
|
conf = settings["ansible"]["double-braces"]
|
|
description = "no suitable numbers of spaces (min: {min} max: {max})".format(
|
|
min=conf["min-spaces-inside"], max=conf["max-spaces-inside"])
|
|
|
|
matches = []
|
|
braces = re.compile("{{(.*?)}}")
|
|
|
|
if not errors:
|
|
for i, line in yamllines:
|
|
match = braces.findall(line)
|
|
if match:
|
|
for item in match:
|
|
matches.append((i, item))
|
|
|
|
for i, line in matches:
|
|
[leading, trailing] = count_spaces(line)
|
|
sum_spaces = leading + trailing
|
|
|
|
if (
|
|
(sum_spaces < conf["min-spaces-inside"] * 2)
|
|
or (sum_spaces > conf["min-spaces-inside"] * 2)
|
|
):
|
|
errors.append(Error(i, description))
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_named_task(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
nameless_tasks = ["meta", "debug", "include_role", "import_role",
|
|
"include_tasks", "import_tasks", "include_vars",
|
|
"block"]
|
|
description = "module '%s' used without name attribute"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
module = task["action"]["__ansible_module__"]
|
|
if "name" not in task and module not in nameless_tasks:
|
|
errors.append(Error(task["__line__"], description % module))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_name_format(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
description = "name '%s' should start with uppercase"
|
|
namelines = defaultdict(list)
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if "name" in task:
|
|
namelines[task["name"]].append(task["__line__"])
|
|
for (name, lines) in namelines.items():
|
|
if not name[0].isupper():
|
|
errors.append(Error(lines[-1], description % name))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_unique_named_task(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
description = "name '%s' appears multiple times"
|
|
|
|
namelines = defaultdict(list)
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if "name" in task:
|
|
namelines[task["name"]].append(task["__line__"])
|
|
for (name, lines) in namelines.items():
|
|
if len(lines) > 1:
|
|
errors.append(Error(lines[-1], description % name))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_command_instead_of_module(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
commands = ["command", "shell", "raw"]
|
|
modules = {
|
|
"git": "git", "hg": "hg", "curl": "get_url or uri", "wget": "get_url or uri",
|
|
"svn": "subversion", "service": "service", "mount": "mount",
|
|
"rpm": "yum or rpm_key", "yum": "yum", "apt-get": "apt-get",
|
|
"unzip": "unarchive", "tar": "unarchive", "chkconfig": "service",
|
|
"rsync": "synchronize", "supervisorctl": "supervisorctl", "systemctl": "systemd",
|
|
"sed": "template or lineinfile"
|
|
}
|
|
description = "%s command used in place of %s module"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if task["action"]["__ansible_module__"] in commands:
|
|
if "cmd" in task["action"]:
|
|
first_cmd_arg = task["action"]["cmd"].split()[0]
|
|
else:
|
|
first_cmd_arg = task["action"]["__ansible_arguments__"][0]
|
|
|
|
executable = os.path.basename(first_cmd_arg)
|
|
if (first_cmd_arg and executable in modules
|
|
and task["action"].get("warn", True) and "register" not in task):
|
|
errors.append(
|
|
Error(task["__line__"], description % (executable, modules[executable])))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_install_use_latest(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
package_managers = ["yum", "apt", "dnf", "homebrew", "pacman", "openbsd_package", "pkg5",
|
|
"portage", "pkgutil", "slackpkg", "swdepot", "zypper", "bundler", "pip",
|
|
"pear", "npm", "yarn", "gem", "easy_install", "bower", "package", "apk",
|
|
"openbsd_pkg", "pkgng", "sorcery", "xbps"]
|
|
description = "package installs should use state=present with or without a version"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if (task["action"]["__ansible_module__"] in package_managers
|
|
and task["action"].get("state") == "latest"):
|
|
errors.append(Error(task["__line__"], description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_shell_instead_command(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
description = "shell should only be used when piping, redirecting or chaining commands"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if task["action"]["__ansible_module__"] == "shell":
|
|
if "cmd" in task["action"]:
|
|
cmd = task["action"].get("cmd", [])
|
|
else:
|
|
cmd = " ".join(task["action"].get("__ansible_arguments__", []))
|
|
|
|
unjinja = re.sub(r"\{\{[^\}]*\}\}", "JINJA_VAR", cmd)
|
|
if not any([ch in unjinja for ch in "&|<>;$\n*[]{}?"]):
|
|
errors.append(Error(task["__line__"], description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_command_has_changes(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
commands = ["command", "shell", "raw"]
|
|
description = "commands should either read information (and thus set changed_when) or not " \
|
|
"do something if it has already been done (using creates/removes) " \
|
|
"or only do it if another check has a particular result (when)"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if task["action"]["__ansible_module__"] in commands:
|
|
if ("changed_when" not in task and "when" not in task
|
|
and "when" not in task["__ansible_action_meta__"]
|
|
and "creates" not in task["action"]
|
|
and "removes" not in task["action"]):
|
|
errors.append(Error(task["__line__"], description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_empty_string_compare(candidate, settings):
|
|
yamllines, errors = get_normalized_yaml(candidate, settings)
|
|
description = "use `when: var` rather than `when: var != ""` (or " \
|
|
"conversely `when: not var` rather than `when: var == ""`)"
|
|
|
|
if not errors:
|
|
if isinstance(candidate, Template):
|
|
matches = []
|
|
jinja_string = re.compile("({{|{%)(.*?)(}}|%})")
|
|
|
|
for i, line in yamllines:
|
|
match = jinja_string.findall(line)
|
|
if match:
|
|
for item in match:
|
|
matches.append((i, item[1]))
|
|
|
|
yamllines = matches
|
|
|
|
empty_string_compare = re.compile("[=!]= ?[\"'][\"']")
|
|
|
|
for i, line in yamllines:
|
|
if empty_string_compare.findall(line):
|
|
errors.append(Error(i, description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_compare_to_literal_bool(candidate, settings):
|
|
yamllines, errors = get_normalized_yaml(candidate, settings)
|
|
description = "use `when: var` rather than `when: var == True` " \
|
|
"(or conversely `when: not var`)"
|
|
|
|
if not errors:
|
|
if isinstance(candidate, Template):
|
|
matches = []
|
|
jinja_string = re.compile("({{|{%)(.*?)(}}|%})")
|
|
|
|
for i, line in yamllines:
|
|
match = jinja_string.findall(line)
|
|
if match:
|
|
for item in match:
|
|
matches.append((i, item[1]))
|
|
|
|
yamllines = matches
|
|
|
|
literal_bool_compare = re.compile("[=!]= ?(True|true|False|false)")
|
|
|
|
for i, line in yamllines:
|
|
if literal_bool_compare.findall(line):
|
|
errors.append(Error(i, description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_delegate_to_localhost(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
description = "connection: local ensures that unexpected delegated_vars " \
|
|
"don't get set (e.g. {{ inventory_hostname }} " \
|
|
"used by vars_files)"
|
|
|
|
if not errors:
|
|
for task in tasks:
|
|
if task.get("delegate_to") == "localhost":
|
|
errors.append(Error(task["__line__"], description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_literal_bool_format(candidate, settings):
|
|
yamllines, errors = get_normalized_yaml(candidate, settings)
|
|
description = "literal bools should be written as 'True/False' or 'yes/no'"
|
|
|
|
uppercase_bool = re.compile(r"([=!]=|:)\s*(true|false|TRUE|FALSE|Yes|No|YES|NO)\s*$")
|
|
|
|
if not errors:
|
|
for i, line in yamllines:
|
|
if uppercase_bool.findall(line):
|
|
errors.append(Error(i, description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_become_user(candidate, settings):
|
|
tasks, errors = get_normalized_tasks(candidate, settings)
|
|
description = "the task has 'become:' enabled but 'become_user:' is missing"
|
|
true_value = [True, "true", "True", "TRUE", "yes", "Yes", "YES"]
|
|
|
|
if not errors:
|
|
gen = (task for task in tasks if "become" in task)
|
|
for task in gen:
|
|
if task["become"] in true_value and "become_user" not in task.keys():
|
|
errors.append(Error(task["__line__"], description))
|
|
|
|
return Result(candidate.path, errors)
|
|
|
|
|
|
def check_filter_separation(candidate, settings):
|
|
yamllines, errors = get_normalized_yaml(candidate, settings)
|
|
description = "no suitable numbers of spaces (required: 1)"
|
|
|
|
matches = []
|
|
braces = re.compile("{{(.*?)}}")
|
|
filters = re.compile(r"(?<=\|)([\s]{2,}[^\s}]+|[^\s]+)|([^\s{]+[\s]{2,}|[^\s]+)(?=\|)")
|
|
|
|
if not errors:
|
|
for i, line in yamllines:
|
|
match = braces.findall(line)
|
|
if match:
|
|
for item in match:
|
|
matches.append((i, item))
|
|
|
|
for i, line in matches:
|
|
if filters.findall(line):
|
|
errors.append(Error(i, description))
|
|
return Result(candidate.path, errors)
|