ansible-later/ansiblelater/utils/__init__.py
2019-03-21 16:05:55 +01:00

154 lines
4.0 KiB
Python

from __future__ import print_function
import importlib
import logging
import os
import subprocess
import sys
import re
import colorama
from distutils.version import LooseVersion
from ansible.module_utils.parsing.convert_bool import boolean as to_bool
try:
import ConfigParser as configparser
except ImportError:
import configparser
# try:
# from ansible.utils.color import stringc
# except ImportError:
# from ansible.color import stringc
# from yamlhelper import *
def should_do_markup():
py_colors = os.environ.get('PY_COLORS', None)
if py_colors is not None:
return to_bool(py_colors, strict=False)
return sys.stdout.isatty() and os.environ.get('TERM') != 'dumb'
colorama.init(autoreset=True, strip=not should_do_markup())
def abort(message, file=sys.stderr):
return color_text(colorama.Fore.RED, "FATAL: {}".format(message))
sys.exit(1)
def error(message, file=sys.stderr):
return color_text(colorama.Fore.RED, "ERROR: {}".format(message))
def warn(message, settings, file=sys.stdout):
if settings.log_level <= logging.WARNING:
return color_text(colorama.Fore.YELLOW, "WARN: {}".format(message))
def info(message, settings, file=sys.stdout):
if settings.log_level <= logging.INFO:
return color_text(colorama.Fore.BLUE, "INFO: {}".format(message))
def color_text(color, msg):
print('{}{}{}'.format(color, msg, colorama.Style.RESET_ALL))
def count_spaces(c_string):
leading_spaces = 0
trailing_spaces = 0
for i, e in enumerate(c_string):
if not e.isspace():
break
leading_spaces += 1
for i, e in reversed(list(enumerate(c_string))):
if not e.isspace():
break
trailing_spaces += 1
return((leading_spaces, trailing_spaces))
def get_property(prop):
currentdir = os.path.dirname(os.path.realpath(__file__))
parentdir = os.path.dirname(currentdir)
result = re.search(
r'{}\s*=\s*[\'"]([^\'"]*)[\'"]'.format(prop),
open(os.path.join(parentdir, '__init__.py')).read())
return result.group(1)
def standards_latest(standards):
return max([standard.version for standard in standards if standard.version] or ["0.1"],
key=LooseVersion)
def lines_ranges(lines_spec):
if not lines_spec:
return None
result = []
for interval in lines_spec.split(","):
(start, end) = interval.split("-")
result.append(range(int(start), int(end) + 1))
return result
def is_line_in_ranges(line, ranges):
return not ranges or any([line in r for r in ranges])
def read_standards(settings):
if not settings.rulesdir:
abort("Standards directory is not set on command line or in configuration file - aborting")
sys.path.append(os.path.abspath(os.path.expanduser(settings.rulesdir)))
try:
standards = importlib.import_module('standards')
except ImportError as e:
abort("Could not import standards from directory %s: %s" % (settings.rulesdir, str(e)))
return standards
def read_config(config_file):
config = configparser.RawConfigParser({'standards': None})
config.read(config_file)
return Settings(config, config_file)
def execute(cmd):
result = ExecuteResult()
encoding = 'UTF-8'
env = dict(os.environ)
env['PYTHONIOENCODING'] = encoding
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, env=env)
result.output = proc.communicate()[0].decode(encoding)
result.rc = proc.returncode
return result
class Settings(object):
def __init__(self, config, config_file):
self.rulesdir = None
self.custom_modules = []
self.log_level = None
self.standards_filter = None
if config.has_section('rules'):
self.rulesdir = config.get('rules', 'standards')
if config.has_section('ansible'):
modules = config.get('ansible', 'custom_modules')
self.custom_modules = [x.strip() for x in modules.split(',')]
self.configfile = config_file
class ExecuteResult(object):
pass