mirror of
https://github.com/NixOS/nixpkgs.git
synced 2025-06-12 12:45:27 +03:00
nixos/tests/acme: Refactor test suite
Split tests up based on certain use cases: - http01-builtin: Tests most functionality of the core module, such as the systemd and hashing components, whilst utilising lego's built in http01 resolution mechanis. - dns01: Tests only that DNS01 renewal works as expected. - nginx: Tests nginx compatability - httpd: Tests httpd compatability - caddy: Tests caddy compatability
This commit is contained in:
parent
84af416af6
commit
229640ed3a
19 changed files with 944 additions and 1061 deletions
166
nixos/tests/acme/python-utils.py
Normal file
166
nixos/tests/acme/python-utils.py
Normal file
|
@ -0,0 +1,166 @@
|
|||
#!/usr/bin/env python3
|
||||
import time
|
||||
|
||||
TOTAL_RETRIES = 20
|
||||
|
||||
|
||||
def run(node, cmd, fail=False):
|
||||
if fail:
|
||||
return node.fail(cmd)
|
||||
else:
|
||||
return node.succeed(cmd)
|
||||
|
||||
# Waits for the system to finish booting or switching configuration
|
||||
def wait_for_running(node):
|
||||
node.succeed("systemctl is-system-running --wait")
|
||||
|
||||
# On first switch, this will create a symlink to the current system so that we can
|
||||
# quickly switch between derivations
|
||||
def switch_to(node, name, fail=False) -> None:
|
||||
root_specs = "/tmp/specialisation"
|
||||
node.execute(
|
||||
f"test -e {root_specs}"
|
||||
f" || ln -s $(readlink /run/current-system)/specialisation {root_specs}"
|
||||
)
|
||||
|
||||
switcher_path = (
|
||||
f"/run/current-system/specialisation/{name}/bin/switch-to-configuration"
|
||||
)
|
||||
rc, _ = node.execute(f"test -e '{switcher_path}'")
|
||||
if rc > 0:
|
||||
switcher_path = f"/tmp/specialisation/{name}/bin/switch-to-configuration"
|
||||
|
||||
cmd = f"{switcher_path} test"
|
||||
run(node, cmd, fail=fail)
|
||||
if not fail:
|
||||
wait_for_running(node)
|
||||
|
||||
# Ensures the issuer of our cert matches the chain
|
||||
# and matches the issuer we expect it to be.
|
||||
# It's a good validation to ensure the cert.pem and fullchain.pem
|
||||
# are not still selfsigned after verification
|
||||
def check_issuer(node, cert_name, issuer) -> None:
|
||||
for fname in ("cert.pem", "fullchain.pem"):
|
||||
actual_issuer = node.succeed(
|
||||
f"openssl x509 -noout -issuer -in /var/lib/acme/{cert_name}/{fname}"
|
||||
).partition("=")[2]
|
||||
assert (
|
||||
issuer.lower() in actual_issuer.lower()
|
||||
), f"{fname} issuer mismatch. Expected {issuer} got {actual_issuer}"
|
||||
|
||||
# Ensures the provided domain matches with the given cert
|
||||
def check_domain(node, cert_name, domain, fail=False) -> None:
|
||||
cmd = f"openssl x509 -noout -checkhost '{domain}' -in /var/lib/acme/{cert_name}/cert.pem"
|
||||
run(node, cmd, fail=fail)
|
||||
|
||||
# Ensures the required values for OCSP stapling are present
|
||||
# Pebble doesn't provide a full OCSP responder, so just checks the URL
|
||||
def check_stapling(node, cert_name, ca_domain, fail=False):
|
||||
rc, _ = node.execute(
|
||||
f"openssl x509 -noout -ocsp_uri -in /var/lib/acme/{cert_name}/cert.pem"
|
||||
f" | grep -i 'http://{ca_domain}:4002' 2>&1",
|
||||
)
|
||||
assert rc == 0 or fail, "Failed to find OCSP URI in issued certificate"
|
||||
run(
|
||||
node,
|
||||
f"openssl x509 -noout -ext tlsfeature -in /var/lib/acme/{cert_name}/cert.pem"
|
||||
f" | grep -iv 'no extensions' 2>&1",
|
||||
fail=fail,
|
||||
)
|
||||
|
||||
# Checks the keyType by validating the number of bits
|
||||
def check_key_bits(node, cert_name, bits, fail=False):
|
||||
run(
|
||||
node,
|
||||
f"openssl x509 -noout -text -in /var/lib/acme/{cert_name}/cert.pem"
|
||||
f" | grep -i Public-Key | grep {bits} | tee /dev/stderr",
|
||||
fail=fail,
|
||||
)
|
||||
|
||||
# Ensure cert comes before chain in fullchain.pem
|
||||
def check_fullchain(node, cert_name):
|
||||
cert_file = f"/var/lib/acme/{cert_name}/fullchain.pem"
|
||||
num_certs = node.succeed(f"grep -o 'END CERTIFICATE' {cert_file}")
|
||||
assert len(num_certs.strip().split("\n")) > 1, "Insufficient certs in fullchain.pem"
|
||||
|
||||
first_cert_data = node.succeed(
|
||||
f"grep -m1 -B50 'END CERTIFICATE' {cert_file}"
|
||||
" | openssl x509 -noout -text"
|
||||
)
|
||||
for line in first_cert_data.lower().split("\n"):
|
||||
if "dns:" in line:
|
||||
print(f"First DNSName in fullchain.pem: {line}")
|
||||
assert cert_name.lower() in line, f"{cert_name} not found in {line}"
|
||||
return
|
||||
|
||||
assert False
|
||||
|
||||
# Checks the permissions in the cert directories are as expected
|
||||
def check_permissions(node, cert_name, group):
|
||||
stat = "stat -L -c '%a %U %G' "
|
||||
node.succeed(
|
||||
f"test $({stat} /var/lib/acme/{cert_name}/*.pem"
|
||||
f" | tee /dev/stderr | grep -v '640 acme {group}' | wc -l) -eq 0"
|
||||
)
|
||||
node.succeed(
|
||||
f"test $({stat} /var/lib/acme/.lego/{cert_name}/*/{cert_name}*"
|
||||
f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0"
|
||||
)
|
||||
node.succeed(
|
||||
f"test $({stat} /var/lib/acme/{cert_name}"
|
||||
f" | tee /dev/stderr | grep -v '750 acme {group}' | wc -l) -eq 0"
|
||||
)
|
||||
node.succeed(
|
||||
f"test $(find /var/lib/acme/.lego/accounts -type f -exec {stat} {{}} \\;"
|
||||
f" | tee /dev/stderr | grep -v '600 acme {group}' | wc -l) -eq 0"
|
||||
)
|
||||
|
||||
# BackoffTracker provides a robust system for handling test retries
|
||||
class BackoffTracker:
|
||||
delay = 1
|
||||
increment = 1
|
||||
|
||||
def handle_fail(self, retries, message) -> int:
|
||||
assert retries < TOTAL_RETRIES, message
|
||||
|
||||
print(f"Retrying in {self.delay}s, {retries + 1}/{TOTAL_RETRIES}")
|
||||
time.sleep(self.delay)
|
||||
|
||||
# Only increment after the first try
|
||||
if retries == 0:
|
||||
self.delay += self.increment
|
||||
self.increment *= 2
|
||||
|
||||
return retries + 1
|
||||
|
||||
def protect(self, func):
|
||||
def wrapper(*args, retries: int = 0, **kwargs):
|
||||
try:
|
||||
return func(*args, **kwargs)
|
||||
except Exception as err:
|
||||
retries = self.handle_fail(retries, err.args)
|
||||
return wrapper(*args, retries=retries, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
backoff = BackoffTracker()
|
||||
|
||||
|
||||
@backoff.protect
|
||||
def download_ca_certs(node, ca_domain):
|
||||
node.succeed(f"curl https://{ca_domain}:15000/roots/0 > /tmp/ca.crt")
|
||||
node.succeed(f"curl https://{ca_domain}:15000/intermediate-keys/0 >> /tmp/ca.crt")
|
||||
|
||||
|
||||
@backoff.protect
|
||||
def check_connection(node, domain, fail=False, minica=False):
|
||||
cafile = "/tmp/ca.crt"
|
||||
if minica:
|
||||
cafile = "/var/lib/acme/.minica/cert.pem"
|
||||
run(node,
|
||||
f"openssl s_client -brief -CAfile {cafile}"
|
||||
f" -verify 2 -verify_return_error -verify_hostname {domain}"
|
||||
f" -servername {domain} -connect {domain}:443 < /dev/null",
|
||||
fail=fail,
|
||||
)
|
Loading…
Add table
Add a link
Reference in a new issue