xoxys.general/plugins/modules/openssl_pkcs12.py

435 lines
13 KiB
Python

#!/usr/bin/python
# -*- coding: utf-8 -*-
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
"""OpenSSL PKCS12 module."""
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {"metadata_version": "1.0", "status": ["preview"], "supported_by": "community"}
DOCUMENTATION = """
---
module: openssl_pkcs12
author: "Guillaume Delpierre (@gdelpierre)"
version_added: 1.1.0
short_description: Generate OpenSSL pkcs12 archive.
description:
- "This module allows one to (re-)generate PKCS#12."
requirements:
- "python-pyOpenSSL"
extends_documentation_fragment: files
options:
ca_certificates:
required: False
type: list
elements: str
description:
- List of CA certificate to include.
cert_path:
required: False
type: path
description:
- The path to read certificates and private keys from.
Must be in PEM format.
action:
required: False
default: "export"
choices: ["parse", "export"]
type: str
description:
- Create (export) or parse a PKCS#12.
src:
required: False
type: path
description:
- PKCS#12 file path to parse.
path:
required: True
type: path
description:
- Filename to write the PKCS#12 file to.
force:
required: False
default: False
type: bool
description:
- Should the file be regenerated even it it already exists.
friendly_name:
required: False
type: str
aliases:
- "name"
description:
- Specifies the friendly name for the certificate and private key.
iter_size:
required: False
default: 2048
type: int
description:
- Number of times to repeat the encryption step.
maciter_size:
required: False
default: 1
type: int
description:
- Number of times to repeat the MAC step.
mode:
required: False
default: "0400"
type: str
description:
- Default mode for the generated PKCS#12 file.
passphrase:
required: False
type: str
description:
- The PKCS#12 password.
privatekey_path:
required: False
type: path
description:
- File to read private key from.
privatekey_passphrase:
required: False
type: str
description:
- Passphrase source to decrypt any input private keys with.
state:
required: False
default: "present"
choices: ["present", "absent"]
type: str
description:
- Whether the file should exist or not.
"""
EXAMPLES = """
- name: "Generate PKCS#12 file"
openssl_pkcs12:
path: "/opt/certs/ansible.p12"
friendly_name: "raclette"
privatekey_path: "/opt/certs/keys/key.pem"
cert_path: "/opt/certs/cert.pem"
ca_certificates: "/opt/certs/ca.pem"
state: present
- name: "Change PKCS#12 file permission"
openssl_pkcs12:
path: "/opt/certs/ansible.p12"
friendly_name: "raclette"
privatekey_path: "/opt/certs/keys/key.pem"
cert_path: "/opt/certs/cert.pem"
ca_certificates: "/opt/certs/ca.pem"
state: present
mode: 0600
- name: "Regen PKCS#12 file"
openssl_pkcs12:
path: "/opt/certs/ansible.p12"
friendly_name: "raclette"
privatekey_path: "/opt/certs/keys/key.pem"
cert_path: "/opt/certs/cert.pem"
ca_certificates: "/opt/certs/ca.pem"
state: present
mode: 0600
force: True
- name: "Dump/Parse PKCS#12 file"
openssl_pkcs12:
src: "/opt/certs/ansible.p12"
path: "/opt/certs/ansible.pem"
state: present
- name: "Remove PKCS#12 file"
openssl_pkcs12:
path: "/opt/certs/ansible.p12"
state: absent
"""
RETURN = """
filename:
description: Path to the generate PKCS#12 file.
returned: changed or success
type: str
sample: /opt/certs/ansible.p12
"""
import errno
import os
from ansible.module_utils._text import to_native
from ansible.module_utils.basic import AnsibleModule
try:
from OpenSSL import crypto
except ImportError:
pyopenssl_found = False
else:
pyopenssl_found = True
class PkcsError(Exception): # noqa
pass
class Pkcs(object): # noqa
def __init__(self, module):
self.path = module.params["path"]
self.force = module.params["force"]
self.state = module.params["state"]
self.action = module.params["action"]
self.check_mode = module.check_mode
self.iter_size = module.params["iter_size"]
self.maciter_size = module.params["maciter_size"]
self.pkcs12 = None
self.src = module.params["src"]
self.privatekey_path = module.params["privatekey_path"]
self.privatekey_passphrase = module.params["privatekey_passphrase"]
self.cert_path = module.params["cert_path"]
self.ca_certificates = module.params["ca_certificates"]
self.friendly_name = module.params["friendly_name"]
self.passphrase = module.params["passphrase"]
self.mode = module.params["mode"]
self.changed = False
if not self.mode:
self.mode = int("0400", 8)
def load_privatekey(self, path, passphrase=None):
"""Load the specified OpenSSL private key."""
try:
privatekey = crypto.load_privatekey(
crypto.FILETYPE_PEM,
open(path, "rb").read(), # noqa
passphrase
) if passphrase else crypto.load_privatekey(
crypto.FILETYPE_PEM,
open(path, "rb").read() # noqa
)
return privatekey
except OSError as exc:
raise PkcsError(exc) from exc
def load_certificate(self, path):
"""Load the specified certificate."""
try:
cert_content = open(path, "rb").read() # noqa
cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_content)
return cert
except OSError as exc:
raise PkcsError(exc) from exc
def load_pkcs12(self, path, passphrase=None):
"""Load pkcs12 file."""
try:
if passphrase:
return crypto.load_pkcs12(open(path, "rb").read(), passphrase) # noqa
return crypto.load_pkcs12(open(path, "rb").read()) # noqa
except OSError as exc:
raise PkcsError(exc) from exc
def dump_privatekey(self, path):
"""Dump the specified OpenSSL private key."""
try:
return crypto.dump_privatekey(
crypto.FILETYPE_PEM,
self.load_pkcs12(path).get_privatekey()
)
except OSError as exc:
raise PkcsError(exc) from exc
def dump_certificate(self, path):
"""Dump the specified certificate."""
try:
return crypto.dump_certificate(
crypto.FILETYPE_PEM,
self.load_pkcs12(path).get_certificate()
)
except OSError as exc:
raise PkcsError(exc) from exc
def generate(self, module):
"""Generate PKCS#12 file archive."""
if not os.path.exists(self.path) or self.force:
self.pkcs12 = crypto.PKCS12()
try:
self.remove()
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
if self.ca_certificates:
ca_certs = [self.load_certificate(ca_cert) for ca_cert in self.ca_certificates]
self.pkcs12.set_ca_certificates(ca_certs)
if self.cert_path:
self.pkcs12.set_certificate(self.load_certificate(self.cert_path))
if self.friendly_name:
self.pkcs12.set_friendlyname(self.friendly_name)
if self.privatekey_path:
self.pkcs12.set_privatekey(
self.load_privatekey(self.privatekey_path, self.privatekey_passphrase)
)
try:
with open(self.path, "wb", self.mode) as archive:
archive.write(
self.pkcs12.export(self.passphrase, self.iter_size, self.maciter_size)
)
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
except OSError as exc:
self.remove()
raise PkcsError(exc) from exc
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
def parse(self, module):
"""Read PKCS#12 file."""
if not os.path.exists(self.path) or self.force:
try:
self.remove()
with open(self.path, "wb") as content:
content.write(
f"{self.dump_privatekey(self.src)}{self.dump_certificate(self.src)}"
)
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
except OSError as exc:
raise PkcsError(exc) from exc
file_args = module.load_file_common_arguments(module.params)
if module.set_fs_attributes_if_different(file_args, False):
module.set_mode_if_different(self.path, self.mode, False)
self.changed = True
def remove(self):
"""Remove the PKCS#12 file archive from the filesystem."""
try:
os.remove(self.path)
self.changed = True
except OSError as exc:
if exc.errno != errno.ENOENT:
raise PkcsError(exc) from exc
pass
def check(self, module, perms_required=True): # noqa
def _check_pkey_passphrase():
if self.privatekey_passphrase:
try:
self.load_privatekey(self.path, self.privatekey_passphrase)
return True
except crypto.Error:
return False
return True
if not os.path.exists(self.path):
return os.path.exists(self.path)
return _check_pkey_passphrase
def dump(self):
"""Serialize the object into a dictionary."""
result = {
"changed": self.changed,
"filename": self.path,
}
if self.privatekey_path:
result["privatekey_path"] = self.privatekey_path
return result
def main():
argument_spec = dict(
action=dict(default="export", choices=["parse", "export"], type="str", required=False),
ca_certificates=dict(type="list", elements="str", required=False),
cert_path=dict(type="path"),
force=dict(default=False, type="bool"),
friendly_name=dict(type="str", aliases=["name"]),
iter_size=dict(default=2048, type="int"),
maciter_size=dict(default=1, type="int"),
passphrase=dict(type="str", no_log=True),
path=dict(type="path", required=True),
privatekey_path=dict(type="path"),
privatekey_passphrase=dict(type="str", no_log=True),
state=dict(default="present", choices=["present", "absent"], type="str"),
src=dict(type="path"),
mode=dict(default="0400", type="str", required=False)
)
required_if = [
["action", "export", ["friendly_name"]],
["action", "parse", ["src"]],
]
required_together = [
["privatekey_path", "friendly_name"],
]
module = AnsibleModule(
argument_spec=argument_spec,
add_file_common_args=True,
required_if=required_if,
required_together=required_together,
supports_check_mode=True,
)
if not pyopenssl_found:
module.fail_json(msg="The python pyOpenSSL library is required")
base_dir = os.path.dirname(module.params["path"])
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg=f"The directory {base_dir} does not exist or the file is not a directory"
)
pkcs12 = Pkcs(module)
if module.params["state"] == "present":
if module.check_mode:
result = pkcs12.dump()
result["changed"] = module.params["force"] or not pkcs12.check(module)
module.exit_json(**result)
try:
if module.params["action"] == "export":
pkcs12.generate(module)
else:
pkcs12.parse(module)
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
else:
if module.check_mode:
result = pkcs12.dump()
result["changed"] = os.path.exists(module.params["path"])
module.exit_json(**result)
try:
pkcs12.remove()
except PkcsError as exc:
module.fail_json(msg=to_native(exc))
result = pkcs12.dump()
module.exit_json(**result)
if __name__ == "__main__":
main()