add some useful flake8 plugins and start refactoring formatting

This commit is contained in:
Robert Kaussow 2019-03-27 17:00:38 +01:00
parent 43d5a7c41c
commit 4b011891bf
7 changed files with 180 additions and 130 deletions

View File

@ -1,3 +1,5 @@
"""Default package."""
__author__ = "Robert Kaussow"
__project__ = "ansible-later"
__version__ = "0.1.5"
@ -6,18 +8,17 @@ __maintainer__ = "Robert Kaussow"
__email__ = "mail@geeklabor.de"
__status__ = "Production"
import re
import os
import codecs
import ansible
import os
import re
from distutils.version import LooseVersion
from ansiblelater.utils import info, warn, abort, error
from ansiblelater.utils import read_standards
from ansiblelater.utils import get_property
from ansiblelater.utils import standards_latest
from ansiblelater.utils import is_line_in_ranges
from ansiblelater.utils import lines_ranges
import ansible
from ansiblelater.utils import (abort, error, get_property, info,
is_line_in_ranges, lines_ranges,
read_standards, standards_latest, warn)
from .settings import Settings
try:
# Ansible 2.4 import of module loader
@ -29,40 +30,87 @@ except ImportError:
from ansible.utils import module_finder as module_loader
class AnsibleReviewFormatter(object):
def format(self, match):
formatstr = u"{0}:{1}: [{2}] {3} {4}"
return formatstr.format(match.filename,
match.linenumber,
match.rule.id,
match.message,
match.line
)
config = Settings()
class Standard(object):
"""
Standard definition for all defined rules.
Later lookup the config file for a path to a rules directory
or fallback to default `ansiblelater/examples/*`.
"""
def __init__(self, standard_dict):
if 'id' not in standard_dict:
standard_dict.update(id='')
"""
Initialize a new standard object and returns None.
:param standard_dict: Dictionary object containing all neseccary attributes
"""
if "id" not in standard_dict:
standard_dict.update(id="")
else:
standard_dict.update(id='[{}] '.format(standard_dict.get("id")))
standard_dict.update(id="[{}] ".format(standard_dict.get("id")))
self.id = standard_dict.get("id")
self.name = standard_dict.get("name")
self.version = standard_dict.get("version")
self.check = standard_dict.get("check")
self.types = standard_dict.get("types")
def __repr__(self):
def __repr__(self): # noqa
return "Standard: %s (version: %s, types: %s)" % (
self.name, self.version, self.types)
class Candidate(object):
"""
Meta object for all files which later has to process.
Each file passed to later will be classified by type and
bundled with necessary meta informations for rule processing.
"""
def __init__(self, filename):
self.path = filename
self.binary = False
self.vault = False
try:
self.version = find_version(filename)
with codecs.open(filename, mode="rb", encoding="utf-8") as f:
if f.readline().startswith("$ANSIBLE_VAULT"):
self.vault = True
except UnicodeDecodeError:
self.binary = True
self.filetype = type(self).__name__.lower()
self.expected_version = True
def review(self, settings, lines=None):
return candidate_review(self, settings, lines)
def __repr__(self): # noqa
return "%s (%s)" % (type(self).__name__, self.path)
def __getitem__(self, item): # noqa
return self.__dict__.get(item)
class Error(object):
"""Default error object created if a rule failed."""
def __init__(self, lineno, message):
"""
Initialize a new error object and returns None.
:param lineno: Line number where the error from de rule occures
:param message: Detailed error description provided by the rule
"""
self.lineno = lineno
self.message = message
def __repr__(self):
def __repr__(self): # noqa
if self.lineno:
return "%s: %s" % (self.lineno, self.message)
else:
@ -79,33 +127,6 @@ class Result(object):
for error in self.errors])
class Candidate(object):
def __init__(self, filename):
self.path = filename
self.binary = False
self.vault = False
try:
self.version = find_version(filename)
with codecs.open(filename, mode='rb', encoding='utf-8') as f:
if f.readline().startswith("$ANSIBLE_VAULT"):
self.vault = True
except UnicodeDecodeError:
self.binary = True
self.filetype = type(self).__name__.lower()
self.expected_version = True
def review(self, settings, lines=None):
return candidate_review(self, settings, lines)
def __repr__(self):
return "%s (%s)" % (type(self).__name__, self.path)
def __getitem__(self, item):
return self.__dict__.get(item)
class RoleFile(Candidate):
def __init__(self, filename):
super(RoleFile, self).__init__(filename)
@ -118,7 +139,7 @@ class RoleFile(Candidate):
if self.version:
break
parentdir = os.path.dirname(parentdir)
role_modules = os.path.join(parentdir, 'library')
role_modules = os.path.join(parentdir, "library")
if os.path.exists(role_modules):
module_loader.add_directory(role_modules)
@ -130,13 +151,13 @@ class Playbook(Candidate):
class Task(RoleFile):
def __init__(self, filename):
super(Task, self).__init__(filename)
self.filetype = 'tasks'
self.filetype = "tasks"
class Handler(RoleFile):
def __init__(self, filename):
super(Handler, self).__init__(filename)
self.filetype = 'handlers'
self.filetype = "handlers"
class Vars(Candidate):
@ -201,34 +222,34 @@ class Rolesfile(Unversioned):
def classify(filename):
parentdir = os.path.basename(os.path.dirname(filename))
if parentdir in ['tasks']:
if parentdir in ["tasks"]:
return Task(filename)
if parentdir in ['handlers']:
if parentdir in ["handlers"]:
return Handler(filename)
if parentdir in ['vars', 'defaults']:
if parentdir in ["vars", "defaults"]:
return RoleVars(filename)
if 'group_vars' in filename.split(os.sep):
if "group_vars" in filename.split(os.sep):
return GroupVars(filename)
if 'host_vars' in filename.split(os.sep):
if "host_vars" in filename.split(os.sep):
return HostVars(filename)
if parentdir in ['meta']:
if parentdir in ["meta"]:
return Meta(filename)
if parentdir in ['library', 'lookup_plugins', 'callback_plugins',
'filter_plugins'] or filename.endswith('.py'):
if parentdir in ["library", "lookup_plugins", "callback_plugins",
"filter_plugins"] or filename.endswith(".py"):
return Code(filename)
if 'inventory' in filename or 'hosts' in filename or parentdir in ['inventory']:
if "inventory" in filename or "hosts" in filename or parentdir in ["inventory"]:
return Inventory(filename)
if 'rolesfile' in filename or 'requirements' in filename:
if "rolesfile" in filename or "requirements" in filename:
return Rolesfile(filename)
if 'Makefile' in filename:
if "Makefile" in filename:
return Makefile(filename)
if 'templates' in filename.split(os.sep) or filename.endswith('.j2'):
if "templates" in filename.split(os.sep) or filename.endswith(".j2"):
return Template(filename)
if 'files' in filename.split(os.sep):
if "files" in filename.split(os.sep):
return File(filename)
if filename.endswith('.yml') or filename.endswith('.yaml'):
if filename.endswith(".yml") or filename.endswith(".yaml"):
return Playbook(filename)
if 'README' in filename:
if "README" in filename:
return Doc(filename)
return None
@ -236,13 +257,13 @@ def classify(filename):
def candidate_review(candidate, settings, lines=None):
errors = 0
standards = read_standards(settings)
if getattr(standards, 'ansible_min_version', None) and \
if getattr(standards, "ansible_min_version", None) and \
LooseVersion(standards.ansible_min_version) > LooseVersion(ansible.__version__):
raise SystemExit("Standards require ansible version %s (current version %s). "
"Please upgrade ansible." %
(standards.ansible_min_version, ansible.__version__))
if getattr(standards, 'ansible_review_min_version', None) and \
if getattr(standards, "ansible_review_min_version", None) and \
LooseVersion(standards.ansible_review_min_version) > LooseVersion(
get_property("__version__")):
raise SystemExit("Standards require ansible-later version %s (current version %s). "
@ -269,6 +290,7 @@ def candidate_review(candidate, settings, lines=None):
settings)
for standard in standards.standards:
print(type(standard))
if type(candidate).__name__.lower() not in standard.types:
continue
if settings.standards_filter and standard.name not in settings.standards_filter:
@ -305,9 +327,10 @@ def candidate_review(candidate, settings, lines=None):
def find_version(filename, version_regex=r"^# Standards:\s*([\d.]+)"):
version_re = re.compile(version_regex)
with codecs.open(filename, mode='rb', encoding='utf-8') as f:
with codecs.open(filename, mode="rb", encoding="utf-8") as f:
for line in f:
match = version_re.match(line)
if match:
return match.group(1)
return None

View File

@ -7,7 +7,8 @@ import sys
from appdirs import AppDirs
from pkg_resources import resource_filename
from ansiblelater import classify
from ansiblelater.utils import info, warn, read_config, get_property
from ansiblelater import config
from ansiblelater.utils import info, warn, get_property
def main():
@ -28,52 +29,54 @@ def main():
const=logging.INFO, help="Show more verbose output")
options, args = parser.parse_args(sys.argv[1:])
settings = read_config(options.configfile)
# Merge CLI options with config options. CLI options override config options.
for key, value in options.__dict__.items():
if value:
setattr(settings, key, value)
print(config.rulesdir)
# settings = read_config(options.configfile)
if os.path.exists(settings.configfile):
info("Using configuration file: %s" % settings.configfile, settings)
else:
warn("No configuration file found at %s" % settings.configfile, settings, file=sys.stderr)
if not settings.rulesdir:
rules_dir = os.path.join(resource_filename('ansiblelater', 'examples'))
warn("Using example standards found at %s" % rules_dir, settings, file=sys.stderr)
settings.rulesdir = rules_dir
# # Merge CLI options with config options. CLI options override config options.
# for key, value in options.__dict__.items():
# if value:
# setattr(settings, key, value)
if len(args) == 0:
candidates = []
for root, dirs, files in os.walk("."):
for filename in files:
candidates.append(os.path.join(root, filename))
else:
candidates = args
# if os.path.exists(settings.configfile):
# info("Using configuration file: %s" % settings.configfile, settings)
# else:
# warn("No configuration file found at %s" % settings.configfile, settings, file=sys.stderr)
# if not settings.rulesdir:
# rules_dir = os.path.join(resource_filename('ansiblelater', 'examples'))
# warn("Using example standards found at %s" % rules_dir, settings, file=sys.stderr)
# settings.rulesdir = rules_dir
errors = 0
for filename in candidates:
if ':' in filename:
(filename, lines) = filename.split(":")
else:
lines = None
candidate = classify(filename)
if candidate:
if candidate.binary:
info("Not reviewing binary file %s" % filename, settings)
continue
if candidate.vault:
info("Not reviewing vault file %s" % filename, settings)
continue
if lines:
info("Reviewing %s lines %s" % (candidate, lines), settings)
else:
info("Reviewing all of %s" % candidate, settings)
errors = errors + candidate.review(settings, lines)
else:
info("Couldn't classify file %s" % filename, settings)
return errors
# if len(args) == 0:
# candidates = []
# for root, dirs, files in os.walk("."):
# for filename in files:
# candidates.append(os.path.join(root, filename))
# else:
# candidates = args
# errors = 0
# for filename in candidates:
# if ":" in filename:
# (filename, lines) = filename.split(":")
# else:
# lines = None
# candidate = classify(filename)
# if candidate:
# if candidate.binary:
# info("Not reviewing binary file %s" % filename, settings)
# continue
# if candidate.vault:
# info("Not reviewing vault file %s" % filename, settings)
# continue
# if lines:
# info("Reviewing %s lines %s" % (candidate, lines), settings)
# else:
# info("Reviewing all of %s" % candidate, settings)
# errors = errors + candidate.review(settings, lines)
# else:
# info("Couldn't classify file %s" % filename, settings)
# return errors
if __name__ == "__main__":

View File

@ -11,6 +11,7 @@ from ansiblelater.utils.rulehelper import run_yamllint
def check_yaml_has_content(candidate, settings):
Testvar = "test"
lines, errors = get_normalized_yaml(candidate, settings)
description = "the file appears to have no useful content"

20
ansiblelater/settings.py Normal file
View File

@ -0,0 +1,20 @@
try:
import ConfigParser as configparser
except ImportError:
import configparser
class Settings(object):
def __init__(self, config=configparser.ConfigParser(), config_file="asas"):
self.rulesdir = "ahjhsjahsjas"
self.custom_modules = []
self.log_level = None
self.standards_filter = None
if config.has_section('rules'):
self.rulesdir = config.get('rules', 'standards')
if config.has_section('ansible'):
modules = config.get('ansible', 'custom_modules')
self.custom_modules = [x.strip() for x in modules.split(',')]
self.configfile = config_file

View File

@ -112,18 +112,3 @@ def read_config(config_file):
return Settings(config, config_file)
class Settings(object):
def __init__(self, config, config_file):
self.rulesdir = None
self.custom_modules = []
self.log_level = None
self.standards_filter = None
if config.has_section('rules'):
self.rulesdir = config.get('rules', 'standards')
if config.has_section('ansible'):
modules = config.get('ansible', 'custom_modules')
self.custom_modules = [x.strip() for x in modules.split(',')]
self.configfile = config_file

View File

@ -8,8 +8,18 @@ universal = 1
[flake8]
ignore = E501, W503, F401, N813
max-line-length = 100
inline-quotes = double
exclude = .git,.tox,__pycache__,build,dist,tests,*.pyc,*.egg-info,.cache,.eggs
application-import-names = ansiblelater
format = ${cyan}%(path)s:%(row)d:%(col)d${reset}: ${red_bold}%(code)s${reset} %(text)s
[isort]
default_section = THIRDPARTY
known_first_party = ansiblelater
sections = FUTURE,STDLIB,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
[tool:pytest]
filterwarnings =
ignore::FutureWarning
ignore:.*collections.*:DeprecationWarning
ignore:.*pep8.*:FutureWarning

View File

@ -1,5 +1,13 @@
flake8
flake8-colors
flake8-blind-except
flake8-builtins
flake8-colors
flake8-docstrings
flake8-isort
flake8-logging-format
flake8-polyfill
flake8-quotes
pep8-naming
wheel
pytest