ansible-doctor/ansibledoctor/utils/yamlhelper.py

87 lines
2.3 KiB
Python

"""Utils for YAML file operations."""
from collections import defaultdict
from contextlib import suppress
import ruamel.yaml
import yaml
from ansible.parsing.yaml.loader import AnsibleLoader
import ansibledoctor.exception
class UnsafeTag:
"""Handle custom yaml unsafe tag."""
yaml_tag = "!unsafe"
def __init__(self, value):
self.unsafe = value
@staticmethod
def yaml_constructor(loader, node):
return loader.construct_scalar(node)
def parse_yaml_ansible(yamlfile):
try:
loader = AnsibleLoader(yamlfile)
data = loader.get_single_data() or []
except (
yaml.parser.ParserError,
yaml.scanner.ScannerError,
yaml.constructor.ConstructorError,
yaml.composer.ComposerError,
) as e:
message = (
f"{e.context} in line {e.context_mark.line}, column {e.context_mark.line}\n"
f"{e.problem} in line {e.problem_mark.line}, column {e.problem_mark.column}"
)
raise ansibledoctor.exception.YAMLError(message) from e
return data
def parse_yaml(yamlfile):
try:
ruamel.yaml.add_constructor(
UnsafeTag.yaml_tag,
UnsafeTag.yaml_constructor,
constructor=ruamel.yaml.SafeConstructor,
)
data = ruamel.yaml.YAML(typ="rt").load(yamlfile)
_yaml_remove_comments(data)
data = defaultdict(dict, data or {})
except (
ruamel.yaml.parser.ParserError,
ruamel.yaml.scanner.ScannerError,
ruamel.yaml.constructor.ConstructorError,
ruamel.yaml.composer.ComposerError,
) as e:
message = (
f"{e.context} in line {e.context_mark.line}, column {e.context_mark.line}\n"
f"{e.problem} in line {e.problem_mark.line}, column {e.problem_mark.column}"
)
raise ansibledoctor.exception.YAMLError(message) from e
return data
def _yaml_remove_comments(d):
if isinstance(d, dict):
for k, v in d.items():
_yaml_remove_comments(k)
_yaml_remove_comments(v)
elif isinstance(d, list):
for elem in d:
_yaml_remove_comments(elem)
with suppress(AttributeError):
attr = (
"comment"
if isinstance(d, ruamel.yaml.scalarstring.ScalarString)
else ruamel.yaml.comments.Comment.attrib
)
delattr(d, attr)