nixos/systemd-boot: init boot counting

Update nixos/modules/system/boot/loader/systemd-boot/boot-counting.md

Co-authored-by: Moritz Sanft <58110325+msanft@users.noreply.github.com>
This commit is contained in:
Julien Malka 2024-03-03 01:25:36 +00:00 committed by Jörg Thalheim
parent 4a1ceb0d8c
commit 64edc7f00f
6 changed files with 381 additions and 72 deletions

View file

@ -12,8 +12,9 @@ import subprocess
import sys
import warnings
import json
from typing import NamedTuple, Any
from typing import NamedTuple, Any, Type
from dataclasses import dataclass
from pathlib import Path
# These values will be replaced with actual values during the package build
EFI_SYS_MOUNT_POINT = "@efiSysMountPoint@"
@ -32,6 +33,8 @@ CAN_TOUCH_EFI_VARIABLES = "@canTouchEfiVariables@"
GRACEFUL = "@graceful@"
COPY_EXTRA_FILES = "@copyExtraFiles@"
CHECK_MOUNTPOINTS = "@checkMountpoints@"
BOOT_COUNTING_TRIES = "@bootCountingTries@"
BOOT_COUNTING = "@bootCounting@" == "True"
@dataclass
class BootSpec:
@ -46,6 +49,104 @@ class BootSpec:
sortKey: str # noqa: N815
initrdSecrets: str | None = None # noqa: N815
@dataclass
class Entry:
profile: str | None
generation_number: int
specialisation: str | None
@classmethod
def from_path(cls: Type["Entry"], path: Path) -> "Entry":
filename = path.name
# Matching nixos-$profile-generation-*.conf
rex_profile = re.compile(r"^nixos-(.*)-generation-.*\.conf$")
# Matching nixos*-generation-$number*.conf
rex_generation = re.compile(r"^nixos.*-generation-([0-9]+).*\.conf$")
# Matching nixos*-generation-$number-specialisation-$specialisation_name*.conf
rex_specialisation = re.compile(r"^nixos.*-generation-([0-9]+)-specialisation-([a-zA-Z0-9]+).*\.conf$")
profile = rex_profile.sub(r"\1", filename) if rex_profile.match(filename) else None
specialisation = rex_specialisation.sub(r"\2", filename) if rex_specialisation.match(filename) else None
try:
generation_number = int(rex_generation.sub(r"\1", filename))
except ValueError:
raise
return cls(profile, generation_number, specialisation)
@dataclass
class DiskEntry:
entry: Entry
default: bool
counters: str | None
title: str | None
description: str | None
kernel: str
initrd: str
kernel_params: str | None
machine_id: str | None
sort_key: str
@classmethod
def from_path(cls: Type["DiskEntry"], path: Path) -> "DiskEntry":
entry = Entry.from_path(path)
data = path.read_text().splitlines()
if '' in data:
data.remove('')
entry_map = dict(lines.split(' ', 1) for lines in data)
assert "linux" in entry_map
assert "initrd" in entry_map
filename = path.name
# Matching nixos*-generation-*$counters.conf
rex_counters = re.compile(r"^nixos.*-generation-.*(\+\d(-\d)?)\.conf$")
counters = rex_counters.sub(r"\1", filename) if rex_counters.match(filename) else None
disk_entry = cls(
entry=entry,
default=(entry_map.get("sort-key") == "default"),
counters=counters,
title=entry_map.get("title"),
description=entry_map.get("version"),
kernel=entry_map["linux"],
initrd=entry_map["initrd"],
kernel_params=entry_map.get("options"),
machine_id=entry_map.get("machine-id"),
sort_key=entry_map.get("sort_key", "nixos"))
return disk_entry
def write(self, sorted_first: str) -> None:
# Compute a sort-key sorted before sorted_first
# This will compute something like: nixos -> nixor-default to make sure we come before other nixos entries,
# while allowing users users can pre-pend their own entries before.
default_sort_key = sorted_first[:-1] + chr(ord(sorted_first[-1])-1) + "-default"
tmp_path = self.path.with_suffix(".tmp")
with tmp_path.open('w') as f:
# We use "sort-key" to sort the default generation first.
# The "default" string is sorted before "non-default" (alphabetically)
boot_entry = [
f"title {self.title}" if self.title is not None else None,
f"version {self.description}" if self.description is not None else None,
f"linux {self.kernel}",
f"initrd {self.initrd}",
f"options {self.kernel_params}" if self.kernel_params is not None else None,
f"machine-id {self.machine_id}" if self.machine_id is not None else None,
f"sort-key {default_sort_key if self.default else self.sort_key}"
]
f.write("\n".join(filter(None, boot_entry)))
f.flush()
os.fsync(f.fileno())
tmp_path.rename(self.path)
@property
def path(self) -> Path:
pieces = [
"nixos",
self.entry.profile or None,
"generation",
str(self.entry.generation_number),
f"specialisation-{self.entry.specialisation}" if self.entry.specialisation else None,
]
prefix = "-".join(p for p in pieces if p)
return Path(f"{BOOT_MOUNT_POINT}/loader/entries/{prefix}{self.counters if self.counters else ''}.conf")
libc = ctypes.CDLL("libc.so.6")
@ -78,30 +179,14 @@ def system_dir(profile: str | None, generation: int, specialisation: str | None)
else:
return d
BOOT_ENTRY = """title {title}
sort-key {sort_key}
version Generation {generation} {description}
linux {kernel}
initrd {initrd}
options {kernel_params}
"""
def generation_conf_filename(profile: str | None, generation: int, specialisation: str | None) -> str:
pieces = [
"nixos",
profile or None,
"generation",
str(generation),
f"specialisation-{specialisation}" if specialisation else None,
]
return "-".join(p for p in pieces if p) + ".conf"
def write_loader_conf(profile: str | None, generation: int, specialisation: str | None) -> None:
with open(f"{LOADER_CONF}.tmp", 'w') as f:
def write_loader_conf(profile: str | None) -> None:
with open(f"{EFI_SYS_MOUNT_POINT}/loader/loader.conf.tmp", 'w') as f:
if TIMEOUT != "":
f.write(f"timeout {TIMEOUT}\n")
f.write("default %s\n" % generation_conf_filename(profile, generation, specialisation))
if profile:
f.write("default nixos-%s-generation-*\n" % profile)
else:
f.write("default nixos-generation-*\n")
if not EDITOR:
f.write("editor 0\n")
f.write(f"console-mode {CONSOLE_MODE}\n")
@ -109,6 +194,19 @@ def write_loader_conf(profile: str | None, generation: int, specialisation: str
os.fsync(f.fileno())
os.rename(f"{LOADER_CONF}.tmp", LOADER_CONF)
def scan_entries() -> list[DiskEntry]:
"""
Scan all entries in $ESP/loader/entries/*
Does not support Type 2 entries as we do not support them for now.
Returns a generator of Entry.
"""
entries = []
for path in Path(f"{EFI_SYS_MOUNT_POINT}/loader/entries/").glob("nixos*-generation-[1-9]*.conf"):
try:
entries.append(DiskEntry.from_path(path))
except ValueError:
continue
return entries
def get_bootspec(profile: str | None, generation: int) -> BootSpec:
system_directory = system_dir(profile, generation, None)
@ -151,8 +249,14 @@ def copy_from_file(file: str, dry_run: bool = False) -> str:
copy_if_not_exists(store_file_path, f"{BOOT_MOUNT_POINT}{efi_file_path}")
return efi_file_path
def write_entry(profile: str | None, generation: int, specialisation: str | None,
machine_id: str, bootspec: BootSpec, current: bool) -> None:
def write_entry(profile: str | None,
generation: int,
specialisation: str | None,
machine_id: str,
bootspec: BootSpec,
entries: list[DiskEntry],
sorted_first: str,
current: bool) -> None:
if specialisation:
bootspec = bootspec.specialisations[specialisation]
kernel = copy_from_file(bootspec.kernel)
@ -175,29 +279,32 @@ def write_entry(profile: str | None, generation: int, specialisation: str | None
f'for "{title} - Configuration {generation}", an older generation', file=sys.stderr)
print("note: this is normal after having removed "
"or renamed a file in `boot.initrd.secrets`", file=sys.stderr)
entry_file = f"{BOOT_MOUNT_POINT}/loader/entries/%s" % (
generation_conf_filename(profile, generation, specialisation))
tmp_path = "%s.tmp" % (entry_file)
kernel_params = "init=%s " % bootspec.init
kernel_params = kernel_params + " ".join(bootspec.kernelParams)
build_time = int(os.path.getctime(system_dir(profile, generation, specialisation)))
build_date = datetime.datetime.fromtimestamp(build_time).strftime('%F')
counters = f"+{BOOT_COUNTING_TRIES}" if BOOT_COUNTING else ""
entry = Entry(profile, generation, specialisation)
# We check if the entry we are writing is already on disk
# and we update its "default entry" status
for entry_on_disk in entries:
if entry == entry_on_disk.entry:
entry_on_disk.default = current
entry_on_disk.write(sorted_first)
return
with open(tmp_path, 'w') as f:
f.write(BOOT_ENTRY.format(title=title,
sort_key=bootspec.sortKey,
generation=generation,
kernel=kernel,
initrd=initrd,
kernel_params=kernel_params,
description=f"{bootspec.label}, built on {build_date}"))
if machine_id is not None:
f.write("machine-id %s\n" % machine_id)
f.flush()
os.fsync(f.fileno())
os.rename(tmp_path, entry_file)
DiskEntry(
entry=entry,
title=title,
kernel=kernel,
initrd=initrd,
counters=counters,
kernel_params=kernel_params,
machine_id=machine_id,
description=f"Generation {generation} {bootspec.label}, built on {build_date}",
sort_key=bootspec.sortKey,
default=current
).write(sorted_first)
def get_generations(profile: str | None = None) -> list[SystemIdentifier]:
gen_list = run(
@ -225,30 +332,19 @@ def get_generations(profile: str | None = None) -> list[SystemIdentifier]:
return configurations[-configurationLimit:]
def remove_old_entries(gens: list[SystemIdentifier]) -> None:
rex_profile = re.compile(r"^" + re.escape(BOOT_MOUNT_POINT) + r"/loader/entries/nixos-(.*)-generation-.*\.conf$")
rex_generation = re.compile(r"^" + re.escape(BOOT_MOUNT_POINT) + r"/loader/entries/nixos.*-generation-([0-9]+)(-specialisation-.*)?\.conf$")
def remove_old_entries(gens: list[SystemIdentifier], disk_entries: list[DiskEntry]) -> None:
known_paths = []
for gen in gens:
bootspec = get_bootspec(gen.profile, gen.generation)
known_paths.append(copy_from_file(bootspec.kernel, True))
known_paths.append(copy_from_file(bootspec.initrd, True))
for path in glob.iglob(f"{BOOT_MOUNT_POINT}/loader/entries/nixos*-generation-[1-9]*.conf"):
if rex_profile.match(path):
prof = rex_profile.sub(r"\1", path)
else:
prof = None
try:
gen_number = int(rex_generation.sub(r"\1", path))
except ValueError:
continue
if (prof, gen_number, None) not in gens:
os.unlink(path)
for path in glob.iglob(f"{BOOT_MOUNT_POINT}/{NIXOS_DIR}/*"):
for disk_entry in disk_entries:
if (disk_entry.entry.profile, disk_entry.entry.generation_number, None) not in gens:
os.unlink(disk_entry.path)
for path in glob.iglob(f"{EFI_SYS_MOUNT_POINT}/efi/nixos/*"):
if path not in known_paths and not os.path.isdir(path):
os.unlink(path)
def cleanup_esp() -> None:
for path in glob.iglob(f"{EFI_SYS_MOUNT_POINT}/loader/entries/nixos*"):
os.unlink(path)
@ -267,7 +363,7 @@ def get_profiles() -> list[str]:
def install_bootloader(args: argparse.Namespace) -> None:
try:
with open("/etc/machine-id") as machine_file:
machine_id = machine_file.readlines()[0]
machine_id = machine_file.readlines()[0].strip()
except IOError as e:
if e.errno != errno.ENOENT:
raise
@ -351,18 +447,32 @@ def install_bootloader(args: argparse.Namespace) -> None:
gens = get_generations()
for profile in get_profiles():
gens += get_generations(profile)
remove_old_entries(gens)
entries = scan_entries()
remove_old_entries(gens, entries)
# Compute the sort-key that will be sorted first.
sorted_first = ""
for gen in gens:
try:
bootspec = get_bootspec(gen.profile, gen.generation)
if bootspec.sortKey < sorted_first or sorted_first == "":
sorted_first = bootspec.sortKey
except OSError as e:
# See https://github.com/NixOS/nixpkgs/issues/114552
if e.errno == errno.EINVAL:
profile = f"profile '{gen.profile}'" if gen.profile else "default profile"
print("ignoring {} in the list of boot entries because of the following error:\n{}".format(profile, e), file=sys.stderr)
else:
raise e
for gen in gens:
try:
bootspec = get_bootspec(gen.profile, gen.generation)
is_default = os.path.dirname(bootspec.init) == args.default_config
write_entry(*gen, machine_id, bootspec, current=is_default)
write_entry(*gen, machine_id, bootspec, entries, sorted_first, current=is_default)
for specialisation in bootspec.specialisations.keys():
write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, current=is_default)
write_entry(gen.profile, gen.generation, specialisation, machine_id, bootspec, entries, sorted_first, current=(is_default and bootspec.specialisations[specialisation].sortKey == bootspec.sortKey))
if is_default:
write_loader_conf(*gen)
write_loader_conf(gen.profile)
except OSError as e:
# See https://github.com/NixOS/nixpkgs/issues/114552
if e.errno == errno.EINVAL: