# -*- coding: utf-8 -*-
"""OpenSSL PKCS12 module."""

ANSIBLE_METADATA = {"metadata_version": "1.0", "status": ["preview"], "supported_by": "community"}

DOCUMENTATION = """
---
module: openssl_pkcs12
author: "Guillaume Delpierre (@gdelpierre)"
version_added: "2.4"
short_description: Generate OpenSSL pkcs12 archive.
description:
    - "This module allows one to (re-)generate PKCS#12."
requirements:
    - "python-pyOpenSSL"
options:
    ca_certificates:
        required: False
        description:
            - List of CA certificates to include.
    cert_path:
        required: False
        description:
            - The path to read certificates and private keys from.
              Must be in PEM format.
    action:
        required: False
        default: "export"
        choices: ["parse", "export"]
        description:
            - Create (export) or parse a PKCS#12.
    src:
        required: False
        description:
            - PKCS#12 file path to parse.
    path:
        required: True
        default: null
        description:
            - Filename to write the PKCS#12 file to.
    force:
        required: False
        default: False
        description:
            - Should the file be regenerated even if it already exists.
    friendly_name:
        required: False
        default: null
        aliases: "name"
        description:
            - Specifies the friendly name for the certificate and private key.
    iter_size:
        required: False
        default: 2048
        description:
            - Number of times to repeat the encryption step.
    maciter_size:
        required: False
        default: 1
        description:
            - Number of times to repeat the MAC step.
    mode:
        required: False
        default: 0400
        description:
            - Default mode for the generated PKCS#12 file.
    passphrase:
        required: False
        default: null
        description:
            - The PKCS#12 password.
    privatekey_path:
        required: False
        description:
            - File to read private key from.
    privatekey_passphrase:
        required: False
        default: null
        description:
            - Passphrase source to decrypt any input private keys with.
    state:
        required: False
        default: "present"
        choices: ["present", "absent"]
        description:
            - Whether the file should exist or not.
"""

EXAMPLES = """
- name: "Generate PKCS#12 file"
  openssl_pkcs12:
    path: "/opt/certs/ansible.p12"
    friendly_name: "raclette"
    privatekey_path: "/opt/certs/keys/key.pem"
    cert_path: "/opt/certs/cert.pem"
    ca_certificates: "/opt/certs/ca.pem"
    state: present

- name: "Change PKCS#12 file permission"
  openssl_pkcs12:
    path: "/opt/certs/ansible.p12"
    friendly_name: "raclette"
    privatekey_path: "/opt/certs/keys/key.pem"
    cert_path: "/opt/certs/cert.pem"
    ca_certificates: "/opt/certs/ca.pem"
    state: present
    mode: 0600

- name: "Regen PKCS#12 file"
  openssl_pkcs12:
    path: "/opt/certs/ansible.p12"
    friendly_name: "raclette"
    privatekey_path: "/opt/certs/keys/key.pem"
    cert_path: "/opt/certs/cert.pem"
    ca_certificates: "/opt/certs/ca.pem"
    state: present
    mode: 0600
    force: True

- name: "Dump/Parse PKCS#12 file"
  openssl_pkcs12:
    action: parse
    src: "/opt/certs/ansible.p12"
    path: "/opt/certs/ansible.pem"
    state: present

- name: "Remove PKCS#12 file"
  openssl_pkcs12:
    path: "/opt/certs/ansible.p12"
    state: absent
"""

RETURN = """
filename:
    description: Path to the generated PKCS#12 file.
    returned: changed or success
    type: string
    sample: /opt/certs/ansible.p12
"""

import errno
import os

from ansible.module_utils._text import to_bytes, to_native
from ansible.module_utils.basic import AnsibleModule

try:
    from OpenSSL import crypto
except ImportError:
    pyopenssl_found = False
else:
    pyopenssl_found = True

# Permissions used while the output file is being written; the requested
# mode is applied afterwards through module.set_mode_if_different().
_PRIVATE_FILE_MODE = 0o600


def _read_file(path):
    """Return the raw bytes stored at path, closing the file afterwards."""
    with open(path, "rb") as handle:
        return handle.read()


class PkcsError(Exception):
    """Raised when a PKCS#12 archive or one of its inputs cannot be processed."""


class Pkcs(object):
    """Create, parse or remove a PKCS#12 archive as described by the module parameters."""

    def __init__(self, module):
        """Copy the relevant module parameters onto the instance.

        :param module: the AnsibleModule whose ``params`` and ``check_mode``
            are read.
        """
        params = module.params
        self.path = params["path"]
        self.force = params["force"]
        self.state = params["state"]
        self.action = params["action"]
        self.check_mode = module.check_mode
        self.iter_size = params["iter_size"]
        self.maciter_size = params["maciter_size"]
        self.pkcs12 = None
        self.src = params["src"]
        self.privatekey_path = params["privatekey_path"]
        self.privatekey_passphrase = params["privatekey_passphrase"]
        self.cert_path = params["cert_path"]
        self.ca_certificates = params["ca_certificates"]
        self.friendly_name = params["friendly_name"]
        self.passphrase = params["passphrase"]
        self.mode = params["mode"]
        self.changed = False
        if not self.mode:
            self.mode = int("0400", 8)

    def load_privatekey(self, path, passphrase=None):
        """Load the PEM private key stored at path.

        :param path: file containing the PEM encoded key.
        :param passphrase: optional passphrase protecting the key.
        :raises PkcsError: if the file cannot be read or the key cannot be
            decoded (including a wrong passphrase).
        """
        try:
            content = _read_file(path)
            if passphrase:
                # pyOpenSSL only accepts a byte string (or a callable) here.
                return crypto.load_privatekey(
                    crypto.FILETYPE_PEM, content, to_bytes(passphrase)
                )
            return crypto.load_privatekey(crypto.FILETYPE_PEM, content)
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def load_certificate(self, path):
        """Load the PEM certificate stored at path.

        :raises PkcsError: if the file cannot be read or is not a valid
            certificate.
        """
        try:
            return crypto.load_certificate(crypto.FILETYPE_PEM, _read_file(path))
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def load_pkcs12(self, path, passphrase=None):
        """Load the PKCS#12 archive stored at path.

        :param passphrase: optional passphrase protecting the archive.
        :raises PkcsError: if the file cannot be read or decoded.
        """
        try:
            content = _read_file(path)
            if passphrase:
                return crypto.load_pkcs12(content, to_bytes(passphrase))
            return crypto.load_pkcs12(content)
        except (IOError, OSError, crypto.Error) as exc:
            raise PkcsError(exc)

    def dump_privatekey(self, path, passphrase=None):
        """Return, PEM encoded, the private key of the PKCS#12 archive at path.

        :raises PkcsError: if the archive cannot be loaded, holds no private
            key, or the key cannot be serialized.
        """
        pkey = self.load_pkcs12(path, passphrase).get_privatekey()
        if pkey is None:
            raise PkcsError("{0} does not contain a private key".format(path))
        try:
            return crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)
        except crypto.Error as exc:
            raise PkcsError(exc)

    def dump_certificate(self, path, passphrase=None):
        """Return, PEM encoded, the certificate of the PKCS#12 archive at path.

        :raises PkcsError: if the archive cannot be loaded, holds no
            certificate, or the certificate cannot be serialized.
        """
        cert = self.load_pkcs12(path, passphrase).get_certificate()
        if cert is None:
            raise PkcsError("{0} does not contain a certificate".format(path))
        try:
            return crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
        except crypto.Error as exc:
            raise PkcsError(exc)

    @staticmethod
    def _to_pem(archive):
        """Return the PEM private key followed by the PEM certificate of archive.

        Parts the archive does not hold are skipped.
        """
        chunks = []
        pkey = archive.get_privatekey()
        if pkey is not None:
            chunks.append(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey))
        cert = archive.get_certificate()
        if cert is not None:
            chunks.append(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
        return b"".join(chunks)

    def _write(self, content):
        """Write content (bytes) to self.path.

        The file is created through os.open() so that it is only accessible
        to its owner from the very first byte; the third positional argument
        of the builtin open() is a buffer size, not a permission mask.

        :raises PkcsError: if the file cannot be written; a partially written
            file is removed first.
        """
        try:
            fd = os.open(
                self.path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, _PRIVATE_FILE_MODE
            )
            with os.fdopen(fd, "wb") as handle:
                handle.write(content)
        except (IOError, OSError) as exc:
            self.remove()
            raise PkcsError(exc)

    def _apply_file_attributes(self, module):
        """Apply the common file arguments (owner, group, mode, ...) to self.path."""
        file_args = module.load_file_common_arguments(module.params)
        if module.set_fs_attributes_if_different(file_args, False):
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True

    def generate(self, module):
        """Generate the PKCS#12 archive at self.path.

        The archive is only (re)built when the file is missing or ``force``
        is set; file attributes are reconciled in every case.

        :raises PkcsError: if an input cannot be loaded or the archive cannot
            be exported or written.
        """
        if not os.path.exists(self.path) or self.force:
            # Build the whole archive in memory before touching the
            # filesystem, so a broken input never destroys a valid archive.
            self.pkcs12 = crypto.PKCS12()
            if self.ca_certificates:
                ca_certs = [self.load_certificate(ca_cert) for ca_cert in self.ca_certificates]
                self.pkcs12.set_ca_certificates(ca_certs)
            if self.cert_path:
                self.pkcs12.set_certificate(self.load_certificate(self.cert_path))
            if self.friendly_name:
                # pyOpenSSL requires the friendly name as a byte string.
                self.pkcs12.set_friendlyname(to_bytes(self.friendly_name))
            if self.privatekey_path:
                self.pkcs12.set_privatekey(
                    self.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
                )

            passphrase = None
            if self.passphrase is not None:
                passphrase = to_bytes(self.passphrase)
            try:
                payload = self.pkcs12.export(passphrase, self.iter_size, self.maciter_size)
            except crypto.Error as exc:
                raise PkcsError(exc)

            self.remove()
            self._write(payload)
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True

        self._apply_file_attributes(module)

    def parse(self, module):
        """Dump the key and certificate of the PKCS#12 file ``src`` into self.path.

        The output is only (re)written when the file is missing or ``force``
        is set; file attributes are reconciled in every case.

        :raises PkcsError: if ``src`` cannot be loaded or the output cannot be
            written.
        """
        if not os.path.exists(self.path) or self.force:
            # Decode the source first so a failure leaves any existing output intact.
            archive = self.load_pkcs12(self.src, self.passphrase)
            try:
                # The dumps are bytes: concatenate them rather than formatting
                # them into text, which breaks on Python 3.
                content = self._to_pem(archive)
            except crypto.Error as exc:
                raise PkcsError(exc)

            self.remove()
            self._write(content)
            module.set_mode_if_different(self.path, self.mode, False)
            self.changed = True

        self._apply_file_attributes(module)

    def remove(self):
        """Remove self.path from the filesystem; a missing file is not an error.

        :raises PkcsError: if the file exists but cannot be removed.
        """
        try:
            os.remove(self.path)
            self.changed = True
        except OSError as exc:
            if exc.errno != errno.ENOENT:
                raise PkcsError(exc)

    def check(self, module, perms_required=True):
        """Tell whether the resource looks to be in its desired state.

        :returns: False when self.path does not exist, or when a private key
            passphrase was supplied that does not open ``privatekey_path``;
            True otherwise.
        """
        if not os.path.exists(self.path):
            return False
        if self.privatekey_path and self.privatekey_passphrase:
            try:
                self.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
            except PkcsError:
                return False
        return True

    def dump(self):
        """Serialize the object into a dictionary."""
        result = {
            "changed": self.changed,
            "filename": self.path,
        }
        if self.privatekey_path:
            result["privatekey_path"] = self.privatekey_path
        return result


def main():
    """Module entry point: validate the parameters and apply the requested state."""
    argument_spec = dict(
        action=dict(default="export", choices=["parse", "export"], type="str"),
        ca_certificates=dict(type="list"),
        cert_path=dict(type="path"),
        force=dict(default=False, type="bool"),
        friendly_name=dict(type="str", aliases=["name"]),
        iter_size=dict(default=2048, type="int"),
        maciter_size=dict(default=1, type="int"),
        passphrase=dict(type="str", no_log=True),
        path=dict(required=True, type="path"),
        privatekey_path=dict(type="path"),
        privatekey_passphrase=dict(type="str", no_log=True),
        state=dict(default="present", choices=["present", "absent"], type="str"),
        src=dict(type="path"),
    )

    required_if = [
        ["action", "parse", ["src"]],
    ]

    required_together = [
        ["privatekey_path", "friendly_name"],
    ]

    module = AnsibleModule(
        argument_spec=argument_spec,
        add_file_common_args=True,
        required_if=required_if,
        required_together=required_together,
        supports_check_mode=True,
    )

    if not pyopenssl_found:
        module.fail_json(msg="The python pyOpenSSL library is required")

    params = module.params
    wants_present = params["state"] == "present"

    # friendly_name is only needed to build an archive. Enforcing it through
    # required_if would also reject a plain "state: absent" task, because
    # action defaults to "export".
    if wants_present and params["action"] == "export" and params["friendly_name"] is None:
        module.fail_json(
            msg="action is export but all of the following are missing: friendly_name"
        )

    # A bare file name has an empty dirname; treat it as the current directory.
    base_dir = os.path.dirname(params["path"]) or "."
    if not os.path.isdir(base_dir):
        module.fail_json(
            name=base_dir,
            msg="The directory {0} does not exist or "
            "the file is not a directory".format(base_dir),
        )

    pkcs12 = Pkcs(module)

    if wants_present:
        if module.check_mode:
            result = pkcs12.dump()
            result["changed"] = params["force"] or not pkcs12.check(module)
            module.exit_json(**result)

        try:
            if params["action"] == "export":
                pkcs12.generate(module)
            else:
                pkcs12.parse(module)
        except PkcsError as exc:
            module.fail_json(msg=to_native(exc))
    else:
        if module.check_mode:
            result = pkcs12.dump()
            result["changed"] = os.path.exists(params["path"])
            module.exit_json(**result)

        try:
            pkcs12.remove()
        except PkcsError as exc:
            module.fail_json(msg=to_native(exc))

    result = pkcs12.dump()
    module.exit_json(**result)


if __name__ == "__main__":
    main()