407 lines
12 KiB
Python
407 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""API client."""
|
|
|
|
import copy
import json
import os
from collections import defaultdict

import jsonschema
from requests import ConnectionError
from requests import HTTPError
from requests import Request
from requests import Session
from six import iteritems
from six.moves.urllib.parse import quote

import corenetworks

from .authenticators import CoreNetworksBasicAuth
from .authenticators import CoreNetworksTokenAuth
from .exceptions import AuthError
from .exceptions import CorenetworksError
from .exceptions import ValidationError
|
|
|
|
|
|
class CoreNetworks(object):
    """Create authenticated API client.

    Credentials come from the constructor arguments and fall back to the
    ``CN_API_USER``, ``CN_API_PASSWORD`` and ``CN_API_TOKEN`` environment
    variables. When an API token is configured it is used instead of
    user/password authentication.
    """

    def __init__(self, user=None, password=None, api_token=None, auto_commit=None):
        """
        Set up configuration, validation schemas and the authenticator.

        Args:
            user (str): API user name; falls back to ``CN_API_USER``.
            password (str): API password; falls back to ``CN_API_PASSWORD``.
            api_token (str): API token; falls back to ``CN_API_TOKEN``.
            auto_commit (bool): Commit the zone after every add/delete call.

        Raises:
            AuthError: If neither a token nor a user/password pair is available.

        """
        self.__endpoint = "https://beta.api.core-networks.de"
        self.__user_agent = "Core Networks Python API {version}".format(
            version=corenetworks.__version__
        )
        self.config = self._config(user, password, api_token, auto_commit)

        # Schema for a single record as passed to add_record/delete_record.
        self._schema = {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "ttl": {"type": "number"},
                "type": {"type": "string"},
                "data": {"type": "string"},
            },
        }

        def one_or_many(json_type):
            # A filter attribute is either a scalar or a list of scalars.
            return {
                "anyOf": [
                    {"type": json_type},
                    {"type": "array", "items": {"type": json_type}},
                ],
            }

        # Schema for the filter dict accepted by records().
        self._filter_schema = {
            "type": "object",
            "properties": {
                "name": one_or_many("string"),
                "ttl": one_or_many("number"),
                "type": one_or_many("string"),
                "data": one_or_many("string"),
            },
        }

        if self.config["api_token"]:
            self._auth = CoreNetworksTokenAuth(self.config["api_token"])
        else:
            self._auth = CoreNetworksBasicAuth(
                self.config["user"], self.config["password"], self.__endpoint
            )

    @staticmethod
    def _config(user, password, api_token, auto_commit):
        """
        Build the client configuration.

        Explicit arguments win over the corresponding environment variables.

        Args:
            user (str): API user name or None.
            password (str): API password or None.
            api_token (str): API token or None.
            auto_commit (bool): Auto-commit flag or None.

        Returns:
            defaultdict: Keys ``user``, ``password``, ``api_token``, ``auto_commit``.

        Raises:
            AuthError: If neither a token nor a full user/password pair is set.

        """
        cfg = defaultdict(
            dict, {
                "user": user or os.getenv("CN_API_USER"),
                "password": password or os.getenv("CN_API_PASSWORD"),
                "api_token": api_token or os.getenv("CN_API_TOKEN"),
                "auto_commit": auto_commit or False,
            }
        )

        has_basic_credentials = cfg["user"] and cfg["password"]
        if not has_basic_credentials and not cfg["api_token"]:
            raise AuthError("Insufficient authentication details provided")

        return cfg

    # ZONES

    def zones(self):
        """
        Get the list of DNS zones.

        Returns:
            list: List of zones.

        """
        result = self.__rest_helper("/dnszones/", method="GET")

        return self.__normalize(result)

    def zone(self, zone):
        """
        Get details about a DNS zone.

        Args:
            zone (str): Name of the target DNS zone.

        Returns:
            list: The API response, wrapped in a list if it is a single object.

        """
        result = self.__rest_helper("/dnszones/{zone}".format(zone=zone), method="GET")

        return self.__normalize(result)

    # RECORDS

    def records(self, zone, params=None):
        """
        Get the list of records for the specific domain.

        Args:
            zone (str): Name of the target DNS zone.
            params (dict): Dictionary of filter parameters. Defaults to no filter.
                See https://beta.api.core-networks.de/doc/#functon_dnszones_records
                but keep in mind that you have to pass a dict not a string. The required
                filter string will be created automatically.

                Example: `params={"type": ["NS", "SOA"]}` will result in
                `filter=?type[]=NS&type[]=SOA`

        Returns:
            list: List of matching records.

        Raises:
            ValidationError: If ``params`` does not match the filter schema.

        """
        # Avoid a shared mutable default argument.
        if params is None:
            params = {}

        schema = copy.deepcopy(self._filter_schema)
        self.__validate(params, schema)

        filter_string = self.__json_to_filter(params)
        result = self.__rest_helper(
            "/dnszones/{zone}/records/{filter}".format(zone=zone, filter=filter_string),
            method="GET"
        )

        return self.__normalize(result)

    def add_record(self, zone, params):
        """
        Create a record for the given domain.

        Before the record is posted, conflicting records are handled: a CNAME
        may not share its name with other record types, an existing CNAME with
        the same name but different data is deleted, and an existing identical
        record with a different TTL is deleted so the new one replaces it.

        Args:
            zone (str): Name of the target DNS zone.
            params (dict): Dictionary of record parameters. ``name``, ``type``,
                ``data`` and ``ttl`` are required.
                See https://beta.api.core-networks.de/doc/#functon_dnszones_records_add

        Returns:
            list: List of added records.

        Raises:
            ValidationError: If ``params`` does not match the record schema.
            CorenetworksError: On CNAME conflicts or unexpected duplicates.

        """
        schema = copy.deepcopy(self._schema)
        schema["required"] = ["name", "type", "data", "ttl"]
        self.__validate(params, schema)

        is_cname = params["type"] == "CNAME"

        if is_cname and params["name"] == "@":
            raise CorenetworksError("CNAME records are not allowed for the zone itself.")

        for existing in self.records(zone, params={"name": params["name"]}):
            existing_is_cname = existing["type"] == "CNAME"

            if is_cname and not existing_is_cname:
                raise CorenetworksError(
                    "A record with the same name already exist ({name}). "
                    "CNAME records cannot use the same name with other records.".format(
                        name=existing["name"]
                    )
                )
            if not is_cname and existing_is_cname:
                raise CorenetworksError(
                    "A CNAME record with the same name already exist ({name}). "
                    "CNAME records cannot use the same name with other records.".format(
                        name=existing["name"]
                    )
                )
            # A CNAME with the same name but another target is replaced.
            if is_cname and existing["data"] != params["data"]:
                self._delete_record_raw(zone, params=existing)

        # Look for the same record ignoring its TTL.
        lookup = copy.deepcopy(params)
        lookup.pop("ttl")
        duplicates = self.records(zone, lookup)

        if len(duplicates) > 1:
            raise CorenetworksError(
                "More than one record already exists for the given attributes. "
                "That should be impossible, please open an issue!"
            )

        # delete existing record with different ttl for a fake update
        for existing in duplicates:
            existing["ttl"] = int(existing["ttl"])
            if existing["ttl"] != params["ttl"]:
                self._delete_record_raw(zone, params=existing)

        self.__rest_helper(
            "/dnszones/{zone}/records/".format(zone=zone), data=params, method="POST"
        )

        result = self.records(zone=zone, params=params)

        if self.config["auto_commit"]:
            self.commit(zone=zone)

        return self.__normalize(result)

    def _delete_record_raw(self, zone, params):
        """POST ``params`` to the zone's delete endpoint without validation or commit."""
        return self.__rest_helper(
            "/dnszones/{zone}/records/delete".format(zone=zone), data=params, method="POST"
        )

    def delete_record(self, zone, params):
        """
        Delete all DNS records of a zone that match the data.

        Args:
            zone (str): Name of the target DNS zone.
            params (dict): Dictionary of record parameters. At least one of
                ``name``, ``type``, ``data`` or ``force_all`` is required. A truthy
                ``force_all`` replaces the whole filter with an empty one.
                See https://beta.api.core-networks.de/doc/#functon_dnszones_records_add

        Returns:
            list: Empty list.

        Raises:
            ValidationError: If ``params`` does not match the schema.

        """
        schema = copy.deepcopy(self._schema)
        schema["properties"]["force_all"] = {"type": "boolean"}
        schema["anyOf"] = [
            {"required": [key]} for key in ("name", "type", "data", "force_all")
        ]
        self.__validate(params, schema)

        if params.get("force_all"):
            params = {}

        result = self._delete_record_raw(zone, params)

        if self.config["auto_commit"]:
            self.commit(zone=zone)

        return self.__normalize(result)

    def commit(self, zone):
        """
        Commit changed records to the given DNS zone.

        Args:
            zone (str): Name of the target DNS zone.

        Returns:
            list: Empty list.

        """
        result = self.__rest_helper(
            "/dnszones/{zone}/records/commit".format(zone=zone), method="POST"
        )

        return self.__normalize(result)

    def __rest_helper(self, url, data=None, params=None, method="GET"):
        """Handle requests to the Core Networks API."""
        headers = {
            "User-Agent": self.__user_agent,
            "Accept": "application/json",
            "Content-Type": "application/json"
        }

        # Falsy payloads (None, empty dict) are sent without a request body.
        json_data = json.dumps(data) if data else None

        request = Request(
            method=method,
            url=self.__endpoint + url,
            headers=headers,
            data=json_data,
            params=params,
            auth=self._auth
        )

        r_json, _ = self.__request_helper(request.prepare())

        return r_json

    @staticmethod
    def __request_helper(request):
        """Handle firing off requests and exception raising."""
        try:
            # The context manager closes the session's connection pool again.
            with Session() as session:
                handle = session.send(request)
                handle.raise_for_status()
        except HTTPError as e:
            raise CorenetworksError(
                "Invalid response: {code} {reason}".format(
                    code=e.response.status_code, reason=e.response.reason
                ),
                payload=e
            )
        except ConnectionError:
            # Connection problems are passed on to the caller unchanged.
            raise

        # Only a 200 response is parsed as JSON; anything else yields an empty list.
        if handle.status_code == 200:
            response = handle.json()
        else:
            response = []

        return response, handle.headers

    @staticmethod
    def __normalize(result):
        """Return ``result`` as a list, wrapping a single dict."""
        if isinstance(result, list):
            return list(result)
        if isinstance(result, dict):
            return [result]

        raise CorenetworksError("Unknown type: {}".format(type(result)))

    @staticmethod
    def __json_to_filter(data):
        """
        Convert a filter dict into a URL query string.

        List values become repeated ``key[]=value`` pairs, scalars become
        ``key=value``. Values are percent-encoded so characters such as ``&``
        or spaces in record data cannot break the query string.

        Args:
            data (dict): Filter attributes.

        Returns:
            str: Query string including the leading ``?``, or an empty string.

        Raises:
            CorenetworksError: If a value is neither a list, string nor number.

        """
        def render(value):
            # The schemas allow any JSON number; send whole floats as integers.
            if isinstance(value, float) and value.is_integer():
                value = int(value)
            return quote(str(value), safe="")

        filter_list = []

        for key, value in data.items():
            if isinstance(value, list):
                filter_list.extend(
                    "{key}[]={value}".format(key=key, value=render(item)) for item in value
                )
            elif isinstance(value, (str, int, float)):
                filter_list.append("{key}={value}".format(key=key, value=render(value)))
            else:
                raise CorenetworksError("Unknown type: {}".format(type(value)))

        if not filter_list:
            return ""

        return "?{filter}".format(filter="&".join(filter_list))

    @staticmethod
    def __validate(data, schema):
        """Validate ``data`` against ``schema`` and raise ValidationError on failure."""
        try:
            jsonschema.validate(data, schema)
        except jsonschema.exceptions.ValidationError as e:
            raise ValidationError("Dataset invalid: {reason}".format(reason=e.message), payload=e)
|