diff --git a/setup.py b/setup.py index 7b68b50..a69da28 100644 --- a/setup.py +++ b/setup.py @@ -27,7 +27,10 @@ url='https://www.qubes-os.org/', packages=setuptools.find_packages(include=("vmupdate", "vmupdate*")), entry_points={ - 'console_scripts': + 'console_scripts': [ 'qubes-vm-update = vmupdate.vmupdate:main', + 'qvm-template-upgrade = ' + 'vmupdate.template_upgrade:main', + ], }, ) \ No newline at end of file diff --git a/vmupdate/template_upgrade.py b/vmupdate/template_upgrade.py new file mode 100644 index 0000000..ee393cf --- /dev/null +++ b/vmupdate/template_upgrade.py @@ -0,0 +1,340 @@ +#!/usr/bin/python3 +""" +qvm-template-upgrade — perform an N -> N+1 distro version upgrade of a qube + +Workflow: + 1. Validate that --template names an existing TemplateVM or StandaloneVM. + 2. Read os-distribution / os-version from qvm-features. + 3. Compute the target version as os-version + 1 (N -> N+1 is the + only supported scope; multi-hop is rejected by construction). + 4. Clone the qube to a new name derived from the target version. + 5. Run the in-VM version-upgrade agent inside the clone + (reuses the vmupdate qrexec transport — currently stubbed). + 6. On success: update template metadata features on the clone. + 7. On failure: remove the half-upgraded clone unless --keep-on-failure. + +The original qube is never touched by this tool. AppVMs based on a source +template continue to use it until the user manually switches them and +uninstalls the old template. +""" +import logging +import sys +from datetime import datetime, timezone + +import qubesadmin +import qubesadmin.exc +import qubesadmin.tools + +from vmupdate.agent.source.common.exit_codes import EXIT + +LOG_PATH = '/var/log/qubes/qvm-template-upgrade.log' +LOG_FORMAT = '%(asctime)s %(levelname)s %(message)s' + +SUPPORTED_DISTROS = {'fedora', 'debian'} +SUPPORTED_CLASSES = {'TemplateVM', 'StandaloneVM'} + +DATE_FMT = '%Y-%m-%d %H:%M:%S' + + +class UpgradeError(Exception): + """Failure during the upgrade run itself.""" + + +class ValidationError(Exception): + """Invalid user input or unsupported source qube.""" + + +def compute_target_version(current): + """Return current + 1 as the target distro version. + + Non-integer versions are rejected here. + """ + try: + current_n = int(current) + except ValueError: + raise ValidationError( + f"Non-numeric distro version {current!r}; multi-component " + f"versions (e.g. Debian point releases) are not yet supported " + f"by this tool.") + return str(current_n + 1) + + +def derive_clone_name(source_name, current_version, target_version, override): + """Replace the version suffix in the source name with the target version. + + Examples: + fedora-41, 41 -> 42 => fedora-42 + + fedora-41-minimal, 41 -> 42 => fedora-42-minimal + """ + if override: + return override + if current_version not in source_name: + raise ValidationError( + f"Cannot derive new template name from {source_name!r}: it does " + f"not contain the current version {current_version!r}. Pass " + f"--new-name explicitly.") + # Replace only the last occurrence (e.g. fedora-41-extras-41 stays sane). + head, _, tail = source_name.rpartition(current_version) + return f"{head}{target_version}{tail}" + +# Argument parsing / logging + +def get_parser(): + parser = qubesadmin.tools.QubesArgumentParser( + prog='qvm-template-upgrade', + description='Upgrade a TemplateVM or StandaloneVM to the next distro ' + 'version.', + version='' + ) + parser.add_argument( + '--template', required=True, + help='Name of the source TemplateVM or StandaloneVM to upgrade.') + parser.add_argument( + '--new-name', + help='Name for the upgraded clone. Defaults to replacing the version ' + 'suffix in the source name (e.g. fedora-41 -> fedora-42).') + parser.add_argument( + '--keep-on-failure', action='store_true', + help='Preserve the half-upgraded clone if the upgrade fails. ' + 'By default the clone is removed and the original remains.') + parser.add_argument( + '--dry-run', action='store_true', + help='Validate inputs and print the planned actions; do not clone ' + 'or upgrade anything.') + parser.add_argument( + '--log', default='INFO', + help='Log level (default: INFO).') + return parser + + +def parse_args(argv=None, app=None): + parser = get_parser() + return parser, parser.parse_args(argv, app=app) + + +def setup_logging(level): + log = logging.getLogger('vm-template-upgrade') + log.setLevel(level) + # Don't let our messages also flow through the root logger. + log.propagate = False + # Idempotent: if main() is called more than once in the same process + # (embedded use, repeated CLI invocations in tests), skip re-adding + # handlers so output isn't duplicated. + if log.handlers: + return log + # Always log to stderr: so user sees progress even when the log file + # is unavailable (dev machine without /var/log/qubes, perms issues, etc.). + stderr = logging.StreamHandler(sys.stderr) + stderr.setFormatter(logging.Formatter('%(message)s')) + log.addHandler(stderr) + try: + handler = logging.FileHandler(LOG_PATH, encoding='utf-8') + handler.setFormatter(logging.Formatter(LOG_FORMAT)) + log.addHandler(handler) + except OSError as err: + log.warning("Could not open log file %s: %s", LOG_PATH, err) + return log + + +# Orchestrator + + +class TemplateUpgrader: + """Stateful orchestrator for one source qube upgrade.""" + + def __init__(self, app, args, log): + self.app = app + self.args = args + self.log = log + # Populated by validate(): + self.source_vm = None + self.distro = None + self.current_version = None + self.target_version = None + self.new_name = None + # Populated by clone(): + self.clone_vm = None + + # validation + + def validate(self): + """Run all pre-flight checks. Populates planning attributes. + + Raises ValidationError on any input/setup problem. After this call, + self.source_vm / distro / current_version / target_version / new_name + are all set and the upgrade can proceed (or be reported via + describe_plan() for --dry-run). + """ + self.source_vm = self._resolve_source_qube() + self.distro, self.current_version = self._detect_distro() + self.target_version = compute_target_version(self.current_version) + self.new_name = derive_clone_name( + self.source_vm.name, + self.current_version, + self.target_version, + self.args.new_name, + ) + if self.new_name in self.app.domains: + raise ValidationError( + f"Target name {self.new_name!r} already exists. Remove it " + f"first or pass a different --new-name.") + + def _resolve_source_qube(self): + try: + vm = self.app.domains[self.args.template] + except KeyError: + raise ValidationError(f"No such qube: {self.args.template}") + if vm.klass not in SUPPORTED_CLASSES: + raise ValidationError( + f"{vm.name} is a {vm.klass}; only TemplateVMs and " + f"StandaloneVMs can be upgraded with this tool.") + return vm + + def _detect_distro(self): + distro = self.source_vm.features.get('os-distribution') + distro_like = self.source_vm.features.get('os-distribution-like', '') + version = self.source_vm.features.get('os-version') + if not distro or not version: + raise ValidationError( + f"{self.source_vm.name} is missing os-distribution / " + f"os-version features. Start the qube once so the in-VM " + f"agent can report them, then retry.") + candidates = {distro.lower(), *distro_like.lower().split()} + supported = SUPPORTED_DISTROS & candidates + if not supported: + raise ValidationError( + f"Unsupported distro {distro!r}; only Fedora- and " + f"Debian-based qubes are supported for now.") + return sorted(supported)[0], version + + def describe_plan(self): + return (f"upgrade {self.source_vm.name} " + f"({self.distro} {self.current_version}) -> " + f"clone {self.new_name} " + f"({self.distro} {self.target_version})") + + # execution + + def clone(self): + """Clone the source qube. Populates self.clone_vm.""" + self.log.info("Cloning %s -> %s", self.source_vm.name, self.new_name) + self.clone_vm = self.app.clone_vm(self.source_vm, self.new_name) + + def run_agent(self): + """Run the in-VM upgrade agent inside the clone. + + STUB: replaced in a follow-up commit by a dispatch into a new + `version_upgrade(target_version)` method on the existing + vmupdate agent (vmupdate/agent/source/{dnf,apt}/), reused via the + qrexec transport in qube_connection.py. The VM-side agent must + re-detect or verify the distro from inside the qube before running + distro-specific upgrade commands. + """ + raise NotImplementedError( + f"version-upgrade agent is not implemented yet for " + f"{self.clone_vm.name} -> {self.target_version}") + + def finalize(self): + """Write post-upgrade qvm-features on the clone. + + TemplateVM: always set template-name (required so qvm-template + recognises the upgraded clone as managed) and refresh + template-installtime. + + StandaloneVM: rewrite an existing template-name from the old to + the new release (e.g. fedora-41 -> fedora-42), keeping the value + compatible with qui.utils.check_support()'s EOL_DATES lookup + (which strips only -minimal / -xfce, not -standalone or the qube + name). We do not invent a template-name for standalones that + never had one, and we leave one in place that doesn't carry the + current version (can't safely transform). + + Inherited EVR/buildtime features are intentionally left in place + because qvm-template.query_local() uses bracket access on them and + deleting would crash qvm-template list/info for this qube. + """ + self.log.info("Updating metadata on %s", self.clone_vm.name) + if self.clone_vm.klass == 'TemplateVM': + self.clone_vm.features['template-name'] = self.clone_vm.name + self.clone_vm.features['template-installtime'] = \ + datetime.now(tz=timezone.utc).strftime(DATE_FMT) + return + old = self.clone_vm.features.get('template-name') + if not old: + return + try: + new = derive_clone_name( + old, self.current_version, self.target_version, None) + except ValidationError: + # template-name doesn't carry the current version (custom + # value, manual edit) it is safer to leave it alone than to + # guess what the user intended. + self.log.info( + "Leaving standalone template-name=%r untouched " + "(no version substring to rewrite)", old) + return + self.clone_vm.features['template-name'] = new + + def rollback(self): + """Remove the half-upgraded clone, if any. Safe to call repeatedly.""" + if self.clone_vm is None: + return + self.log.warning("Removing failed clone %s", self.clone_vm.name) + try: + del self.app.domains[self.clone_vm.name] + except qubesadmin.exc.QubesException as err: + self.log.error("Could not remove failed clone %s: %s", + self.clone_vm.name, err) + + +# CLI entry point + +def main(argv=None, app=None): + parser, args = parse_args(argv, app) + log = setup_logging(args.log) + upgrader = TemplateUpgrader(args.app, args, log) + + try: + upgrader.validate() + except ValidationError as err: + parser.print_error(str(err)) + return EXIT.ERR_USAGE + + log.info("Plan: %s", upgrader.describe_plan()) + + if args.dry_run: + print(f"[dry-run] would clone {upgrader.source_vm.name} -> " + f"{upgrader.new_name} and upgrade {upgrader.distro} " + f"{upgrader.current_version} -> {upgrader.target_version}") + return EXIT.OK + + try: + upgrader.clone() + except qubesadmin.exc.QubesException as err: + print(f"error: clone failed: {err}", file=sys.stderr) + return EXIT.ERR + + try: + upgrader.run_agent() + upgrader.finalize() + except (UpgradeError, NotImplementedError, + qubesadmin.exc.QubesException) as err: + log.error("Upgrade failed: %s", err) + if not args.keep_on_failure: + upgrader.rollback() + else: + log.info("Leaving clone %s in place (--keep-on-failure).", + upgrader.clone_vm.name) + print(f"error: {err}", file=sys.stderr) + return EXIT.ERR + + label = 'template' if upgrader.clone_vm.klass == 'TemplateVM' \ + else 'standalone' + print(f"Upgrade complete. New {label}: {upgrader.clone_vm.name}") + print(f"Original qube {upgrader.source_vm.name} is untouched.") + return EXIT.OK + + +if __name__ == '__main__': # pragma: no cover + sys.exit(main()) diff --git a/vmupdate/tests/test_template_upgrade.py b/vmupdate/tests/test_template_upgrade.py new file mode 100644 index 0000000..68b8ae9 --- /dev/null +++ b/vmupdate/tests/test_template_upgrade.py @@ -0,0 +1,326 @@ +#!/usr/bin/python3 +# coding=utf-8 +import logging +from unittest.mock import MagicMock, Mock + +import pytest + +import qubesadmin.exc + +from vmupdate import template_upgrade +from vmupdate.agent.source.common.exit_codes import EXIT +from vmupdate.tests.conftest import TestApp as _TestApp +from vmupdate.tests.conftest import TestVM as _TestVM + +# Captured at import time, before the quiet_logging autouse fixture can +# replace it. Tests that need to exercise the real setup_logging restore +# this reference explicitly. +_REAL_SETUP_LOGGING = template_upgrade.setup_logging + + +class CloneApp(_TestApp): + def __init__(self): + super().__init__() + self.clone_calls = [] + + def clone_vm(self, source_vm, new_name): + self.clone_calls.append((source_vm.name, new_name)) + clone = _TestVM(new_name, self, klass=source_vm.klass) + clone.features.update(source_vm.features) + return clone + + +def add_template(app, name="fedora-41", **features): + vm = _TestVM(name, app, klass="TemplateVM") + vm.features.update({ + "os-distribution": "fedora", + "os-version": "41", + "template-name": name, + "template-epoch": "0", + "template-version": "41", + "template-release": "20250101", + "template-buildtime": "2025-01-01 00:00:00", + }) + vm.features.update(features) + return vm + + +def add_standalone(app, name="fedora-41-standalone", **features): + vm = _TestVM(name, app, klass="StandaloneVM") + vm.features.update({ + "os-distribution": "fedora", + "os-version": "41", + }) + vm.features.update(features) + return vm + + +@pytest.fixture(autouse=True) +def quiet_logging(monkeypatch): + monkeypatch.setattr(template_upgrade, "setup_logging", lambda *_: Mock()) + + +@pytest.mark.parametrize("scenario, expected", [ + ("missing-qube", "No such qube"), + ("non-template", "only TemplateVMs and StandaloneVMs"), + ("missing-os-version", "missing os-distribution / os-version"), + ("non-numeric-os-version", "Non-numeric distro version"), + ("unsupported-distro", "Unsupported distro"), +]) +def test_validation_errors(scenario, expected, capsys): + app = CloneApp() + template_name = "fedora-41" + if scenario == "non-template": + _TestVM(template_name, app, klass="AppVM", template=add_template(app)) + elif scenario == "missing-os-version": + add_template(app) + del app.domains[template_name].features["os-version"] + elif scenario == "non-numeric-os-version": + add_template(app, **{"os-version": "rawhide"}) + elif scenario == "unsupported-distro": + add_template(app, **{"os-distribution": "arch"}) + + retcode = template_upgrade.main(["--template", template_name], app) + + assert retcode == EXIT.ERR_USAGE + assert expected in capsys.readouterr().err + + +@pytest.mark.parametrize( + "source, current, target, override, expected", + [ + ("fedora-41", "41", "42", None, "fedora-42"), + ("debian-12", "12", "13", None, "debian-13"), + ("fedora-41-minimal", "41", "42", None, "fedora-42-minimal"), + ("custom", "41", "42", "my-template", "my-template"), + ], +) +def test_clone_name_derivation(source, current, target, override, expected): + assert template_upgrade.derive_clone_name( + source, current, target, override) == expected + + +def test_clone_name_derivation_requires_version_without_override(): + with pytest.raises(template_upgrade.ValidationError): + template_upgrade.derive_clone_name("custom", "41", "42", None) + + +def test_dry_run_does_not_mutate(capsys): + app = CloneApp() + vm = add_template(app, "ubuntu-22", + **{"os-distribution": "ubuntu", + "os-distribution-like": "debian", + "os-version": "22"}) + before = dict(vm.features) + + retcode = template_upgrade.main( + ["--template", "ubuntu-22", "--dry-run"], app) + + assert retcode == EXIT.OK + assert app.clone_calls == [] + assert vm.features == before + assert "would clone ubuntu-22 -> ubuntu-23" in capsys.readouterr().out + + +def test_success_applies_metadata(monkeypatch): + app = CloneApp() + add_template(app) + monkeypatch.setattr(template_upgrade.TemplateUpgrader, "run_agent", + lambda self: None) + + retcode = template_upgrade.main(["--template", "fedora-41"], app) + + assert retcode == EXIT.OK + clone = app.domains["fedora-42"] + assert clone.features["template-name"] == "fedora-42" + assert clone.features["template-installtime"] != \ + app.domains["fedora-41"].features.get("template-installtime") + assert clone.features["template-epoch"] == "0" + assert clone.features["template-version"] == "41" + assert clone.features["template-release"] == "20250101" + assert clone.features["template-buildtime"] == "2025-01-01 00:00:00" + assert clone.features["os-distribution"] == "fedora" + assert clone.features["os-version"] == "41" + + +def test_standalone_without_template_name_left_alone(monkeypatch): + """A standalone that never had template-name doesn't get one invented.""" + app = CloneApp() + add_standalone(app) + monkeypatch.setattr(template_upgrade.TemplateUpgrader, "run_agent", + lambda self: None) + + retcode = template_upgrade.main( + ["--template", "fedora-41-standalone"], app) + + assert retcode == EXIT.OK + clone = app.domains["fedora-42-standalone"] + assert clone.klass == "StandaloneVM" + assert "template-name" not in clone.features + assert "template-installtime" not in clone.features + + +def test_standalone_with_template_name_refreshed(monkeypatch): + """Refresh stale standalone template-name for updater EOL checks.""" + app = CloneApp() + add_standalone(app, **{"template-name": "fedora-41"}) + monkeypatch.setattr(template_upgrade.TemplateUpgrader, "run_agent", + lambda self: None) + + retcode = template_upgrade.main( + ["--template", "fedora-41-standalone"], app) + + assert retcode == EXIT.OK + clone = app.domains["fedora-42-standalone"] + # check_support() resolves this through EOL_DATES. + assert clone.features["template-name"] == "fedora-42" + # template-installtime is template-only; standalones don't get one. + assert "template-installtime" not in clone.features + + +def test_default_stub_fails_and_cleans_clone(capsys): + app = CloneApp() + add_template(app) + + retcode = template_upgrade.main(["--template", "fedora-41"], app) + + assert retcode == EXIT.ERR + assert "fedora-42" not in app.domains + assert "not implemented yet" in capsys.readouterr().err + + +@pytest.mark.parametrize("keep_on_failure, expect_clone_removed", [ + (False, True), + (True, False), +]) +def test_failure_cleanup(monkeypatch, keep_on_failure, expect_clone_removed): + app = CloneApp() + add_template(app) + + def fail_agent(self): + raise template_upgrade.UpgradeError("agent failed") + + monkeypatch.setattr(template_upgrade.TemplateUpgrader, "run_agent", + fail_agent) + args = ["--template", "fedora-41"] + if keep_on_failure: + args.append("--keep-on-failure") + + retcode = template_upgrade.main(args, app) + + assert retcode == EXIT.ERR + assert ("fedora-42" not in app.domains) is expect_clone_removed + + +def test_rejects_existing_clone_name(capsys): + """If the target clone name already exists, validation fails before + anything is mutated.""" + app = CloneApp() + add_template(app) + add_template(app, name="fedora-42", **{"os-version": "42"}) + + retcode = template_upgrade.main(["--template", "fedora-41"], app) + + assert retcode == EXIT.ERR_USAGE + assert "already exists" in capsys.readouterr().err + assert app.clone_calls == [] + + +def test_standalone_template_name_without_version_is_left_alone(monkeypatch): + """Standalone whose template-name doesn't carry the current version + (custom string, manual edit) is left untouched.""" + app = CloneApp() + add_standalone(app, **{"template-name": "my-custom-base"}) + monkeypatch.setattr(template_upgrade.TemplateUpgrader, "run_agent", + lambda self: None) + + retcode = template_upgrade.main( + ["--template", "fedora-41-standalone"], app) + + assert retcode == EXIT.OK + clone = app.domains["fedora-42-standalone"] + assert clone.features["template-name"] == "my-custom-base" + + +def test_main_clone_failure(monkeypatch, capsys): + """If the Admin-API clone call raises, main() reports it as a runtime + error (EXIT.ERR), not a usage error.""" + app = CloneApp() + add_template(app) + + def boom(*_a, **_kw): + raise qubesadmin.exc.QubesException("storage pool full") + + monkeypatch.setattr(app, "clone_vm", boom) + + retcode = template_upgrade.main(["--template", "fedora-41"], app) + + assert retcode == EXIT.ERR + assert "clone failed: storage pool full" in capsys.readouterr().err + + +def test_rollback_noop_when_no_clone(): + """rollback() before clone() ran is a safe no-op.""" + upgrader = template_upgrade.TemplateUpgrader(CloneApp(), Mock(), Mock()) + upgrader.rollback() # must not raise + + +def test_rollback_handles_delete_failure(): + """If the Admin-API delete raises, rollback logs and swallows; the + caller has already decided the upgrade has failed, so re-raising would + just mask the original error. + """ + # dict's __delitem__ is looked up on the type, not the instance, so we + # use a MagicMock for app.domains (which supports __delitem__ as a side + # effect) instead of trying to patch the test-helper Domains dict. + app = MagicMock() + app.domains.__delitem__.side_effect = \ + qubesadmin.exc.QubesException("VM is running") + upgrader = template_upgrade.TemplateUpgrader(app, Mock(), Mock()) + upgrader.clone_vm = Mock(name="fedora-42") + upgrader.clone_vm.name = "fedora-42" + + upgrader.rollback() # must not raise + + upgrader.log.error.assert_called_once() + + +def _reset_template_upgrade_logger(): + logger = logging.getLogger("vm-template-upgrade") + logger.handlers.clear() + logger.propagate = True + + +def test_setup_logging_is_idempotent(tmp_path, monkeypatch): + """Calling setup_logging twice must not duplicate handlers.""" + monkeypatch.setattr(template_upgrade, "setup_logging", + _REAL_SETUP_LOGGING) + monkeypatch.setattr(template_upgrade, "LOG_PATH", + str(tmp_path / "qvm-template-upgrade.log")) + _reset_template_upgrade_logger() + + log1 = template_upgrade.setup_logging("INFO") + handler_count = len(log1.handlers) + log2 = template_upgrade.setup_logging("INFO") + + assert log1 is log2 + assert len(log2.handlers) == handler_count + assert log2.propagate is False + + +def test_setup_logging_tolerates_missing_log_dir(tmp_path, monkeypatch): + """A missing log directory degrades to stderr-only, not a crash.""" + monkeypatch.setattr(template_upgrade, "setup_logging", + _REAL_SETUP_LOGGING) + monkeypatch.setattr(template_upgrade, "LOG_PATH", + str(tmp_path / "nope" / "qvm-template-upgrade.log")) + _reset_template_upgrade_logger() + + log = template_upgrade.setup_logging("INFO") + + # The file handler should have been skipped; stderr stays. + assert not any(isinstance(h, logging.FileHandler) + for h in log.handlers) + assert any(isinstance(h, logging.StreamHandler) and + not isinstance(h, logging.FileHandler) + for h in log.handlers)