mirror of
https://github.com/stefanberger/swtpm.git
synced 2025-08-22 19:04:35 +00:00

Avoid trying to create TPM certificates while the issuer certificate has not been created, yet (in a 2nd step). To resolve this do not just test for availability of the signing key, which is created first, but also test for the issuer certifcate, which is created in a 2nd step when the local CA is created. If either one is missing, attempt to create the CA. Resolves: #644 Signed-off-by: Stefan Berger <stefanb@linux.ibm.com>
712 lines
25 KiB
Python
712 lines
25 KiB
Python
#!/usr/bin/env python3
|
|
""" swtpm_localca.py
|
|
|
|
A tool for creating TPM 1.2 and TPM 2 certificates localy or using pkcs11
|
|
"""
|
|
|
|
# Disable a couple of warnings:
|
|
# R0911: Too many return statements (10/6) (too-many-return-statements)
|
|
# R0912: Too many branches (15/12) (too-many-branches)
|
|
# R0913: Too many arguments (14/5) (too-many-arguments)
|
|
# R0914: Too many local variables (21/15) (too-many-locals)
|
|
# R0915: Too many statements (57/50) (too-many-statements)
|
|
# W0703: Catching too general exception Exception (broad-except)
|
|
# pylint: disable=W0703,R0911,R0912,R0913,R0914,R0915
|
|
|
|
#
|
|
# swtpm_localca.py
|
|
#
|
|
# Authors: Stefan Berger <stefanb@linux.ibm.com>
|
|
#
|
|
# (c) Copyright IBM Corporation 2020
|
|
#
|
|
|
|
import codecs
|
|
import fcntl
|
|
import getopt
|
|
import getpass
|
|
import os
|
|
import re
|
|
import stat
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
from py_swtpm_localca.swtpm_localca_conf import SYSCONFDIR
|
|
from py_swtpm_localca.swtpm_utils import logit, logerr
|
|
|
|
# Some flags
|
|
SETUP_TPM2_F = 1
|
|
# for TPM 2 EK
|
|
ALLOW_SIGNING_F = 2
|
|
DECRYPTION_F = 4
|
|
|
|
|
|
XCH = os.getenv("XDG_CONFIG_HOME")
|
|
HOME = os.getenv("HOME")
|
|
|
|
LOCALCA_OPTIONS = "swtpm-localca.options"
|
|
if XCH and os.access(os.path.join(XCH, LOCALCA_OPTIONS), os.R_OK):
|
|
DEFAULT_LOCALCA_OPTIONS = os.path.join(XCH, LOCALCA_OPTIONS)
|
|
elif HOME and os.access(os.path.join(HOME, ".config", LOCALCA_OPTIONS), os.R_OK):
|
|
DEFAULT_LOCALCA_OPTIONS = os.path.join(HOME, ".config", LOCALCA_OPTIONS)
|
|
else:
|
|
DEFAULT_LOCALCA_OPTIONS = os.path.join(os.sep + SYSCONFDIR, LOCALCA_OPTIONS)
|
|
|
|
LOCALCA_CONFIG = "swtpm-localca.conf"
|
|
if XCH and os.access(os.path.join(XCH, LOCALCA_CONFIG), os.R_OK):
|
|
DEFAULT_LOCALCA_CONFIG = os.path.join(XCH, LOCALCA_CONFIG)
|
|
elif HOME and os.access(os.path.join(HOME, ".config", LOCALCA_CONFIG), os.R_OK):
|
|
DEFAULT_LOCALCA_CONFIG = os.path.join(HOME, ".config", LOCALCA_CONFIG)
|
|
else:
|
|
DEFAULT_LOCALCA_CONFIG = os.path.join(os.sep + SYSCONFDIR, LOCALCA_CONFIG)
|
|
|
|
# Default logging goes to stderr
|
|
LOGFILE = ""
|
|
|
|
def resolve_string(inp):
|
|
""" resolve environment variables in a string """
|
|
result = ""
|
|
sidx = 0
|
|
|
|
while True:
|
|
idx = inp.find("${", sidx)
|
|
if idx < 0:
|
|
if sidx == 0:
|
|
return inp
|
|
result += inp[sidx:]
|
|
return result
|
|
|
|
result += inp[sidx:idx]
|
|
eidx = inp.find("}", idx + 2)
|
|
if eidx < 0:
|
|
result += inp[idx:]
|
|
return result
|
|
|
|
result += os.getenv(inp[idx + 2:eidx], '')
|
|
sidx = eidx + 1
|
|
|
|
|
|
def get_config_value(lines, configname, default=None):
|
|
""" Get a config value from a list of strings """
|
|
regex = r'^' + configname + r"\s*=\s*([^#\n]*).*"
|
|
for line in lines:
|
|
match = re.match(regex, line)
|
|
if match:
|
|
return resolve_string(match.groups()[0])
|
|
return default
|
|
|
|
|
|
def get_config_envvars(lines):
|
|
""" Extract all environment variables from the config file and return a map.
|
|
Environment variable lines must start with 'env:' and must not contain
|
|
trailing spaces or a comment starting with '#' """
|
|
res = {}
|
|
|
|
regex = r"^env:([a-zA-Z_][a-zA-Z_0-9]*)\s*=\s*([^\n]*).*"
|
|
for line in lines:
|
|
match = re.match(regex, line)
|
|
if match:
|
|
try:
|
|
encoded = codecs.encode(match.group(2), "latin-1", "backslashreplace")
|
|
res[match.group(1)] = codecs.decode(encoded, "unicode_escape")
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Invalid character in value of %s environment variable: %s\n" %
|
|
(match.group(1), str(err)))
|
|
return {}, 1
|
|
|
|
return res, 0
|
|
|
|
|
|
def write_file(filename, text):
|
|
""" Write some text to a file """
|
|
try:
|
|
fileobj = open(filename, "w")
|
|
fileobj.write(text)
|
|
fileobj.close()
|
|
return 0
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not write to file %s: %s\n" % (filename, str(err)))
|
|
return 1
|
|
|
|
|
|
def read_file(filename):
|
|
""" read contents from a file """
|
|
try:
|
|
fobj = open(filename, mode='rb')
|
|
result = fobj.read()
|
|
fobj.close()
|
|
return result, 0
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not read from file %s: %s\n" % (filename, str(err)))
|
|
return "", 1
|
|
|
|
|
|
def read_file_lines(filename):
|
|
""" Read the lines from a file and return a list of the lines """
|
|
try:
|
|
fobj = open(filename, 'r')
|
|
lines = fobj.readlines()
|
|
fobj.close()
|
|
return lines, 0
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not read from file %s : %s\n" & (filename, str(err)))
|
|
return [], 1
|
|
|
|
|
|
def makedir(dirname, purpose):
|
|
""" Create a directory if it does not exist """
|
|
if not os.path.exists(dirname):
|
|
logit(LOGFILE, "Creating swtpm-local dir '%s'.\n" % dirname)
|
|
try:
|
|
os.makedirs(dirname)
|
|
except OSError as err:
|
|
logerr(LOGFILE, "Could not create directory for '%s': %s\n" % (purpose, str(err)))
|
|
return 1
|
|
return 0
|
|
|
|
|
|
def remove_file(filename, verbose=True):
|
|
""" remove a file """
|
|
if not os.path.exists(filename):
|
|
return 0
|
|
try:
|
|
os.remove(filename)
|
|
return 0
|
|
except Exception as err:
|
|
if verbose:
|
|
logerr(LOGFILE, "Could not remove file %s: %s\n" % (filename, str(err)))
|
|
return 1
|
|
|
|
|
|
def remove_files(filename_list):
|
|
""" remove files in a list of filenames """
|
|
for filename in filename_list:
|
|
remove_file(filename, verbose=False)
|
|
|
|
|
|
def get_certtool():
|
|
""" Get the name of the certtool to use """
|
|
if os.uname().sysname == "Darwin":
|
|
return "gnutls-certtool"
|
|
return "certtool"
|
|
|
|
|
|
def create_localca_cert(lockfile, statedir, signkey, signkey_password, issuercert):
|
|
""" Create the local CA's certificate if it doesn't already exist. """
|
|
try:
|
|
filedes = os.open(lockfile, os.O_RDWR|os.O_CREAT)
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not open lockfile %s: %s\n" % (lockfile, str(err)))
|
|
return 1
|
|
|
|
try:
|
|
fcntl.flock(filedes, fcntl.LOCK_EX)
|
|
|
|
if not os.path.exists(statedir):
|
|
if makedir(statedir, "statedir") != 0:
|
|
return 1
|
|
if not os.access(signkey, os.R_OK) or not os.access(issuercert, os.R_OK):
|
|
directory = os.path.dirname(signkey)
|
|
cakey = os.path.join(directory, "swtpm-localca-rootca-privkey.pem")
|
|
cacert = os.path.join(directory, "swtpm-localca-rootca-cert.pem")
|
|
|
|
swtpm_rootca_password = os.getenv("SWTPM_ROOTCA_PASSWORD")
|
|
certtool = get_certtool()
|
|
|
|
# First the root CA
|
|
cmd = [certtool, "--generate-privkey", "--outfile", cakey]
|
|
if swtpm_rootca_password:
|
|
# neither env. variable nor template file work...
|
|
cmd.extend(["--password", swtpm_rootca_password])
|
|
|
|
try:
|
|
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
output = proc.communicate()[0]
|
|
if proc.returncode:
|
|
logerr(LOGFILE, "Could not create root-CA key %s\n" % cakey)
|
|
logerr(LOGFILE, "%s" % output.decode())
|
|
return 1
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not create root-CA key %s: %s\n" % (cakey, str(err)))
|
|
return 1
|
|
|
|
os.chmod(cakey, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP)
|
|
|
|
temp = tempfile.NamedTemporaryFile()
|
|
try:
|
|
filecontent = \
|
|
"cn=swtpm-localca-rootca\n" \
|
|
"ca\n" \
|
|
"cert_signing_key\n" \
|
|
"expiration_days = 3650\n"
|
|
temp.write(filecontent.encode())
|
|
temp.seek(0)
|
|
cmd = [certtool,
|
|
"--generate-self-signed",
|
|
"--template", temp.name,
|
|
"--outfile", cacert,
|
|
"--load-privkey", cakey]
|
|
|
|
certtool_env = {
|
|
"PATH": os.getenv("PATH")
|
|
}
|
|
if swtpm_rootca_password:
|
|
certtool_env["GNUTLS_PIN"] = swtpm_rootca_password
|
|
|
|
try:
|
|
proc = subprocess.Popen(cmd, env=certtool_env,
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
output = proc.communicate()[0]
|
|
if proc.returncode:
|
|
logerr(LOGFILE, "Could not create root-CA\n")
|
|
logerr(LOGFILE, "%s" % output.decode())
|
|
remove_files([cakey, cacert])
|
|
return 1
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not create root-CA: %s\n" % str(err))
|
|
remove_files([cakey, cacert])
|
|
return 1
|
|
finally:
|
|
temp.close()
|
|
|
|
# intermediate CA
|
|
cmd = [certtool, "--generate-privkey", "--outfile", signkey]
|
|
if signkey_password:
|
|
cmd.extend(["--password", signkey_password])
|
|
|
|
try:
|
|
proc = subprocess.Popen(cmd,
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
output = proc.communicate()[0]
|
|
if proc.returncode:
|
|
logerr(LOGFILE, "Could not create local-CA key %s\n" % signkey)
|
|
logerr(LOGFILE, "certtool failed: %s\n" % output.decode())
|
|
remove_files([cakey, cacert, signkey])
|
|
return 1
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not create local-CA key %s: %s\n" % (signkey, str(err)))
|
|
remove_files([cakey, cacert, signkey])
|
|
return 1
|
|
|
|
os.chmod(signkey, stat.S_IRUSR|stat.S_IWUSR|stat.S_IRGRP)
|
|
|
|
temp = tempfile.NamedTemporaryFile()
|
|
try:
|
|
filecontent = \
|
|
"cn=swtpm-localca\n" \
|
|
"ca\n" \
|
|
"cert_signing_key\n" \
|
|
"expiration_days = 3650\n"
|
|
if swtpm_rootca_password and signkey_password:
|
|
filecontent += "password = %s\n" % swtpm_rootca_password
|
|
temp.write(filecontent.encode())
|
|
temp.seek(0)
|
|
|
|
cmd = [certtool,
|
|
"--generate-certificate",
|
|
"--template", temp.name,
|
|
"--outfile", issuercert,
|
|
"--load-privkey", signkey,
|
|
"--load-ca-privkey", cakey,
|
|
"--load-ca-certificate", cacert]
|
|
|
|
certtool_env = {
|
|
"PATH": os.getenv("PATH")
|
|
}
|
|
if signkey_password:
|
|
certtool_env["GNUTLS_PIN"] = signkey_password
|
|
elif swtpm_rootca_password:
|
|
certtool_env["GNUTLS_PIN"] = swtpm_rootca_password
|
|
|
|
try:
|
|
proc = subprocess.Popen(cmd, env=certtool_env,
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
output = proc.communicate()[0]
|
|
if proc.returncode:
|
|
logerr(LOGFILE, "Could not create local CA\n")
|
|
logerr(LOGFILE, "%s" % output.decode())
|
|
remove_files([cakey, cacert, signkey, issuercert])
|
|
return 1
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not create local CA: %s\n" % str(err))
|
|
remove_files([cakey, cacert, signkey, issuercert])
|
|
return 1
|
|
finally:
|
|
temp.close()
|
|
finally:
|
|
os.close(filedes)
|
|
|
|
return 0
|
|
|
|
|
|
def get_next_cert_serial(certserial, lockfile):
|
|
""" Get the next serial number for a certificate """
|
|
try:
|
|
filedes = os.open(lockfile, os.O_RDWR|os.O_CREAT)
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not open lockfile %s: %s\n" % (lockfile, str(err)))
|
|
return 1
|
|
|
|
try:
|
|
fcntl.flock(filedes, fcntl.LOCK_EX)
|
|
|
|
if not os.access(certserial, os.R_OK):
|
|
_ = write_file(certserial, "1")
|
|
serial, ret = read_file(certserial)
|
|
if ret != 0:
|
|
return "", 1
|
|
if not serial.decode().isnumeric():
|
|
serial_n = 1
|
|
else:
|
|
serial_n = int(serial) + 1
|
|
ret = write_file(certserial, "%d" % serial_n)
|
|
if ret != 0:
|
|
return "", 1
|
|
finally:
|
|
os.close(filedes)
|
|
|
|
return "%d" % serial_n, 0
|
|
|
|
|
|
def create_cert(flags, typ, directory, ekparams, vmid, tpm_spec_params, tpm_attr_params,
|
|
signkey, signkey_password, issuercert, parentkey_password, swtpm_cert_env,
|
|
certserial, lockfile, optsfile):
|
|
""" Create the certificate """
|
|
serial, ret = get_next_cert_serial(certserial, lockfile)
|
|
if ret != 0:
|
|
return 1
|
|
|
|
options = []
|
|
lines, _ = read_file_lines(optsfile)
|
|
for line in lines:
|
|
if not line.strip():
|
|
continue
|
|
options.extend([x.strip() for x in line.split(" ", 1)])
|
|
|
|
if vmid:
|
|
subj = "CN=%s" % vmid
|
|
else:
|
|
subj = "CN=unknown"
|
|
|
|
if flags & SETUP_TPM2_F:
|
|
options.append("--tpm2")
|
|
else:
|
|
options.append("--add-header")
|
|
|
|
if typ == "ek":
|
|
if flags & ALLOW_SIGNING_F:
|
|
options.append("--allow-signing")
|
|
if flags & DECRYPTION_F:
|
|
options.append("--decryption")
|
|
|
|
match = re.search(r'x=([0-9A-Fa-f]+),y=([0-9A-Fa-f]+)(,id=([^,]+))?', ekparams)
|
|
if match:
|
|
keyparams = ["--ecc-x", match.group(1), "--ecc-y", match.group(2)]
|
|
if match.group(4):
|
|
keyparams.extend(["--ecc-curveid", match.group(4)])
|
|
else:
|
|
keyparams = ["--modulus", ekparams]
|
|
|
|
cmd = ["swtpm_cert",
|
|
"--subject", subj]
|
|
cmd.extend(options)
|
|
|
|
temp1 = None
|
|
temp2 = None
|
|
|
|
if signkey_password:
|
|
temp1 = tempfile.NamedTemporaryFile()
|
|
temp1.write(signkey_password.encode())
|
|
temp1.seek(0)
|
|
cmd.extend(["--signkey-pwd", "file:%s" % temp1.name])
|
|
|
|
if parentkey_password:
|
|
temp2 = tempfile.NamedTemporaryFile()
|
|
temp2.write(parentkey_password.encode())
|
|
temp2.seek(0)
|
|
cmd.extend(["--parentkey-pwd", "file:%s" % temp2.name])
|
|
|
|
if typ == "ek":
|
|
cmd.extend(tpm_spec_params)
|
|
|
|
cmd.extend(tpm_attr_params)
|
|
|
|
if typ == "platform":
|
|
cmd.extend(["--type", "platform",
|
|
"--out-cert", os.path.join(directory, "platform.cert")])
|
|
else:
|
|
cmd.extend(["--out-cert", os.path.join(directory, "ek.cert")])
|
|
|
|
cmd.extend(keyparams)
|
|
cmd.extend(["--signkey", signkey,
|
|
"--issuercert", issuercert,
|
|
"--days", "3650",
|
|
"--serial", serial])
|
|
|
|
if typ == "ek":
|
|
certtype = "EK"
|
|
else:
|
|
certtype = "platform"
|
|
|
|
try:
|
|
proc = subprocess.Popen(cmd, env=swtpm_cert_env,
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
output = proc.communicate()[0]
|
|
if proc.returncode:
|
|
logerr(LOGFILE, "Could not create %s certificate locally\n" % certtype)
|
|
logerr(LOGFILE, "%s" % output.decode())
|
|
return 1
|
|
except Exception as err:
|
|
logerr(LOGFILE, "Could not run swtpm_cert: %s\n" % str(err))
|
|
return 1
|
|
finally:
|
|
if temp1:
|
|
temp1.close()
|
|
if temp2:
|
|
temp2.close()
|
|
|
|
logit(LOGFILE, "Successfully created %s certificate locally.\n" % certtype)
|
|
|
|
return 0
|
|
|
|
|
|
def usage(prgname):
|
|
""" Display usage """
|
|
print(
|
|
"Usage: {prgname} [options]\n"
|
|
"\n"
|
|
"The following options are supported:\n"
|
|
"\n"
|
|
"--type type The type of certificate to create: 'ek' or 'platform'\n"
|
|
"--ek key-param The modulus of an RSA key or x=...,y=,... for an EC key\n"
|
|
"--dir directory The directory to write the resulting certificate into\n"
|
|
"--vmid vmid The ID of the virtual machine\n"
|
|
"--optsfile file A file containing options to pass to swtpm_cert\n"
|
|
"--configfile file A file containing configuration parameters for directory,\n"
|
|
" signing key and password and certificate to use\n"
|
|
"--logfile file A file to write a log into\n"
|
|
"--tpm-spec-family s The implemented spec family, e.g., '2.0'\n"
|
|
"--tpm-spec-revision i The spec revision of the TPM as integer; e.g., 146\n"
|
|
"--tpm-spec-level i The spec level of the TPM; must be an integer; e.g. 0\n"
|
|
"--tpm-manufacturer s The manufacturer of the TPM; e.g., id:00001014\n"
|
|
"--tpm-model s The model of the TPM; e.g., 'swtpm'\n"
|
|
"--tpm-version i The (firmware) version of the TPM; e.g., id:20160511\n"
|
|
"--tpm2 Generate a certificate for a TPM 2\n"
|
|
"--allow-signing The TPM 2's EK can be used for signing\n"
|
|
"--decryption The TPM 2's EK can be used for decryption\n"
|
|
"--help, -h, -? Display this help screen and exit\n"
|
|
"\n"
|
|
"\n"
|
|
"The following environment variables are supported:\n"
|
|
"\n"
|
|
"SWTPM_ROOTCA_PASSWORD The root CA's private key password\n"
|
|
"\n".format_map({
|
|
'prgname': prgname,
|
|
}))
|
|
|
|
|
|
def main():
|
|
""" main function - parses command line parameters and low level dealing with them """
|
|
global LOGFILE # pylint: disable=W0603
|
|
|
|
try:
|
|
opts, _ = getopt.getopt(sys.argv[1:], "h?",
|
|
["type=",
|
|
"ek=",
|
|
"dir=",
|
|
"vmid=",
|
|
"optsfile=",
|
|
"configfile=",
|
|
"logfile=",
|
|
"tpm-spec-family=",
|
|
"tpm-spec-revision=",
|
|
"tpm-spec-level=",
|
|
"tpm-manufacturer=",
|
|
"tpm-model=",
|
|
"tpm-version=",
|
|
"tpm2",
|
|
"allow-signing",
|
|
"decryption",
|
|
"help"])
|
|
except getopt.GetoptError as err:
|
|
print(err)
|
|
usage(sys.argv[0])
|
|
sys.exit(1)
|
|
|
|
flags = 0
|
|
typ = ""
|
|
ekparams = ""
|
|
directory = ""
|
|
vmid = ""
|
|
optsfile = DEFAULT_LOCALCA_OPTIONS
|
|
configfile = DEFAULT_LOCALCA_CONFIG
|
|
tpm_spec_params = []
|
|
tpm_attr_params = []
|
|
|
|
for opt, arg in opts:
|
|
if opt == '--type':
|
|
typ = arg
|
|
elif opt == '--ek':
|
|
ekparams = arg
|
|
elif opt == '--dir':
|
|
directory = arg
|
|
elif opt == '--vmid':
|
|
vmid = arg
|
|
elif opt == '--optsfile':
|
|
optsfile = arg
|
|
elif opt == '--configfile':
|
|
configfile = arg
|
|
elif opt == '--logfile':
|
|
LOGFILE = arg
|
|
elif opt in ['--tpm-spec-family', '--tpm-spec-revision', '--tpm-spec-level']:
|
|
tpm_spec_params.extend([opt, arg])
|
|
elif opt in ['--tpm-manufacturer', '--tpm-model', '--tpm-version']:
|
|
tpm_attr_params.extend([opt, arg])
|
|
elif opt == '--tpm2':
|
|
flags |= SETUP_TPM2_F
|
|
elif opt == '--allow-signing':
|
|
flags |= ALLOW_SIGNING_F
|
|
elif opt == '--decryption':
|
|
flags |= DECRYPTION_F
|
|
elif opt in ['--help', '-h', '-?']:
|
|
usage(sys.argv[0])
|
|
sys.exit(0)
|
|
|
|
if len(LOGFILE) > 0:
|
|
if os.path.islink(LOGFILE):
|
|
sys.stderr.write("Logfile must not be a symlink.\n")
|
|
sys.exit(1)
|
|
try:
|
|
fobj = open(LOGFILE, "a") # do not truncate
|
|
fobj.close()
|
|
except PermissionError:
|
|
sys.stderr.write("Cannot write to logfile %s.\n", LOGFILE)
|
|
sys.exit(1)
|
|
|
|
if not os.access(optsfile, os.R_OK):
|
|
logerr(LOGFILE, "Need read rights on options file %s for user %s.\n" %
|
|
(optsfile, getpass.getuser()))
|
|
sys.exit(1)
|
|
|
|
if not os.access(configfile, os.R_OK):
|
|
logerr(LOGFILE, "Need read rights on options file %s for user %s.\n" %
|
|
(configfile, getpass.getuser()))
|
|
sys.exit(1)
|
|
|
|
lines, ret = read_file_lines(configfile)
|
|
if ret != 0:
|
|
sys.exit(1)
|
|
|
|
statedir = get_config_value(lines, "statedir")
|
|
if not statedir:
|
|
logerr(LOGFILE, "Missing 'statedir' config value in config file %s.\n" % configfile)
|
|
sys.exit(1)
|
|
if not os.access(statedir, os.W_OK | os.R_OK):
|
|
logerr(LOGFILE, "Need read/write rights on statedir %s for user %s.\n" %
|
|
(statedir, getpass.getuser()))
|
|
if makedir(statedir, "statedir") != 0:
|
|
sys.exit(1)
|
|
|
|
lockfile = os.path.join(statedir, ".lock.swtpm-localca")
|
|
if os.path.exists(lockfile) and not os.access(lockfile, os.W_OK | os.R_OK):
|
|
logerr(LOGFILE, "Need read/write rights on %s for user %s.\n" %
|
|
(lockfile, getpass.getuser()))
|
|
sys.exit(1)
|
|
|
|
signkey = get_config_value(lines, "signingkey")
|
|
if not signkey:
|
|
logerr(LOGFILE, "Missing 'signingkey' config value in config file %s.\n" % configfile)
|
|
sys.exit(1)
|
|
|
|
# SIGNKEY may be a GNUTLS url like tpmkey:file= or tpmkey:uuid=
|
|
if not signkey.startswith("tpmkey:file=") and \
|
|
not signkey.startswith("tpmkey:uuid=") and \
|
|
not signkey.startswith("pkcs11:"):
|
|
if makedir(os.path.dirname(signkey), "signkey") != 0:
|
|
sys.exit(1)
|
|
|
|
signkey_password = get_config_value(lines, "signingkey_password")
|
|
parentkey_password = get_config_value(lines, "parentkey_password")
|
|
|
|
issuercert = get_config_value(lines, 'issuercert')
|
|
if not issuercert:
|
|
logerr(LOGFILE, "Missing 'issuercert' config value in config file %s.\n" % configfile)
|
|
sys.exit(1)
|
|
if makedir(os.path.dirname(issuercert), "issuercert") != 0:
|
|
sys.exit(1)
|
|
|
|
# environment needed for calling swtpm_cert
|
|
swtpm_cert_env = os.environ
|
|
|
|
# TPM keys are GNUTLS URIs...
|
|
if signkey.startswith("tpmkey:file=") or signkey.startswith("tpmkey:uuid="):
|
|
tss_tcsd_hostname = get_config_value(lines, "TSS_TCSD_HOSTNAME", "localhost")
|
|
tss_tcsd_port = get_config_value(lines, "TSS_TCSD_PORT", 30003)
|
|
swtpm_cert_env["TSS_TCSD_HOSTNAME"] = tss_tcsd_hostname
|
|
swtpm_cert_env["TSS_TCSD_PORT"] = tss_tcsd_port
|
|
|
|
logit(LOGFILE, "CA uses a GnuTLS TPM key; using TSS_TCSD_HOSTNAME=%s " \
|
|
"TSS_TCSD_PORT=%s\n" % (tss_tcsd_hostname, tss_tcsd_port))
|
|
elif signkey.startswith("pkcs11:"):
|
|
signkey = signkey.replace(r"\;", ";")
|
|
if signkey_password:
|
|
swtpm_cert_env["SWTPM_PKCS11_PIN"] = signkey_password
|
|
logit(LOGFILE, "CA uses a PKCS#11 key; using password from 'signingkey_password'\n")
|
|
else:
|
|
swtpm_pkcs11_pin = get_config_value(lines, "SWTPM_PKCS11_PIN", "swtpm-tpmca")
|
|
swtpm_cert_env["SWTPM_PKCS11_PIN"] = swtpm_pkcs11_pin
|
|
logit(LOGFILE, "CA uses a PKCS#11 key; using SWTPM_PKCS11_PIN\n")
|
|
# Get additional environment variables pkcs11 modules may need
|
|
envvars, ret = get_config_envvars(lines)
|
|
if ret != 0:
|
|
sys.exit(1)
|
|
swtpm_cert_env.update(envvars)
|
|
else:
|
|
create_certs = False
|
|
# create certificate if either the signing key or issuer cert are missing
|
|
if not os.access(signkey, os.R_OK):
|
|
if os.path.exists(signkey):
|
|
logerr(LOGFILE, "Need read rights on signing key %s for user %s.\n" %
|
|
(signkey, getpass.getuser()))
|
|
sys.exit(1)
|
|
create_certs = True
|
|
|
|
if not os.access(issuercert, os.R_OK):
|
|
if os.path.exists(issuercert):
|
|
logerr(LOGFILE, "Need read rights on issuer certficate %s for user %s.\n" %
|
|
(issuercert, getpass.getuser()))
|
|
sys.exit(1)
|
|
create_certs = True
|
|
|
|
if create_certs:
|
|
logit(LOGFILE, "Creating root CA and a local CA's signing key and issuer cert.\n")
|
|
if create_localca_cert(lockfile, statedir, signkey, signkey_password,
|
|
issuercert) != 0:
|
|
logerr(LOGFILE, "Error creating local CA's signing key and cert.\n")
|
|
sys.exit(1)
|
|
|
|
if not os.access(signkey, os.R_OK):
|
|
logerr(LOGFILE, "Need read rights on signing key %s for user %s.\n" %
|
|
(signkey, getpass.getuser()))
|
|
sys.exit(1)
|
|
|
|
if not os.access(issuercert, os.R_OK):
|
|
logerr(LOGFILE, "Need read rights on issuer certificate %s for user %s.\n" %
|
|
(issuercert, getpass.getuser()))
|
|
sys.exit(1)
|
|
|
|
certserial = get_config_value(lines, "certserial", os.path.join(statedir, "certserial"))
|
|
if makedir(os.path.dirname(certserial), "certserial") != 0:
|
|
sys.exit(1)
|
|
|
|
ret = create_cert(flags, typ, directory, ekparams, vmid, tpm_spec_params, tpm_attr_params,
|
|
signkey, signkey_password, issuercert, parentkey_password, swtpm_cert_env,
|
|
certserial, lockfile, optsfile)
|
|
|
|
sys.exit(ret)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|