416 lines
12 KiB
Python
416 lines
12 KiB
Python
|
# -*- coding: utf-8 -*-
|
||
|
"""OpenSSL PKCS12 module."""
|
||
|
|
||
|
ANSIBLE_METADATA = {"metadata_version": "1.0", "status": ["preview"], "supported_by": "community"}
|
||
|
|
||
|
DOCUMENTATION = """
|
||
|
---
|
||
|
module: openssl_pkcs12
|
||
|
author: "Guillaume Delpierre (@gdelpierre)"
|
||
|
version_added: "2.4"
|
||
|
short_description: Generate OpenSSL pkcs12 archive.
|
||
|
description:
|
||
|
- "This module allows one to (re-)generate PKCS#12."
|
||
|
requirements:
|
||
|
- "python-pyOpenSSL"
|
||
|
options:
|
||
|
ca_certificates:
|
||
|
required: False
|
||
|
description:
|
||
|
- List of CA certificate to include.
|
||
|
cert_path:
|
||
|
required: False
|
||
|
description:
|
||
|
- The path to read certificates and private keys from.
|
||
|
Must be in PEM format.
|
||
|
action:
|
||
|
required: False
|
||
|
default: "export"
|
||
|
choices: ["parse", "export"]
|
||
|
description:
|
||
|
- Create (export) or parse a PKCS#12.
|
||
|
src:
|
||
|
required: False
|
||
|
description:
|
||
|
- PKCS#12 file path to parse.
|
||
|
path:
|
||
|
required: True
|
||
|
default: null
|
||
|
description:
|
||
|
- Filename to write the PKCS#12 file to.
|
||
|
force:
|
||
|
required: False
|
||
|
default: False
|
||
|
description:
|
||
|
- Should the file be regenerated even it it already exists.
|
||
|
friendly_name:
|
||
|
required: False
|
||
|
default: null
|
||
|
aliases: "name"
|
||
|
description:
|
||
|
- Specifies the friendly name for the certificate and private key.
|
||
|
iter_size:
|
||
|
required: False
|
||
|
default: 2048
|
||
|
description:
|
||
|
- Number of times to repeat the encryption step.
|
||
|
maciter_size:
|
||
|
required: False
|
||
|
default: 1
|
||
|
description:
|
||
|
- Number of times to repeat the MAC step.
|
||
|
mode:
|
||
|
required: False
|
||
|
default: 0400
|
||
|
description:
|
||
|
- Default mode for the generated PKCS#12 file.
|
||
|
passphrase:
|
||
|
required: False
|
||
|
default: null
|
||
|
description:
|
||
|
- The PKCS#12 password.
|
||
|
privatekey_path:
|
||
|
required: False
|
||
|
description:
|
||
|
- File to read private key from.
|
||
|
privatekey_passphrase:
|
||
|
required: False
|
||
|
default: null
|
||
|
description:
|
||
|
- Passphrase source to decrypt any input private keys with.
|
||
|
state:
|
||
|
required: False
|
||
|
default: "present"
|
||
|
choices: ["present", "absent"]
|
||
|
description:
|
||
|
- Whether the file should exist or not.
|
||
|
"""
|
||
|
|
||
|
EXAMPLES = """
|
||
|
- name: "Generate PKCS#12 file"
|
||
|
openssl_pkcs12:
|
||
|
path: "/opt/certs/ansible.p12"
|
||
|
friendly_name: "raclette"
|
||
|
privatekey_path: "/opt/certs/keys/key.pem"
|
||
|
cert_path: "/opt/certs/cert.pem"
|
||
|
ca_certificates: "/opt/certs/ca.pem"
|
||
|
state: present
|
||
|
|
||
|
- name: "Change PKCS#12 file permission"
|
||
|
openssl_pkcs12:
|
||
|
path: "/opt/certs/ansible.p12"
|
||
|
friendly_name: "raclette"
|
||
|
privatekey_path: "/opt/certs/keys/key.pem"
|
||
|
cert_path: "/opt/certs/cert.pem"
|
||
|
ca_certificates: "/opt/certs/ca.pem"
|
||
|
state: present
|
||
|
mode: 0600
|
||
|
|
||
|
- name: "Regen PKCS#12 file"
|
||
|
openssl_pkcs12:
|
||
|
path: "/opt/certs/ansible.p12"
|
||
|
friendly_name: "raclette"
|
||
|
privatekey_path: "/opt/certs/keys/key.pem"
|
||
|
cert_path: "/opt/certs/cert.pem"
|
||
|
ca_certificates: "/opt/certs/ca.pem"
|
||
|
state: present
|
||
|
mode: 0600
|
||
|
force: True
|
||
|
|
||
|
- name: "Dump/Parse PKCS#12 file"
|
||
|
openssl_pkcs12:
|
||
|
src: "/opt/certs/ansible.p12"
|
||
|
path: "/opt/certs/ansible.pem"
|
||
|
state: present
|
||
|
|
||
|
- name: "Remove PKCS#12 file"
|
||
|
openssl_pkcs12:
|
||
|
path: "/opt/certs/ansible.p12"
|
||
|
state: absent
|
||
|
"""
|
||
|
|
||
|
RETURN = """
|
||
|
filename:
|
||
|
description: Path to the generate PKCS#12 file.
|
||
|
returned: changed or success
|
||
|
type: string
|
||
|
sample: /opt/certs/ansible.p12
|
||
|
"""
|
||
|
|
||
|
import errno
|
||
|
import os
|
||
|
|
||
|
from ansible.module_utils._text import to_native
|
||
|
from ansible.module_utils.basic import AnsibleModule
|
||
|
|
||
|
try:
|
||
|
from OpenSSL import crypto
|
||
|
except ImportError:
|
||
|
pyopenssl_found = False
|
||
|
else:
|
||
|
pyopenssl_found = True
|
||
|
|
||
|
|
||
|
class PkcsError(Exception):
|
||
|
pass
|
||
|
|
||
|
|
||
|
class Pkcs(object):
|
||
|
|
||
|
def __init__(self, module):
|
||
|
self.path = module.params["path"]
|
||
|
self.force = module.params["force"]
|
||
|
self.state = module.params["state"]
|
||
|
self.action = module.params["action"]
|
||
|
self.check_mode = module.check_mode
|
||
|
self.iter_size = module.params["iter_size"]
|
||
|
self.maciter_size = module.params["maciter_size"]
|
||
|
self.pkcs12 = None
|
||
|
self.src = module.params["src"]
|
||
|
self.privatekey_path = module.params["privatekey_path"]
|
||
|
self.privatekey_passphrase = module.params["privatekey_passphrase"]
|
||
|
self.cert_path = module.params["cert_path"]
|
||
|
self.ca_certificates = module.params["ca_certificates"]
|
||
|
self.friendly_name = module.params["friendly_name"]
|
||
|
self.passphrase = module.params["passphrase"]
|
||
|
self.mode = module.params["mode"]
|
||
|
self.changed = False
|
||
|
if not self.mode:
|
||
|
self.mode = int("0400", 8)
|
||
|
|
||
|
def load_privatekey(self, path, passphrase=None):
|
||
|
"""Load the specified OpenSSL private key."""
|
||
|
try:
|
||
|
if passphrase:
|
||
|
privatekey = crypto.load_privatekey(
|
||
|
crypto.FILETYPE_PEM,
|
||
|
open(path, "rb").read(), passphrase
|
||
|
)
|
||
|
else:
|
||
|
privatekey = crypto.load_privatekey(crypto.FILETYPE_PEM, open(path, "rb").read())
|
||
|
|
||
|
return privatekey
|
||
|
except (IOError, OSError) as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
def load_certificate(self, path):
|
||
|
"""Load the specified certificate."""
|
||
|
try:
|
||
|
cert_content = open(path, "rb").read()
|
||
|
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_content)
|
||
|
return cert
|
||
|
except (IOError, OSError) as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
def load_pkcs12(self, path, passphrase=None):
|
||
|
"""Load pkcs12 file."""
|
||
|
try:
|
||
|
if passphrase:
|
||
|
return crypto.load_pkcs12(open(path, "rb").read(), passphrase)
|
||
|
else:
|
||
|
return crypto.load_pkcs12(open(path, "rb").read())
|
||
|
except (IOError, OSError) as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
def dump_privatekey(self, path):
|
||
|
"""Dump the specified OpenSSL private key."""
|
||
|
try:
|
||
|
return crypto.dump_privatekey(
|
||
|
crypto.FILETYPE_PEM,
|
||
|
self.load_pkcs12(path).get_privatekey()
|
||
|
)
|
||
|
except (IOError, OSError) as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
def dump_certificate(self, path):
|
||
|
"""Dump the specified certificate."""
|
||
|
try:
|
||
|
return crypto.dump_certificate(
|
||
|
crypto.FILETYPE_PEM,
|
||
|
self.load_pkcs12(path).get_certificate()
|
||
|
)
|
||
|
except (IOError, OSError) as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
def generate(self, module):
|
||
|
"""Generate PKCS#12 file archive."""
|
||
|
if not os.path.exists(self.path) or self.force:
|
||
|
self.pkcs12 = crypto.PKCS12()
|
||
|
|
||
|
try:
|
||
|
self.remove()
|
||
|
except PkcsError as exc:
|
||
|
module.fail_json(msg=to_native(exc))
|
||
|
|
||
|
if self.ca_certificates:
|
||
|
ca_certs = [self.load_certificate(ca_cert) for ca_cert in self.ca_certificates]
|
||
|
self.pkcs12.set_ca_certificates(ca_certs)
|
||
|
|
||
|
if self.cert_path:
|
||
|
self.pkcs12.set_certificate(self.load_certificate(self.cert_path))
|
||
|
|
||
|
if self.friendly_name:
|
||
|
self.pkcs12.set_friendlyname(self.friendly_name)
|
||
|
|
||
|
if self.privatekey_path:
|
||
|
self.pkcs12.set_privatekey(
|
||
|
self.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
|
||
|
)
|
||
|
|
||
|
try:
|
||
|
with open(self.path, "wb", self.mode) as archive:
|
||
|
archive.write(
|
||
|
self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size)
|
||
|
)
|
||
|
module.set_mode_if_different(self.path, self.mode, False)
|
||
|
self.changed = True
|
||
|
except (IOError, OSError) as exc:
|
||
|
self.remove()
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
file_args = module.load_file_common_arguments(module.params)
|
||
|
if module.set_fs_attributes_if_different(file_args, False):
|
||
|
module.set_mode_if_different(self.path, self.mode, False)
|
||
|
self.changed = True
|
||
|
|
||
|
def parse(self, module):
|
||
|
"""Read PKCS#12 file."""
|
||
|
if not os.path.exists(self.path) or self.force:
|
||
|
try:
|
||
|
self.remove()
|
||
|
|
||
|
with open(self.path, "wb") as content:
|
||
|
content.write(
|
||
|
"{0}{1}".format(
|
||
|
self.dump_privatekey(self.src), self.dump_certificate(self.src)
|
||
|
)
|
||
|
)
|
||
|
module.set_mode_if_different(self.path, self.mode, False)
|
||
|
self.changed = True
|
||
|
except IOError as exc:
|
||
|
raise PkcsError(exc)
|
||
|
|
||
|
file_args = module.load_file_common_arguments(module.params)
|
||
|
if module.set_fs_attributes_if_different(file_args, False):
|
||
|
module.set_mode_if_different(self.path, self.mode, False)
|
||
|
self.changed = True
|
||
|
|
||
|
def remove(self):
|
||
|
"""Remove the PKCS#12 file archive from the filesystem."""
|
||
|
try:
|
||
|
os.remove(self.path)
|
||
|
self.changed = True
|
||
|
except OSError as exc:
|
||
|
if exc.errno != errno.ENOENT:
|
||
|
raise PkcsError(exc)
|
||
|
else:
|
||
|
pass
|
||
|
|
||
|
def check(self, module, perms_required=True):
|
||
|
|
||
|
def _check_pkey_passphrase():
|
||
|
if self.privatekey_passphrase:
|
||
|
try:
|
||
|
self.load_privatekey(self.path, self.privatekey_passphrase)
|
||
|
return True
|
||
|
except crypto.Error:
|
||
|
return False
|
||
|
return True
|
||
|
|
||
|
if not os.path.exists(self.path):
|
||
|
return os.path.exists(self.path)
|
||
|
|
||
|
return _check_pkey_passphrase
|
||
|
|
||
|
def dump(self):
|
||
|
"""Serialize the object into a dictionary."""
|
||
|
result = {
|
||
|
"changed": self.changed,
|
||
|
"filename": self.path,
|
||
|
}
|
||
|
|
||
|
if self.privatekey_path:
|
||
|
result["privatekey_path"] = self.privatekey_path
|
||
|
|
||
|
return result
|
||
|
|
||
|
|
||
|
def main():
|
||
|
argument_spec = dict(
|
||
|
action=dict(default="export", choices=["parse", "export"], type="str"),
|
||
|
ca_certificates=dict(type="list"),
|
||
|
cert_path=dict(type="path"),
|
||
|
force=dict(default=False, type="bool"),
|
||
|
friendly_name=dict(type="str", aliases=["name"]),
|
||
|
iter_size=dict(default=2048, type="int"),
|
||
|
maciter_size=dict(default=1, type="int"),
|
||
|
passphrase=dict(type="str", no_log=True),
|
||
|
path=dict(required=True, type="path"),
|
||
|
privatekey_path=dict(type="path"),
|
||
|
privatekey_passphrase=dict(type="str", no_log=True),
|
||
|
state=dict(default="present", choices=["present", "absent"], type="str"),
|
||
|
src=dict(type="path"),
|
||
|
)
|
||
|
|
||
|
required_if = [
|
||
|
["action", "export", ["friendly_name"]],
|
||
|
["action", "parse", ["src"]],
|
||
|
]
|
||
|
|
||
|
required_together = [
|
||
|
["privatekey_path", "friendly_name"],
|
||
|
]
|
||
|
|
||
|
module = AnsibleModule(
|
||
|
argument_spec=argument_spec,
|
||
|
add_file_common_args=True,
|
||
|
required_if=required_if,
|
||
|
required_together=required_together,
|
||
|
supports_check_mode=True,
|
||
|
)
|
||
|
|
||
|
if not pyopenssl_found:
|
||
|
module.fail_json(msg="The python pyOpenSSL library is required")
|
||
|
|
||
|
base_dir = os.path.dirname(module.params["path"])
|
||
|
if not os.path.isdir(base_dir):
|
||
|
module.fail_json(
|
||
|
name=base_dir,
|
||
|
msg="The directory {0} does not exist or "
|
||
|
"the file is not a directory".format(base_dir)
|
||
|
)
|
||
|
|
||
|
pkcs12 = Pkcs(module)
|
||
|
|
||
|
if module.params["state"] == "present":
|
||
|
if module.check_mode:
|
||
|
result = pkcs12.dump()
|
||
|
result["changed"] = module.params["force"] or not pkcs12.check(module)
|
||
|
module.exit_json(**result)
|
||
|
|
||
|
try:
|
||
|
if module.params["action"] == "export":
|
||
|
pkcs12.generate(module)
|
||
|
else:
|
||
|
pkcs12.parse(module)
|
||
|
except PkcsError as exc:
|
||
|
module.fail_json(msg=to_native(exc))
|
||
|
else:
|
||
|
if module.check_mode:
|
||
|
result = pkcs12.dump()
|
||
|
result["changed"] = os.path.exists(module.params["path"])
|
||
|
module.exit_json(**result)
|
||
|
|
||
|
try:
|
||
|
pkcs12.remove()
|
||
|
except PkcsError as exc:
|
||
|
module.fail_json(msg=to_native(exc))
|
||
|
|
||
|
result = pkcs12.dump()
|
||
|
|
||
|
module.exit_json(**result)
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|