corenetworks/corenetworks/client.py

407 lines
12 KiB
Python

# -*- coding: utf-8 -*-
"""API client."""
import copy
import json
import os
from collections import defaultdict
import jsonschema
from requests import ConnectionError
from requests import HTTPError
from requests import Request
from requests import Session
from six import iteritems
import corenetworks
from .authenticators import CoreNetworksBasicAuth
from .authenticators import CoreNetworksTokenAuth
from .exceptions import AuthError
from .exceptions import CorenetworksError
from .exceptions import ValidationError
class CoreNetworks():
"""Create authenticated API client."""
def __init__(self, user=None, password=None, api_token=None, auto_commit=None):
self.__endpoint = "https://beta.api.core-networks.de"
self.__user_agent = "Core Networks Python API {version}".format(
version=corenetworks.__version__
)
self.config = self._config(user, password, api_token, auto_commit)
self._schema = {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"ttl": {
"type": "number"
},
"type": {
"type": "string"
},
"data": {
"type": "string"
},
},
}
self._filter_schema = {
"type": "object",
"properties": {
"name": {
"anyOf": [{
"type": "string",
}, {
"type": "array",
"items": {
"type": "string"
},
}],
},
"ttl": {
"anyOf": [{
"type": "number",
}, {
"type": "array",
"items": {
"type": "number"
},
}],
},
"type": {
"anyOf": [{
"type": "string",
}, {
"type": "array",
"items": {
"type": "string"
},
}],
},
"data": {
"anyOf": [{
"type": "string",
}, {
"type": "array",
"items": {
"type": "string"
},
}],
},
},
}
if self.config["api_token"]:
self._auth = CoreNetworksTokenAuth(self.config["api_token"])
else:
self._auth = CoreNetworksBasicAuth(
self.config["user"], self.config["password"], self.__endpoint
)
@staticmethod
def _config(user, password, api_token, auto_commit):
cfg = defaultdict(
dict, {
"user": None,
"password": None,
"api_token": None,
"auto_commit": False
}
)
cfg["user"] = os.getenv("CN_API_USER")
cfg["password"] = os.getenv("CN_API_PASSWORD")
cfg["api_token"] = os.getenv("CN_API_TOKEN")
cfg["user"] = user or cfg["user"]
cfg["password"] = password or cfg["password"]
cfg["api_token"] = api_token or cfg["api_token"]
cfg["auto_commit"] = auto_commit or cfg["auto_commit"]
if not (cfg["user"] and cfg["password"]) and not cfg["api_token"]:
raise AuthError("Insufficient authentication details provided")
return cfg
# ZONES
def zones(self):
"""
Get the list of DNS zones.
Returns:
list: List of zones.
"""
result = self.__rest_helper("/dnszones/", method="GET")
return self.__normalize(result)
def zone(self, zone):
"""
Get details about a DNS zone.
Args:
zone (str): Name of the target DNS zone.
Returns:
list: List of zones.
"""
result = self.__rest_helper("/dnszones/{zone}".format(zone=zone), method="GET")
return self.__normalize(result)
# RECORDS
def records(self, zone, params={}):
"""
Get the list of records for the specific domain.
Args:
zone (str): Name of the target DNS zone.
params (dict): Dictionary of filter parameters.
See https://beta.api.core-networks.de/doc/#functon_dnszones_records
but keep in mind that you have to pass a dict not a string. The required
filter string will be created automatically.
Example: `params={"type": ["NS", "SOA"]}` will result in
`filter=?type[]=NS&type[]=SOA`
Returns:
list: List of matching records.
"""
schema = copy.deepcopy(self._filter_schema)
self.__validate(params, schema)
filter_string = self.__json_to_filter(params)
result = self.__rest_helper(
"/dnszones/{zone}/records/{filter}".format(zone=zone, filter=filter_string),
method="GET"
)
return self.__normalize(result)
def add_record(self, zone, params):
"""
Create a record for the given domain.
Args:
zone (str): Name of the target DNS zone.
params (dict): Dictionary of record parameters.
See https://beta.api.core-networks.de/doc/#functon_dnszones_records_add
Returns:
list: List of added records.
"""
schema = copy.deepcopy(self._schema)
schema["required"] = ["name", "type", "data", "ttl"]
self.__validate(params, schema)
if params["type"] == "CNAME":
if params["name"] == "@":
raise CorenetworksError("CNAME records are not allowed for the zone itself.")
records = self.records(zone, params={"name": params["name"]})
for r in records:
if params["type"] == "CNAME" and r["type"] != "CNAME":
raise CorenetworksError(
"A record with the same name already exist ({name}). "
"CNAME records cannot use the same name with other records.".format(
name=r["name"]
)
)
if params["type"] != "CNAME" and r["type"] == "CNAME":
raise CorenetworksError(
"A CNAME record with the same name already exist ({name}). "
"CNAME records cannot use the same name with other records.".format(
name=r["name"]
)
)
if params["type"] == "CNAME" and r["data"] != params["data"]:
self._delete_record_raw(zone, params=r)
curr = copy.deepcopy(params)
curr.pop("ttl")
records = self.records(zone, curr)
if len(records) > 1:
raise CorenetworksError(
"More than one record already exists for the given attributes. "
"That should be impossible, please open an issue!"
)
# delete existing record with different ttl for a fake update
for r in records:
r["ttl"] = int(r["ttl"])
if r["ttl"] != params["ttl"]:
self._delete_record_raw(zone, params=r)
self.__rest_helper(
"/dnszones/{zone}/records/".format(zone=zone), data=params, method="POST"
)
result = self.records(zone=zone, params=params)
if self.config["auto_commit"]:
self.commit(zone=zone)
return self.__normalize(result)
def _delete_record_raw(self, zone, params):
r = self.__rest_helper(
"/dnszones/{zone}/records/delete".format(zone=zone), data=params, method="POST"
)
return r
def delete_record(self, zone, params):
"""
Delete all DNS records of a zone that match the data.
Args:
zone (str): Name of the target DNS zone.
params (dict): Dictionary of record parameters.
See https://beta.api.core-networks.de/doc/#functon_dnszones_records_add
Returns:
list: Empty list.
"""
schema = copy.deepcopy(self._schema)
schema["properties"]["force_all"] = {"type": "boolean"}
schema["anyOf"] = [{
"required": ["name"]
}, {
"required": ["type"]
}, {
"required": ["data"]
}, {
"required": ["force_all"]
}]
self.__validate(params, schema)
if params.get("force_all"):
params = {}
result = self._delete_record_raw(zone, params)
if self.config["auto_commit"]:
self.commit(zone=zone)
return self.__normalize(result)
def commit(self, zone):
"""
Commit changed records to the given DNS zone.
Args:
zone (str): Name of the target DNS zone.
Returns:
list: Empty list.
"""
result = self.__rest_helper(
"/dnszones/{zone}/records/commit".format(zone=zone), method="POST"
)
return self.__normalize(result)
def __rest_helper(self, url, data=None, params=None, method="GET"):
"""Handle requests to the Core Networks API."""
url = self.__endpoint + url
headers = {
"User-Agent": self.__user_agent,
"Accept": "application/json",
"Content-Type": "application/json"
}
if data:
json_data = json.dumps(data)
else:
json_data = None
request = Request(
method=method,
url=url,
headers=headers,
data=json_data,
params=params,
auth=self._auth
)
prepared_request = request.prepare()
r_json, r_headers = self.__request_helper(prepared_request)
return r_json
@staticmethod
def __request_helper(request):
"""Handle firing off requests and exception raising."""
try:
session = Session()
handle = session.send(request)
handle.raise_for_status()
except HTTPError as e:
raise CorenetworksError(
"Invalid response: {code} {reason}".format(
code=e.response.status_code, reason=e.response.reason
),
payload=e
)
except ConnectionError:
raise
if handle.status_code == 200:
response = handle.json()
else:
response = []
return response, handle.headers
@staticmethod
def __normalize(result):
if isinstance(result, list):
return [el for el in result]
elif isinstance(result, dict):
return [result]
else:
raise CorenetworksError("Unknown type: {}".format(type(result)))
@staticmethod
def __json_to_filter(data):
filter_list = []
for (key, value) in iteritems(data):
if isinstance(value, list):
for item in value:
filter_list.append("{key}[]={value}".format(key=key, value=item))
elif isinstance(value, str) or isinstance(value, int):
filter_list.append("{key}={value}".format(key=key, value=value))
else:
raise CorenetworksError("Unknown type: {}".format(type(value)))
filter_string = "&".join(filter_list)
if filter_string:
filter_string = "?{filter}".format(filter=filter_string)
return filter_string
@staticmethod
def __validate(data, schema):
try:
jsonschema.validate(data, schema)
except jsonschema.exceptions.ValidationError as e:
raise ValidationError("Dataset invalid: {reason}".format(reason=e.message), payload=e)