import re import os from collections import defaultdict from ansiblelater import Result, Error from ansiblelater.utils.rulehelper import (get_normalized_tasks, get_normalized_yaml) def check_braces_spaces(candidate, settings): yamllines, errors = get_normalized_yaml(candidate, settings) description = "no suitable numbers of spaces (required: 1)" lineno = 0 matches = [] braces = re.compile("{{(.*?)}}") if not errors: for line in yamllines: lineno += 1 match = braces.findall(line) if match: for item in match: matches.append((item, lineno)) for item, lineno in matches: error_count = 0 string_length = len(item) strip_length = item.rstrip() if strip_length == 0 and not string_length == 1: error_count += 1 else: x = 0 leading_spaces = 0 while (x < string_length - 1 and item[x].isspace()): x += 1 leading_spaces += 1 x = string_length - 1 trailing_spaces = 0 while (x > 0 and item[x].isspace()): x -= 1 trailing_spaces += 1 if not leading_spaces == 1 or not trailing_spaces == 1: error_count += 1 if not error_count == 0: errors.append(Error(lineno, description)) return Result(candidate.path, errors) def check_named_task(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) nameless_tasks = ['meta', 'debug', 'include_role', 'import_role', 'include_tasks', 'import_tasks', 'include_vars', 'block'] description = "module '%s' used without name attribute" if not errors: for task in tasks: module = task["action"]["__ansible_module__"] if 'name' not in task and module not in nameless_tasks: errors.append(Error(task['__line__'], description % module)) return Result(candidate.path, errors) def check_name_format(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) description = "name '%s' should start with uppercase" namelines = defaultdict(list) if not errors: for task in tasks: if 'name' in task: namelines[task['name']].append(task['__line__']) for (name, lines) in namelines.items(): if not name[0].isupper(): errors.append(Error(lines[-1], description % name)) return Result(candidate.path, errors) def check_unique_named_task(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) description = "name '%s' appears multiple times" namelines = defaultdict(list) if not errors: for task in tasks: if 'name' in task: namelines[task['name']].append(task['__line__']) for (name, lines) in namelines.items(): if len(lines) > 1: errors.append(Error(lines[-1], description % name)) return Result(candidate.path, errors) def check_command_instead_of_module(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) commands = ['command', 'shell', 'raw'] modules = { 'git': 'git', 'hg': 'hg', 'curl': 'get_url or uri', 'wget': 'get_url or uri', 'svn': 'subversion', 'service': 'service', 'mount': 'mount', 'rpm': 'yum or rpm_key', 'yum': 'yum', 'apt-get': 'apt-get', 'unzip': 'unarchive', 'tar': 'unarchive', 'chkconfig': 'service', 'rsync': 'synchronize', 'supervisorctl': 'supervisorctl', 'systemctl': 'systemd', 'sed': 'template or lineinfile' } description = "%s command used in place of %s module" if not errors: for task in tasks: if task["action"]["__ansible_module__"] in commands: if 'cmd' in task['action']: first_cmd_arg = task["action"]["cmd"].split()[0] else: first_cmd_arg = task["action"]["__ansible_arguments__"][0] executable = os.path.basename(first_cmd_arg) if first_cmd_arg and executable in modules and task['action'].get('warn', True): errors.append( Error(task["__line__"], description % (executable, modules[executable]))) return Result(candidate.path, errors) def check_install_use_latest(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) package_managers = ['yum', 'apt', 'dnf', 'homebrew', 'pacman', 'openbsd_package', 'pkg5', 'portage', 'pkgutil', 'slackpkg', 'swdepot', 'zypper', 'bundler', 'pip', 'pear', 'npm', 'gem', 'easy_install', 'bower', 'package'] description = "package installs should use state=present with or without a version" if not errors: for task in tasks: if (task["action"]["__ansible_module__"] in package_managers and task["action"].get("state") == "latest"): errors.append(Error(task["__line__"], description)) return Result(candidate.path, errors) def check_shell_instead_command(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) description = "shell should only be used when piping, redirecting or chaining commands" if not errors: for task in tasks: if task["action"]["__ansible_module__"] == 'shell': if 'cmd' in task['action']: cmd = task["action"].get("cmd", []) else: cmd = ' '.join(task["action"].get("__ansible_arguments__", [])) unjinja = re.sub(r"\{\{[^\}]*\}\}", "JINJA_VAR", cmd) if not any([ch in unjinja for ch in '&|<>;$\n*[]{}?']): errors.append(Error(task["__line__"], description)) return Result(candidate.path, errors) def check_command_has_changes(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) commands = ['command', 'shell', 'raw'] description = "commands should either read information (and thus set changed_when) or not " \ "do something if it has already been done (using creates/removes) " \ "or only do it if another check has a particular result (when)" if not errors: for task in tasks: if task["action"]["__ansible_module__"] in commands: if 'changed_when' not in task and 'when' not in task \ and 'creates' not in task['action'] and 'removes' not in task['action']: errors.append(Error(task["__line__"], description)) return Result(candidate.path, errors) def check_empty_string_compare(candidate, settings): yamllines, errors = get_normalized_yaml(candidate, settings) description = 'use `when: var` rather than `when: var != ""` (or ' \ 'conversely `when: not var` rather than `when: var == ""`)' lineno = 0 empty_string_compare = re.compile("[=!]= ?[\"'][\"']") if not errors: for line in yamllines: lineno += 1 if empty_string_compare.findall(line): errors.append(Error(lineno, description)) return Result(candidate.path, errors) def check_compare_to_literal_bool(candidate, settings): yamllines, errors = get_normalized_yaml(candidate, settings) description = "use `when: var` rather than `when: var == True` " \ "(or conversely `when: not var`)" lineno = 0 literal_bool_compare = re.compile("[=!]= ?(True|true|False|false)") if not errors: for line in yamllines: lineno += 1 if literal_bool_compare.findall(line): errors.append(Error(lineno, description)) return Result(candidate.path, errors) def check_delegate_to_localhost(candidate, settings): tasks, errors = get_normalized_tasks(candidate, settings) description = "connection: local ensures that unexpected delegated_vars " \ "don't get set (e.g. {{ inventory_hostname }} " \ "used by vars_files)" if not errors: for task in tasks: if task.get('delegate_to') == 'localhost': errors.append(Error(task["__line__"], description)) return Result(candidate.path, errors)