mirror of
https://github.com/thegeeklab/ansible-later.git
synced 2024-11-13 00:30:39 +00:00
281 lines
11 KiB
Python
281 lines
11 KiB
Python
# This file is dual licensed under the terms of the Apache License, Version
|
|
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
|
# for complete details.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
from cryptography import utils
|
|
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography.hazmat.primitives.asymmetric import dh
|
|
|
|
|
|
def _dh_params_dup(dh_cdata, backend):
    """Duplicate the parameters held by ``dh_cdata`` into a new DH object.

    The copy is created with ``DHparams_dup`` and wrapped with ``ffi.gc`` so
    that ``DH_free`` is registered as its destructor.

    :param dh_cdata: the DH cdata whose parameters are copied.
    :param backend: object providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :returns: the garbage-collected duplicate DH cdata.
    """
    lib, ffi = backend._lib, backend._ffi

    duplicate = lib.DHparams_dup(dh_cdata)
    backend.openssl_assert(duplicate != ffi.NULL)
    duplicate = ffi.gc(duplicate, lib.DH_free)

    if not lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_102:
        return duplicate

    # In OpenSSL versions < 1.0.2 or libressl, DHparams_dup doesn't copy q,
    # so q is duplicated and set on the copy explicitly.
    q_ptr = ffi.new("BIGNUM **")
    lib.DH_get0_pqg(dh_cdata, ffi.NULL, q_ptr, ffi.NULL)
    q_copy = lib.BN_dup(q_ptr[0])
    status = lib.DH_set0_pqg(duplicate, ffi.NULL, q_copy, ffi.NULL)
    backend.openssl_assert(status == 1)
    return duplicate
|
|
|
|
|
|
def _dh_cdata_to_parameters(dh_cdata, backend):
    """Build a ``_DHParameters`` object from a duplicate of ``dh_cdata``.

    :param dh_cdata: the DH cdata whose parameters are duplicated.
    :param backend: backend forwarded to ``_dh_params_dup`` and to the
        resulting ``_DHParameters`` instance.
    :returns: a ``_DHParameters`` wrapping the duplicated parameters.
    """
    return _DHParameters(backend, _dh_params_dup(dh_cdata, backend))
|
|
|
|
|
|
@utils.register_interface(dh.DHParametersWithSerialization)
class _DHParameters(object):
    """DH parameters stored in a backend DH cdata object."""

    def __init__(self, backend, dh_cdata):
        """Keep references to the backend and the DH cdata."""
        self._backend = backend
        self._dh_cdata = dh_cdata

    def parameter_numbers(self):
        """Return p, g and the optional q as ``dh.DHParameterNumbers``.

        ``q`` is reported as ``None`` when the underlying pointer is NULL.
        """
        backend = self._backend
        ffi = backend._ffi

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        backend._lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)

        # Only p and g are asserted to be present; q may be absent.
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )
        p_val = backend._bn_to_int(p_ptr[0])
        g_val = backend._bn_to_int(g_ptr[0])
        return dh.DHParameterNumbers(p=p_val, g=g_val, q=q_val)

    def generate_private_key(self):
        """Ask the backend to generate a private key for these parameters."""
        return self._backend.generate_dh_private_key(self)

    def parameter_bytes(self, encoding, format):
        """Serialize the parameters through the backend.

        :raises ValueError: if ``format`` is not
            ``serialization.ParameterFormat.PKCS3``.
        :raises UnsupportedAlgorithm: if q is set and the library reports
            no ``EVP_PKEY_DHX`` support.
        """
        if format is not serialization.ParameterFormat.PKCS3:
            raise ValueError("Only PKCS3 serialization is supported")

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            # Without EVP_PKEY_DHX, parameters carrying q are rejected.
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._parameter_bytes(encoding, format, self._dh_cdata)
|
|
|
|
|
|
def _handle_dh_compute_key_error(errors, backend):
    """Turn a failed key computation into a ``ValueError``.

    The first entry of ``errors`` is checked with ``openssl_assert`` to match
    ``ERR_LIB_DH`` / ``DH_R_INVALID_PUBKEY``.

    :param errors: sequence of error objects; only ``errors[0]`` is examined.
    :param backend: object providing ``_lib`` and ``openssl_assert``.
    :raises ValueError: once the assertion has passed.
    """
    lib = backend._lib

    is_invalid_pubkey = errors[0]._lib_reason_match(
        lib.ERR_LIB_DH, lib.DH_R_INVALID_PUBKEY
    )
    backend.openssl_assert(is_invalid_pubkey)

    raise ValueError("Public key value is invalid for this exchange.")
|
|
|
|
|
|
def _get_dh_num_bits(backend, dh_cdata):
    """Return ``BN_num_bits`` of the p parameter stored in ``dh_cdata``.

    :param backend: object providing ``_lib``, ``_ffi`` and
        ``openssl_assert``.
    :param dh_cdata: the DH cdata to inspect.
    """
    ffi = backend._ffi
    lib = backend._lib

    p_ptr = ffi.new("BIGNUM **")
    # Only p is requested; the q and g output slots are passed as NULL.
    lib.DH_get0_pqg(dh_cdata, p_ptr, ffi.NULL, ffi.NULL)
    backend.openssl_assert(p_ptr[0] != ffi.NULL)
    return lib.BN_num_bits(p_ptr[0])
|
|
|
|
|
|
@utils.register_interface(dh.DHPrivateKeyWithSerialization)
class _DHPrivateKey(object):
    """DH private key stored in a backend DH cdata object."""

    def __init__(self, backend, dh_cdata, evp_pkey):
        """Keep the handles and cache ``DH_size`` of the key."""
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        self._key_size_bytes = self._backend._lib.DH_size(dh_cdata)

    @property
    def key_size(self):
        """Number of bits in p, as computed by ``_get_dh_num_bits``."""
        return _get_dh_num_bits(self._backend, self._dh_cdata)

    def private_numbers(self):
        """Return the key as ``dh.DHPrivateNumbers``.

        ``q`` is reported as ``None`` when the underlying pointer is NULL.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        # Only p and g are asserted to be present; q may be absent.
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )

        y_ptr = ffi.new("BIGNUM **")
        x_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, y_ptr, x_ptr)
        backend.openssl_assert(y_ptr[0] != ffi.NULL)
        backend.openssl_assert(x_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val
        )
        public_numbers = dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=backend._bn_to_int(y_ptr[0])
        )
        return dh.DHPrivateNumbers(
            public_numbers=public_numbers,
            x=backend._bn_to_int(x_ptr[0])
        )

    def exchange(self, peer_public_key):
        """Compute the shared key with ``peer_public_key``.

        The result is left-padded with zero bytes up to the cached
        ``DH_size`` of this key.

        :param peer_public_key: object whose ``_dh_cdata`` holds the peer's
            public value.
        :raises ValueError: via ``_handle_dh_compute_key_error`` when
            ``DH_compute_key`` returns -1.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        out_buf = ffi.new("unsigned char[]", self._key_size_bytes)
        peer_y_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(peer_public_key._dh_cdata, peer_y_ptr, ffi.NULL)
        backend.openssl_assert(peer_y_ptr[0] != ffi.NULL)

        written = lib.DH_compute_key(out_buf, peer_y_ptr[0], self._dh_cdata)
        if written == -1:
            errors = backend._consume_errors()
            return _handle_dh_compute_key_error(errors, backend)

        backend.openssl_assert(written >= 1)

        # DH_compute_key reported ``written`` bytes; pad on the left with
        # zero bytes up to the full key size.
        shared = ffi.buffer(out_buf)[:written]
        return shared.rjust(self._key_size_bytes, b"\x00")

    def public_key(self):
        """Return a ``_DHPublicKey`` holding a copy of the public value."""
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        public_cdata = _dh_params_dup(self._dh_cdata, backend)

        y_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, y_ptr, ffi.NULL)
        backend.openssl_assert(y_ptr[0] != ffi.NULL)

        # Give the new DH object its own duplicate of the public value.
        y_copy = lib.BN_dup(y_ptr[0])
        backend.openssl_assert(y_copy != ffi.NULL)
        status = lib.DH_set0_key(public_cdata, y_copy, ffi.NULL)
        backend.openssl_assert(status == 1)

        evp_pkey = backend._dh_cdata_to_evp_pkey(public_cdata)
        return _DHPublicKey(backend, public_cdata, evp_pkey)

    def parameters(self):
        """Return the key's parameters as a ``_DHParameters`` object."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def private_bytes(self, encoding, format, encryption_algorithm):
        """Serialize the private key through the backend.

        :raises ValueError: if ``format`` is not
            ``serialization.PrivateFormat.PKCS8``.
        :raises UnsupportedAlgorithm: if q is set and the library reports
            no ``EVP_PKEY_DHX`` support.
        """
        if format is not serialization.PrivateFormat.PKCS8:
            raise ValueError(
                "DH private keys support only PKCS8 serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            # Without EVP_PKEY_DHX, keys carrying q are rejected.
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._private_key_bytes(
            encoding,
            format,
            encryption_algorithm,
            self._evp_pkey,
            self._dh_cdata
        )
|
|
|
|
|
|
@utils.register_interface(dh.DHPublicKeyWithSerialization)
class _DHPublicKey(object):
    """DH public key stored in a backend DH cdata object."""

    def __init__(self, backend, dh_cdata, evp_pkey):
        """Keep the handles and cache the bit size of p."""
        self._backend = backend
        self._dh_cdata = dh_cdata
        self._evp_pkey = evp_pkey
        self._key_size_bits = _get_dh_num_bits(self._backend, self._dh_cdata)

    @property
    def key_size(self):
        """Bit size computed once at construction time."""
        return self._key_size_bits

    def public_numbers(self):
        """Return the key as ``dh.DHPublicNumbers``.

        ``q`` is reported as ``None`` when the underlying pointer is NULL.
        """
        backend = self._backend
        ffi = backend._ffi
        lib = backend._lib

        p_ptr = ffi.new("BIGNUM **")
        g_ptr = ffi.new("BIGNUM **")
        q_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_pqg(self._dh_cdata, p_ptr, q_ptr, g_ptr)
        backend.openssl_assert(p_ptr[0] != ffi.NULL)
        backend.openssl_assert(g_ptr[0] != ffi.NULL)
        # Only p and g are asserted to be present; q may be absent.
        q_val = (
            None if q_ptr[0] == ffi.NULL else backend._bn_to_int(q_ptr[0])
        )

        y_ptr = ffi.new("BIGNUM **")
        lib.DH_get0_key(self._dh_cdata, y_ptr, ffi.NULL)
        backend.openssl_assert(y_ptr[0] != ffi.NULL)

        parameter_numbers = dh.DHParameterNumbers(
            p=backend._bn_to_int(p_ptr[0]),
            g=backend._bn_to_int(g_ptr[0]),
            q=q_val
        )
        return dh.DHPublicNumbers(
            parameter_numbers=parameter_numbers,
            y=backend._bn_to_int(y_ptr[0])
        )

    def parameters(self):
        """Return the key's parameters as a ``_DHParameters`` object."""
        return _dh_cdata_to_parameters(self._dh_cdata, self._backend)

    def public_bytes(self, encoding, format):
        """Serialize the public key through the backend.

        :raises ValueError: if ``format`` is not
            ``serialization.PublicFormat.SubjectPublicKeyInfo``.
        :raises UnsupportedAlgorithm: if q is set and the library reports
            no ``EVP_PKEY_DHX`` support.
        """
        if format is not serialization.PublicFormat.SubjectPublicKeyInfo:
            raise ValueError(
                "DH public keys support only "
                "SubjectPublicKeyInfo serialization"
            )

        backend = self._backend
        if not backend._lib.Cryptography_HAS_EVP_PKEY_DHX:
            # Without EVP_PKEY_DHX, keys carrying q are rejected.
            ffi = backend._ffi
            q_ptr = ffi.new("BIGNUM **")
            backend._lib.DH_get0_pqg(
                self._dh_cdata, ffi.NULL, q_ptr, ffi.NULL
            )
            if q_ptr[0] != ffi.NULL:
                raise UnsupportedAlgorithm(
                    "DH X9.42 serialization is not supported",
                    _Reasons.UNSUPPORTED_SERIALIZATION
                )

        return backend._public_key_bytes(
            encoding,
            format,
            self,
            self._evp_pkey,
            None
        )
|