mirror of
https://github.com/thegeeklab/corenetworks.git
synced 2024-11-14 16:50:39 +00:00
87 lines
2.3 KiB
Python
87 lines
2.3 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Custom authenticators."""
|
|
|
|
import datetime
|
|
import json
|
|
|
|
from requests import ConnectionError
|
|
from requests import HTTPError
|
|
from requests import Request
|
|
from requests import Session
|
|
from requests.auth import AuthBase
|
|
|
|
from .exceptions import AuthError
|
|
|
|
|
|
class CoreNetworksBasicAuth(AuthBase):
|
|
"""Define login based auth."""
|
|
|
|
def __init__(self, user, password, endpoint):
|
|
self.user = user
|
|
self.password = password
|
|
self.endpoint = endpoint
|
|
self.token, self.expires = self._login()
|
|
|
|
def __eq__(self, other): # noqa
|
|
return all([
|
|
self.user == getattr(other, "user", None),
|
|
self.password == getattr(other, "password", None),
|
|
])
|
|
|
|
def __ne__(self, other): # noqa
|
|
return not self == other
|
|
|
|
def __call__(self, r): # noqa
|
|
r.headers["Authorization"] = "Bearer {0!s}".format(self.token)
|
|
return r
|
|
|
|
def _login(self):
|
|
data = {}
|
|
data["login"] = self.user
|
|
data["password"] = self.password
|
|
|
|
json_data = json.dumps(data)
|
|
url = "{endpoint}/auth/token".format(endpoint=self.endpoint)
|
|
|
|
request = Request(method="POST", url=url, data=json_data)
|
|
prepared_request = request.prepare()
|
|
|
|
try:
|
|
session = Session()
|
|
handle = session.send(prepared_request)
|
|
handle.raise_for_status()
|
|
except HTTPError as e:
|
|
raise AuthError(
|
|
"Login failed: {code} {reason}".format(
|
|
code=e.response.status_code, reason=e.response.reason
|
|
),
|
|
payload=e
|
|
)
|
|
except ConnectionError:
|
|
raise
|
|
|
|
response = handle.json()
|
|
token = response["token"]
|
|
expires = datetime.datetime.now() + datetime.timedelta(seconds=response["expires"])
|
|
|
|
return token, expires
|
|
|
|
|
|
class CoreNetworksTokenAuth(AuthBase):
|
|
"""Define token based auth."""
|
|
|
|
def __init__(self, token):
|
|
self.token = token
|
|
|
|
def __eq__(self, other): # noqa
|
|
return all([
|
|
self.token == getattr(other, "api_token", None),
|
|
])
|
|
|
|
def __ne__(self, other): # noqa
|
|
return not self == other
|
|
|
|
def __call__(self, r): # noqa
|
|
r.headers["Authorization"] = "Bearer {0!s}".format(self.token)
|
|
return r
|