nixpkgs/nixos/tests/acme/python-utils.py
Lucas Savva 229640ed3a nixos/tests/acme: Refactor test suite
Split tests up based on certain use cases:

- http01-builtin: Tests most functionality of the core module, such
 as the systemd and hashing components, whilst utilising lego's built
 in http01 resolution mechanis.
- dns01: Tests only that DNS01 renewal works as expected.
- nginx: Tests nginx compatability
- httpd: Tests httpd compatability
- caddy: Tests caddy compatability
2025-02-22 23:58:29 +00:00

166 lines
5.7 KiB
Python

#!/usr/bin/env python3
import time
TOTAL_RETRIES = 20
def run(node, cmd, fail=False):
if fail:
return node.fail(cmd)
else:
return node.succeed(cmd)
# Waits for the system to finish booting or switching configuration
def wait_for_running(node):
node.succeed("systemctl is-system-running --wait")
# On first switch, this will create a symlink to the current system so that we can
# quickly switch between derivations
def switch_to(node, name, fail=False) -> None:
root_specs = "/tmp/specialisation"
node.execute(
f"test -e {root_specs}"
f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}"
)
switcher_path = (
f"/run/current-system/specialisation/{name}/bin/switch-to-configuration"
)
rc, _ = node.execute(f"test -e '{switcher_path}'")
if rc > 0:
switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration"
cmd = f"{switcher_path} test"
run(node, cmd, fail=fail)
if not fail:
wait_for_running(node)
# Ensures the issuer of our cert matches the chain
# and matches the issuer we expect it to be.
# It's a good validation to ensure the cert.pem and fullchain.pem
# are not still selfsigned after verification
def check_issuer(node, cert_name, issuer) -> None:
for fname in ("cert.pem", "fullchain.pem"):
actual_issuer = node.succeed(
f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}"
).partition("=")[2]
assert (
issuer.lower() in actual_issuer.lower()
), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}"
# Ensures the provided domain matches with the given cert
def check_domain(node, cert_name, domain, fail=False) -> None:
cmd = f"openssl x509 -noout -checkhost '{domain}' -in /var/lib/acme/{cert_name}/cert.pem"
run(node, cmd, fail=fail)
# Ensures the required values for OCSP stapling are present
# Pebble doesn't provide a full OCSP responder, so just checks the URL
def check_stapling(node, cert_name, ca_domain, fail=False):
rc, _ = node.execute(
f"openssl x509 -noout -ocsp_uri -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -i 'http://{ca_domain}:4002' 2>&1",
)
assert rc == 0 or fail, "Failed to find OCSP URI in issued certificate"
run(
node,
f"openssl x509 -noout -ext tlsfeature -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -iv 'no extensions' 2>&1",
fail=fail,
)
# Checks the keyType by validating the number of bits
def check_key_bits(node, cert_name, bits, fail=False):
run(
node,
f"openssl x509 -noout -text -in /var/lib/acme/{cert_name}/cert.pem"
f" | grep -i Public-Key | grep {bits} | tee /dev/stderr",
fail=fail,
)
# Ensure cert comes before chain in fullchain.pem
def check_fullchain(node, cert_name):
cert_file = f"/var/lib/acme/{cert_name}/fullchain.pem"
num_certs = node.succeed(f"grep -o 'END CERTIFICATE' {cert_file}")
assert len(num_certs.strip().split("\n")) > 1, "Insufficient certs in fullchain.pem"
first_cert_data = node.succeed(
f"grep -m1 -B50 'END CERTIFICATE' {cert_file}"
" | openssl x509 -noout -text"
)
for line in first_cert_data.lower().split("\n"):
if "dns:" in line:
print(f"First DNSName in fullchain.pem: {line}")
assert cert_name.lower() in line, f"{cert_name} not found in {line}"
return
assert False
# Checks the permissions in the cert directories are as expected
def check_permissions(node, cert_name, group):
stat = "stat -L -c '%a %U %G' "
node.succeed(
f"test $({stat} /var/lib/acme/{cert_name}/*.pem"
f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0"
)
node.succeed(
f"test $({stat} /var/lib/acme/.lego/{cert_name}/*/{cert_name}*"
f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0"
)
node.succeed(
f"test $({stat} /var/lib/acme/{cert_name}"
f" | tee /dev/stderr | grep -v '750 acme {group}' | wc -l) -eq 0"
)
node.succeed(
f"test $(find /var/lib/acme/.lego/accounts -type f -exec {stat} {{}} \\;"
f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0"
)
# BackoffTracker provides a robust system for handling test retries
class BackoffTracker:
delay = 1
increment = 1
def handle_fail(self, retries, message) -> int:
assert retries < TOTAL_RETRIES, message
print(f"Retrying in {self.delay}s, {retries + 1}/{TOTAL_RETRIES}")
time.sleep(self.delay)
# Only increment after the first try
if retries == 0:
self.delay += self.increment
self.increment *= 2
return retries + 1
def protect(self, func):
def wrapper(*args, retries: int = 0, **kwargs):
try:
return func(*args, **kwargs)
except Exception as err:
retries = self.handle_fail(retries, err.args)
return wrapper(*args, retries=retries, **kwargs)
return wrapper
backoff = BackoffTracker()
@backoff.protect
def download_ca_certs(node, ca_domain):
node.succeed(f"curl https://{ca_domain}:15000/roots/0 > /tmp/ca.crt")
node.succeed(f"curl https://{ca_domain}:15000/intermediate-keys/0 >> /tmp/ca.crt")
@backoff.protect
def check_connection(node, domain, fail=False, minica=False):
cafile = "/tmp/ca.crt"
if minica:
cafile = "/var/lib/acme/.minica/cert.pem"
run(node,
f"openssl s_client -brief -CAfile {cafile}"
f" -verify 2 -verify_return_error -verify_hostname {domain}"
f" -servername {domain} -connect {domain}:443 < /dev/null",
fail=fail,
)