0
0
mirror of https://github.com/thegeeklab/corenetworks.git synced 2024-06-02 16:59:41 +02:00
corenetworks/corenetworks/authenticators.py

87 lines
2.3 KiB
Python
Raw Normal View History

2020-03-30 23:28:34 +02:00
# -*- coding: utf-8 -*-
"""Custom authenticators."""
import datetime
2020-03-30 23:28:34 +02:00
import json
from requests import ConnectionError
from requests import HTTPError
from requests import Request
from requests import Session
from requests.auth import AuthBase
from .exceptions import AuthError
class CoreNetworksBasicAuth(AuthBase):
2020-04-13 15:37:11 +02:00
"""Define login based auth."""
2020-03-30 23:28:34 +02:00
def __init__(self, user, password, endpoint):
self.user = user
self.password = password
self.endpoint = endpoint
self.token, self.expires = self._login()
2020-03-30 23:28:34 +02:00
def __eq__(self, other): # noqa
return all([
self.user == getattr(other, "user", None),
self.password == getattr(other, "password", None),
])
def __ne__(self, other): # noqa
return not self == other
def __call__(self, r): # noqa
r.headers["Authorization"] = "Bearer {0!s}".format(self.token)
return r
def _login(self):
data = {}
data["login"] = self.user
data["password"] = self.password
json_data = json.dumps(data)
url = "{endpoint}/auth/token".format(endpoint=self.endpoint)
request = Request(method="POST", url=url, data=json_data)
prepared_request = request.prepare()
try:
session = Session()
handle = session.send(prepared_request)
handle.raise_for_status()
except HTTPError as e:
raise AuthError(
"Login failed: {code} {reason}".format(
code=e.response.status_code, reason=e.response.reason
),
payload=e
)
2020-04-11 12:11:55 +02:00
except ConnectionError:
2020-04-11 12:05:42 +02:00
raise
2020-03-30 23:28:34 +02:00
response = handle.json()
token = response["token"]
expires = datetime.datetime.now() + datetime.timedelta(seconds=response["expires"])
2020-03-30 23:28:34 +02:00
return token, expires
2020-03-30 23:28:34 +02:00
class CoreNetworksTokenAuth(AuthBase):
"""Define token based auth."""
def __init__(self, token):
self.token = token
def __eq__(self, other): # noqa
return all([
self.token == getattr(other, "api_token", None),
])
def __ne__(self, other): # noqa
return not self == other
def __call__(self, r): # noqa
r.headers["Authorization"] = "Bearer {0!s}".format(self.token)
return r