2018-12-19 10:19:07 +00:00
|
|
|
from __future__ import print_function
|
|
|
|
|
|
|
|
import importlib
|
|
|
|
import logging
|
|
|
|
import os
|
|
|
|
import subprocess
|
|
|
|
import sys
|
|
|
|
import re
|
2019-03-21 15:05:55 +00:00
|
|
|
import colorama
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
from distutils.version import LooseVersion
|
2019-03-21 15:05:55 +00:00
|
|
|
from ansible.module_utils.parsing.convert_bool import boolean as to_bool
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
try:
|
|
|
|
import ConfigParser as configparser
|
|
|
|
except ImportError:
|
|
|
|
import configparser
|
|
|
|
|
2019-03-21 15:05:55 +00:00
|
|
|
# try:
|
|
|
|
# from ansible.utils.color import stringc
|
|
|
|
# except ImportError:
|
|
|
|
# from ansible.color import stringc
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
# from yamlhelper import *
|
|
|
|
|
|
|
|
|
2019-03-21 15:05:55 +00:00
|
|
|
def should_do_markup():
    """Decide whether coloured markup should be written to the terminal.

    When the ``PY_COLORS`` environment variable is set, its value is passed
    through ``to_bool`` (non-strict) and that result is returned. Otherwise
    markup is wanted only if standard output is a TTY and ``TERM`` is not
    ``dumb``.
    """
    override = os.environ.get('PY_COLORS', None)
    if override is None:
        # No explicit preference: fall back to inspecting the terminal.
        return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb'

    return to_bool(override, strict=False)
|
|
|
|
|
|
|
|
|
|
|
|
# Initialise colorama once, at import time. ANSI sequences are stripped from
# the output whenever should_do_markup() reports that markup is not wanted.
colorama.init(autoreset=True, strip=not should_do_markup())
|
|
|
|
|
|
|
|
|
2018-12-19 10:19:07 +00:00
|
|
|
def abort(message, file=sys.stderr):
    """Print a fatal error message in red and terminate the program.

    :param message: text placed after the ``FATAL: `` prefix
    :param file: kept for interface compatibility; it is currently unused
                 because ``color_text`` always prints to standard output
    :raises SystemExit: always, with exit status 1
    """
    color_text(colorama.Fore.RED, "FATAL: {}".format(message))
    # The exit has to be its own statement after the print. Returning the
    # result of color_text() here left sys.exit() unreachable, so callers
    # carried on running after a "fatal" error.
    sys.exit(1)
|
|
|
|
|
|
|
|
|
|
|
|
def error(message, file=sys.stderr):
    """Print ``message`` in red with an ``ERROR: `` prefix.

    :param message: text placed after the prefix
    :param file: accepted but not used by this function
    :returns: whatever ``color_text`` returns
    """
    text = "ERROR: {}".format(message)
    return color_text(colorama.Fore.RED, text)
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
|
|
|
|
def warn(message, settings, file=sys.stdout):
    """Print ``message`` in yellow with a ``WARN: `` prefix if warnings are enabled.

    Nothing is printed (and ``None`` is returned) when ``settings.log_level``
    is above ``logging.WARNING``.

    :param message: text placed after the prefix
    :param settings: object exposing a ``log_level`` attribute
    :param file: accepted but not used by this function
    """
    if settings.log_level > logging.WARNING:
        return None

    text = "WARN: {}".format(message)
    return color_text(colorama.Fore.YELLOW, text)
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
|
|
|
|
def info(message, settings, file=sys.stdout):
    """Print ``message`` in blue with an ``INFO: `` prefix if info output is enabled.

    Nothing is printed (and ``None`` is returned) when ``settings.log_level``
    is above ``logging.INFO``.

    :param message: text placed after the prefix
    :param settings: object exposing a ``log_level`` attribute
    :param file: accepted but not used by this function
    """
    if settings.log_level > logging.INFO:
        return None

    text = "INFO: {}".format(message)
    return color_text(colorama.Fore.BLUE, text)
|
|
|
|
|
|
|
|
|
|
|
|
def color_text(color, msg):
    """Print ``msg`` preceded by ``color`` and followed by a style reset.

    :param color: colour prefix, e.g. one of the ``colorama.Fore`` values
    :param msg: the text to print
    """
    reset = colorama.Style.RESET_ALL
    line = '{}{}{}'.format(color, msg, reset)
    print(line)
|
2018-12-19 10:19:07 +00:00
|
|
|
|
|
|
|
|
2019-01-25 11:32:28 +00:00
|
|
|
def count_spaces(c_string):
    """Count the whitespace characters at both ends of a string.

    :param c_string: the string to inspect
    :returns: tuple ``(leading, trailing)``; for a string made up only of
              whitespace both numbers equal its length
    """
    def _whitespace_run(chars):
        # Length of the unbroken run of whitespace at the start of ``chars``.
        run = 0
        for char in chars:
            if not char.isspace():
                break
            run += 1
        return run

    leading = _whitespace_run(c_string)
    trailing = _whitespace_run(reversed(c_string))
    return (leading, trailing)
|
|
|
|
|
|
|
|
|
2018-12-19 10:19:07 +00:00
|
|
|
def get_property(prop):
    """Read a quoted value assigned to ``prop`` in the parent package's ``__init__.py``.

    The file is looked up one directory above the directory containing this
    module. The first ``prop = '<value>'`` (single or double quoted)
    assignment found is used.

    :param prop: name of the property; it is inserted into the regular
                 expression as-is
    :returns: the text between the quotes
    :raises AttributeError: if no matching assignment exists (the search
                            result is ``None``)
    """
    currentdir = os.path.dirname(os.path.realpath(__file__))
    parentdir = os.path.dirname(currentdir)
    # Use a context manager so the file handle is closed right away instead
    # of staying open until the garbage collector reclaims it.
    with open(os.path.join(parentdir, '__init__.py')) as init_file:
        content = init_file.read()
    result = re.search(
        r'{}\s*=\s*[\'"]([^\'"]*)[\'"]'.format(prop),
        content)
    return result.group(1)
|
|
|
|
|
|
|
|
|
|
|
|
def standards_latest(standards):
    """Return the highest version string found among ``standards``.

    Entries with a falsy ``version`` are ignored. Versions are compared as
    ``LooseVersion`` objects. ``"0.1"`` is returned when no entry carries a
    version.

    :param standards: iterable of objects exposing a ``version`` attribute
    """
    versions = [item.version for item in standards if item.version]
    if not versions:
        versions = ["0.1"]
    return max(versions, key=LooseVersion)
|
|
|
|
|
|
|
|
|
|
|
|
def lines_ranges(lines_spec):
    """Convert a line specification into a list of inclusive ranges.

    The specification is a comma separated list whose items are either
    ``start-end`` intervals or single line numbers, e.g. ``"1-3,7,10-12"``.

    :param lines_spec: the specification string, or a falsy value
    :returns: ``None`` for a falsy specification, otherwise a list of
              ``range`` objects that include both interval ends
    :raises ValueError: if a part cannot be parsed as an integer
    """
    if not lines_spec:
        return None
    result = []
    for interval in lines_spec.split(","):
        start, dash, end = interval.partition("-")
        if not dash:
            # A bare line number stands for a range holding only that line.
            end = start
        result.append(range(int(start), int(end) + 1))
    return result
|
|
|
|
|
|
|
|
|
|
|
|
def is_line_in_ranges(line, ranges):
    """Tell whether ``line`` is covered by ``ranges``.

    A falsy ``ranges`` (``None`` or empty) means no restriction, so every
    line counts as covered.

    :param line: the line number to check
    :param ranges: collection of containers supporting ``in``, or a falsy value
    """
    if not ranges:
        return True
    return any(line in candidate for candidate in ranges)
|
|
|
|
|
|
|
|
|
|
|
|
def read_standards(settings):
    """Import the ``standards`` module from the configured rules directory.

    The rules directory (user home expanded, made absolute) is appended to
    ``sys.path`` before the import. ``abort`` is called when no directory is
    configured or when the import fails.

    :param settings: object exposing a ``rulesdir`` attribute
    :returns: the imported ``standards`` module
    """
    if not settings.rulesdir:
        abort("Standards directory is not set on command line or in configuration file - aborting")

    rules_path = os.path.abspath(os.path.expanduser(settings.rulesdir))
    sys.path.append(rules_path)
    try:
        standards = importlib.import_module('standards')
    except ImportError as e:
        reason = str(e)
        abort("Could not import standards from directory %s: %s" % (settings.rulesdir, reason))
    return standards
|
|
|
|
|
|
|
|
|
|
|
|
def read_config(config_file):
    """Parse ``config_file`` and wrap the result in a ``Settings`` object.

    The parser is created with a default of ``None`` for the ``standards``
    option.

    :param config_file: path (or paths) handed to ``RawConfigParser.read``
    :returns: a ``Settings`` instance built from the parsed configuration
    """
    defaults = {'standards': None}
    parser = configparser.RawConfigParser(defaults)
    parser.read(config_file)

    return Settings(parser, config_file)
|
|
|
|
|
|
|
|
|
|
|
|
def execute(cmd):
    """Run ``cmd`` as a subprocess and collect its output and exit status.

    Standard error is merged into standard output. The child gets a copy of
    the current environment with ``PYTHONIOENCODING`` set to UTF-8, and the
    captured bytes are decoded as UTF-8.

    :param cmd: command passed straight to ``subprocess.Popen``
    :returns: an ``ExecuteResult`` with ``output`` (decoded text) and ``rc``
              (the process return code)
    """
    encoding = 'UTF-8'
    child_env = dict(os.environ, PYTHONIOENCODING=encoding)
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        env=child_env,
    )
    raw_output, _ = proc.communicate()

    result = ExecuteResult()
    result.output = raw_output.decode(encoding)
    result.rc = proc.returncode
    return result
|
|
|
|
|
|
|
|
|
|
|
|
class Settings(object):
    """Runtime settings derived from a parsed configuration file."""

    def __init__(self, config, config_file):
        """Populate the settings from ``config``.

        :param config: parser object offering ``has_section``, ``has_option``
                       and ``get``
        :param config_file: stored unchanged as ``configfile``
        """
        self.rulesdir = None
        self.custom_modules = []
        # log_level and standards_filter are only initialised here; nothing
        # in this class assigns them a real value.
        self.log_level = None
        self.standards_filter = None

        if config.has_section('rules'):
            self.rulesdir = config.get('rules', 'standards')
        # An [ansible] section may exist without a custom_modules option.
        # Checking for the option avoids a NoOptionError in that case.
        if config.has_section('ansible') and config.has_option('ansible', 'custom_modules'):
            modules = config.get('ansible', 'custom_modules') or ''
            # Drop blank entries so an empty value or a stray comma does not
            # produce an empty module name.
            self.custom_modules = [x.strip() for x in modules.split(',') if x.strip()]

        self.configfile = config_file
|
|
|
|
|
|
|
|
|
|
|
|
class ExecuteResult(object):
    """Plain attribute container; ``execute`` stores ``output`` and ``rc`` on it."""
|