From c37a815d04220dbaa78fff28f142f0bb19de670f Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 17:48:47 +0000 Subject: [PATCH 01/11] Initial plan From 3cad8798b3723e0774e63fd068d8255fdb37c1b0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 28 Apr 2026 17:59:49 +0000 Subject: [PATCH 02/11] Add workflow step catalog: StepRegistry, StepCatalog, CLI commands, and tests Agent-Logs-Url: https://github.com/github/spec-kit/sessions/2885e646-477d-4df8-b9a3-06d8cb29e748 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 422 +++++++++++++++++++++ src/specify_cli/workflows/__init__.py | 78 ++++ src/specify_cli/workflows/catalog.py | 503 +++++++++++++++++++++++++- tests/test_workflows.py | 349 ++++++++++++++++++ workflows/step-catalog.community.json | 6 + workflows/step-catalog.json | 6 + 6 files changed, 1363 insertions(+), 1 deletion(-) create mode 100644 workflows/step-catalog.community.json create mode 100644 workflows/step-catalog.json diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index d5f5aba2d5..8fecfb9a80 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4675,6 +4675,20 @@ def extension_set_priority( ) workflow_app.add_typer(workflow_catalog_app, name="catalog") +workflow_step_app = typer.Typer( + name="step", + help="Manage workflow step types", + add_completion=False, +) +workflow_app.add_typer(workflow_step_app, name="step") + +workflow_step_catalog_app = typer.Typer( + name="catalog", + help="Manage step catalogs", + add_completion=False, +) +workflow_step_app.add_typer(workflow_step_catalog_app, name="catalog") + @workflow_app.command("run") def workflow_run( @@ -5321,6 +5335,414 @@ def workflow_catalog_remove( console.print(f"[green]✓[/green] Catalog source '{removed_name}' removed") +# ===== Workflow Step Commands ===== + +@workflow_step_app.command("list") +def workflow_step_list(): + """List installed step types (built-in and custom).""" + from .workflows import STEP_REGISTRY + from .workflows.catalog import StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + + # Load custom steps if in a spec-kit project + custom_keys: set[str] = set() + if specify_dir.exists(): + from .workflows import load_custom_steps + loaded = load_custom_steps(project_root) + custom_keys.update(loaded) + # Also read registry for metadata + registry = StepRegistry(project_root) + installed = registry.list() + custom_keys.update(installed.keys()) + + console.print("\n[bold cyan]Installed Step Types:[/bold cyan]\n") + + built_in: list[str] = [] + custom: list[str] = [] + for key in sorted(STEP_REGISTRY.keys()): + if key in custom_keys: + custom.append(key) + else: + built_in.append(key) + + if built_in: + console.print(" [bold]Built-in:[/bold]") + for key in built_in: + console.print(f" • {key}") + console.print() + + if custom: + console.print(" [bold]Custom (installed):[/bold]") + if specify_dir.exists(): + registry = StepRegistry(project_root) + for key in custom: + meta = registry.get(key) or {} + name = meta.get("name", key) + version = meta.get("version", "?") + console.print(f" • [bold]{name}[/bold] ({key}) v{version}") + else: + for key in custom: + console.print(f" • {key}") + console.print() + + if not built_in and not custom: + console.print("[yellow]No step types found.[/yellow]") + + if specify_dir.exists(): + console.print( + " Install a new step type with: [cyan]specify workflow step add [/cyan]" + ) + + +@workflow_step_app.command("add") +def workflow_step_add( + step_id: str = typer.Argument(..., help="Step type ID from catalog"), +): + """Install a custom step type from the step catalog.""" + from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + info = catalog.get_step_info(step_id) + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + if not info: + console.print(f"[red]Error:[/red] Step type '{step_id}' not found in catalog") + raise typer.Exit(1) + + if not info.get("_install_allowed", True): + console.print( + f"[yellow]Warning:[/yellow] Step type '{step_id}' is from a discovery-only catalog" + ) + console.print("Direct installation is not enabled for this catalog source.") + raise typer.Exit(1) + + step_yml_url = info.get("step_yml_url") or info.get("url") + if not step_yml_url: + console.print(f"[red]Error:[/red] Catalog entry for '{step_id}' has no URL") + raise typer.Exit(1) + + # Derive __init__.py URL: replace trailing step.yml with __init__.py + # or use explicit init_url if provided. + init_url = info.get("init_url") + if not init_url: + if step_yml_url.endswith("step.yml"): + init_url = step_yml_url[: -len("step.yml")] + "__init__.py" + else: + console.print( + f"[red]Error:[/red] Cannot derive __init__.py URL from '{step_yml_url}'. " + "Catalog entry should provide 'init_url' or a 'url' ending in 'step.yml'." + ) + raise typer.Exit(1) + + from urllib.parse import urlparse + from urllib.request import urlopen + + def _safe_fetch(url: str) -> bytes: + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost): + raise ValueError(f"Refusing to fetch from non-HTTPS URL: {url}") + with urlopen(url, timeout=30) as resp: # noqa: S310 + final_url = resp.geturl() + final_parsed = urlparse(final_url) + final_is_localhost = final_parsed.hostname in ("localhost", "127.0.0.1", "::1") + if final_parsed.scheme != "https" and not ( + final_parsed.scheme == "http" and final_is_localhost + ): + raise ValueError(f"Redirect to non-HTTPS URL: {final_url}") + return resp.read() + + step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + step_dir.mkdir(parents=True, exist_ok=True) + + try: + step_yml_content = _safe_fetch(step_yml_url) + init_py_content = _safe_fetch(init_url) + except Exception as exc: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print(f"[red]Error:[/red] Failed to download step files: {exc}") + raise typer.Exit(1) + + # Validate step.yml + try: + import yaml as _yaml + + meta = _yaml.safe_load(step_yml_content.decode("utf-8")) or {} + except Exception as exc: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print(f"[red]Error:[/red] Invalid step.yml: {exc}") + raise typer.Exit(1) + + step_meta = meta.get("step", {}) + type_key = step_meta.get("type_key", "") + if not type_key: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print("[red]Error:[/red] step.yml missing 'step.type_key' field") + raise typer.Exit(1) + + if type_key != step_id: + import shutil + shutil.rmtree(step_dir, ignore_errors=True) + console.print( + f"[red]Error:[/red] step.yml type_key ({type_key!r}) does not match " + f"catalog ID ({step_id!r})" + ) + raise typer.Exit(1) + + # Write files + (step_dir / "step.yml").write_bytes(step_yml_content) + (step_dir / "__init__.py").write_bytes(init_py_content) + + # Register in step registry + registry = StepRegistry(project_root) + registry.add( + step_id, + { + "name": info.get("name", step_id), + "version": info.get("version", step_meta.get("version", "0.0.0")), + "description": info.get("description", step_meta.get("description", "")), + "author": info.get("author", step_meta.get("author", "")), + "source": "catalog", + "catalog_name": info.get("_catalog_name", ""), + "type_key": type_key, + }, + ) + + console.print( + f"[green]✓[/green] Step type '{info.get('name', step_id)}' ({step_id}) installed" + ) + console.print( + " Use [cyan]specify workflow step list[/cyan] to verify the installation." + ) + + +@workflow_step_app.command("remove") +def workflow_step_remove( + step_id: str = typer.Argument(..., help="Step type ID to uninstall"), +): + """Uninstall a custom step type.""" + from .workflows.catalog import StepRegistry + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + registry = StepRegistry(project_root) + if not registry.is_installed(step_id): + console.print(f"[red]Error:[/red] Step type '{step_id}' is not installed") + raise typer.Exit(1) + + step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + if step_dir.exists(): + import shutil + shutil.rmtree(step_dir) + + registry.remove(step_id) + console.print(f"[green]✓[/green] Step type '{step_id}' uninstalled") + + +@workflow_step_app.command("search") +def workflow_step_search( + query: str | None = typer.Argument(None, help="Search query"), +): + """Search the step type catalog.""" + from .workflows.catalog import StepCatalog, StepCatalogError + + project_root = Path.cwd() + if not (project_root / ".specify").exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + + try: + results = catalog.search(query=query) + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + if not results: + if query: + console.print(f"[yellow]No step types found matching '{query}'.[/yellow]") + else: + console.print("[yellow]No step types found in catalog.[/yellow]") + return + + console.print(f"\n[bold cyan]Step Types ({len(results)}):[/bold cyan]\n") + for step in results: + install_note = ( + "" if step.get("_install_allowed", True) else " [dim](discovery only)[/dim]" + ) + console.print( + f" [bold]{step.get('name', step.get('id', '?'))}[/bold]" + f" ({step.get('id', '?')}) v{step.get('version', '?')}{install_note}" + ) + desc = step.get("description", "") + if desc: + console.print(f" {desc}") + console.print() + + +@workflow_step_app.command("info") +def workflow_step_info( + step_id: str = typer.Argument(..., help="Step type ID"), +): + """Show details for a step type.""" + from .workflows import STEP_REGISTRY + from .workflows.catalog import StepCatalog, StepCatalogError, StepRegistry + + project_root = Path.cwd() + if not (project_root / ".specify").exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + # Load custom steps so registry is up to date + from .workflows import load_custom_steps + load_custom_steps(project_root) + + registry = StepRegistry(project_root) + installed_meta = registry.get(step_id) + + # Check if it's a built-in + builtin_step = STEP_REGISTRY.get(step_id) + is_builtin = builtin_step is not None and not installed_meta + + if is_builtin: + console.print(f"\n[bold cyan]{step_id}[/bold cyan] [dim](built-in)[/dim]") + console.print(f" Type key: {step_id}") + console.print(" [green]Built-in step type[/green]") + return + + if installed_meta: + console.print( + f"\n[bold cyan]{installed_meta.get('name', step_id)}[/bold cyan] ({step_id})" + ) + console.print(f" Version: {installed_meta.get('version', '?')}") + if installed_meta.get("author"): + console.print(f" Author: {installed_meta['author']}") + if installed_meta.get("description"): + console.print(f" Description: {installed_meta['description']}") + console.print(" [green]Installed[/green]") + return + + # Try catalog + catalog = StepCatalog(project_root) + try: + info = catalog.get_step_info(step_id) + except StepCatalogError: + info = None + + if info: + console.print( + f"\n[bold cyan]{info.get('name', step_id)}[/bold cyan] ({step_id})" + ) + console.print(f" Version: {info.get('version', '?')}") + if info.get("author"): + console.print(f" Author: {info['author']}") + if info.get("description"): + console.print(f" Description: {info['description']}") + console.print(" [yellow]Not installed[/yellow]") + console.print( + f"\n Install with: [cyan]specify workflow step add {step_id}[/cyan]" + ) + else: + console.print(f"[red]Error:[/red] Step type '{step_id}' not found") + raise typer.Exit(1) + + +@workflow_step_catalog_app.command("list") +def workflow_step_catalog_list(): + """List configured step catalog sources.""" + from .workflows.catalog import StepCatalog, StepCatalogError + + project_root = Path.cwd() + catalog = StepCatalog(project_root) + + try: + configs = catalog.get_catalog_configs() + except StepCatalogError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print("\n[bold cyan]Step Catalog Sources:[/bold cyan]\n") + for i, cfg in enumerate(configs): + install_status = ( + "[green]install allowed[/green]" + if cfg["install_allowed"] + else "[yellow]discovery only[/yellow]" + ) + console.print(f" [{i}] [bold]{cfg['name']}[/bold] — {install_status}") + console.print(f" {cfg['url']}") + if cfg.get("description"): + console.print(f" [dim]{cfg['description']}[/dim]") + console.print() + + +@workflow_step_catalog_app.command("add") +def workflow_step_catalog_add( + url: str = typer.Argument(..., help="Catalog URL to add"), + name: str = typer.Option(None, "--name", help="Catalog name"), +): + """Add a step catalog source.""" + from .workflows.catalog import StepCatalog, StepValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + catalog.add_catalog(url, name) + except StepValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Step catalog source added: {url}") + + +@workflow_step_catalog_app.command("remove") +def workflow_step_catalog_remove( + index: int = typer.Argument( + ..., help="Catalog index to remove (from 'step catalog list')" + ), +): + """Remove a step catalog source by index.""" + from .workflows.catalog import StepCatalog, StepValidationError + + project_root = Path.cwd() + specify_dir = project_root / ".specify" + if not specify_dir.exists(): + console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") + raise typer.Exit(1) + + catalog = StepCatalog(project_root) + try: + removed_name = catalog.remove_catalog(index) + except StepValidationError as exc: + console.print(f"[red]Error:[/red] {exc}") + raise typer.Exit(1) + + console.print(f"[green]✓[/green] Step catalog source '{removed_name}' removed") + + def main(): app() diff --git a/src/specify_cli/workflows/__init__.py b/src/specify_cli/workflows/__init__.py index 13782f620b..77a73dd16e 100644 --- a/src/specify_cli/workflows/__init__.py +++ b/src/specify_cli/workflows/__init__.py @@ -7,10 +7,12 @@ - ``STEP_REGISTRY`` — maps ``type_key`` to ``StepBase`` subclass instances. - ``WorkflowEngine`` — orchestrator that loads, validates, and executes workflow YAML definitions. +- ``load_custom_steps`` — loads community-installed step types into STEP_REGISTRY. """ from __future__ import annotations +from pathlib import Path from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -66,3 +68,79 @@ def _register_builtin_steps() -> None: _register_builtin_steps() + + +def load_custom_steps(project_root: Path) -> list[str]: + """Load community-installed custom step types into STEP_REGISTRY. + + Scans ``.specify/workflows/steps/`` for installed step packages. + Each valid package must contain ``step.yml`` (with a ``step.type_key`` + field) and ``__init__.py`` (a ``StepBase`` subclass). + + Returns a list of type_keys that were successfully loaded. + Silently skips packages that fail to import or validate. + """ + import importlib.util as _importlib_util + + steps_dir = Path(project_root) / ".specify" / "workflows" / "steps" + if not steps_dir.is_dir(): + return [] + + loaded: list[str] = [] + for step_dir in steps_dir.iterdir(): + if not step_dir.is_dir(): + continue + step_yml = step_dir / "step.yml" + init_py = step_dir / "__init__.py" + if not step_yml.exists() or not init_py.exists(): + continue + + try: + import yaml as _yaml + + meta = _yaml.safe_load(step_yml.read_text(encoding="utf-8")) or {} + step_meta = meta.get("step", {}) + type_key = step_meta.get("type_key", "") + if not type_key: + continue + + # Skip if already registered (e.g. built-in or previously loaded) + if type_key in STEP_REGISTRY: + continue + + spec = _importlib_util.spec_from_file_location( + f"_speckit_custom_step_{type_key}", init_py + ) + if spec is None or spec.loader is None: + continue + module = _importlib_util.module_from_spec(spec) + spec.loader.exec_module(module) # type: ignore[union-attr] + + # Find the StepBase subclass in the module + from .base import StepBase as _StepBase + + step_class = None + for attr_name in dir(module): + attr = getattr(module, attr_name) + try: + if ( + isinstance(attr, type) + and issubclass(attr, _StepBase) + and attr is not _StepBase + and getattr(attr, "type_key", "") == type_key + ): + step_class = attr + break + except TypeError: + continue + + if step_class is None: + continue + + _register_step(step_class()) + loaded.append(type_key) + except Exception: # noqa: BLE001 + # Silently skip broken step packages at load time + continue + + return loaded diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da5c60b5c8..77f0f25200 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -1,9 +1,10 @@ -"""Workflow catalog — discovery, install, and management of workflows. +"""Workflow catalog — discovery, install, and management of workflows and step types. Mirrors the existing extension/preset catalog pattern with: - Multi-catalog stack (env var → project → user → built-in) - SHA256-hashed per-URL caching with 1-hour TTL - Workflow registry for installed workflow tracking +- Step registry for installed custom step type tracking - Search across all configured catalog sources """ @@ -538,3 +539,503 @@ def remove_catalog(self, index: int) -> str: if isinstance(removed, dict): return removed.get("name", f"catalog-{index + 1}") return f"catalog-{index + 1}" + + +# --------------------------------------------------------------------------- +# Step catalog errors +# --------------------------------------------------------------------------- + + +class StepCatalogError(Exception): + """Base error for step catalog operations.""" + + +class StepValidationError(StepCatalogError): + """Validation error for step catalog config or step data.""" + + +# --------------------------------------------------------------------------- +# StepCatalogEntry +# --------------------------------------------------------------------------- + + +@dataclass +class StepCatalogEntry: + """Represents a single step catalog source in the catalog stack.""" + + url: str + name: str + priority: int + install_allowed: bool + description: str = "" + + +# --------------------------------------------------------------------------- +# StepRegistry +# --------------------------------------------------------------------------- + + +class StepRegistry: + """Manages the registry of installed custom step types. + + Tracks installed step types and their metadata in + ``.specify/workflows/steps/step-registry.json``. + """ + + REGISTRY_FILE = "step-registry.json" + SCHEMA_VERSION = "1.0" + + def __init__(self, project_root: Path) -> None: + self.project_root = project_root + self.steps_dir = project_root / ".specify" / "workflows" / "steps" + self.registry_path = self.steps_dir / self.REGISTRY_FILE + self.data = self._load() + + def _load(self) -> dict[str, Any]: + """Load registry from disk or create default.""" + if self.registry_path.exists(): + try: + with open(self.registry_path, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, ValueError): + return {"schema_version": self.SCHEMA_VERSION, "steps": {}} + return {"schema_version": self.SCHEMA_VERSION, "steps": {}} + + def save(self) -> None: + """Persist registry to disk.""" + self.steps_dir.mkdir(parents=True, exist_ok=True) + with open(self.registry_path, "w", encoding="utf-8") as f: + json.dump(self.data, f, indent=2) + + def add(self, step_id: str, metadata: dict[str, Any]) -> None: + """Add or update an installed step entry.""" + from datetime import datetime, timezone + + existing = self.data["steps"].get(step_id, {}) + metadata["installed_at"] = existing.get( + "installed_at", datetime.now(timezone.utc).isoformat() + ) + metadata["updated_at"] = datetime.now(timezone.utc).isoformat() + self.data["steps"][step_id] = metadata + self.save() + + def remove(self, step_id: str) -> bool: + """Remove an installed step entry. Returns True if found.""" + if step_id in self.data["steps"]: + del self.data["steps"][step_id] + self.save() + return True + return False + + def get(self, step_id: str) -> dict[str, Any] | None: + """Get metadata for an installed step.""" + return self.data["steps"].get(step_id) + + def list(self) -> dict[str, dict[str, Any]]: + """Return all installed steps.""" + return dict(self.data["steps"]) + + def is_installed(self, step_id: str) -> bool: + """Check if a step is installed.""" + return step_id in self.data["steps"] + + +# --------------------------------------------------------------------------- +# StepCatalog +# --------------------------------------------------------------------------- + + +class StepCatalog: + """Manages step catalog fetching, caching, and searching. + + Resolution order for catalog sources: + 1. ``SPECKIT_STEP_CATALOG_URL`` env var (overrides all) + 2. Project-level ``.specify/step-catalogs.yml`` + 3. User-level ``~/.specify/step-catalogs.yml`` + 4. Built-in defaults (official + community) + """ + + DEFAULT_CATALOG_URL = ( + "https://raw.githubusercontent.com/github/spec-kit/main/" + "workflows/step-catalog.json" + ) + COMMUNITY_CATALOG_URL = ( + "https://raw.githubusercontent.com/github/spec-kit/main/" + "workflows/step-catalog.community.json" + ) + CACHE_DURATION = 3600 # 1 hour + + def __init__(self, project_root: Path) -> None: + self.project_root = project_root + self.steps_dir = project_root / ".specify" / "workflows" / "steps" + self.cache_dir = self.steps_dir / ".cache" + + # -- Catalog resolution ----------------------------------------------- + + def _validate_catalog_url(self, url: str) -> None: + """Validate that a catalog URL uses HTTPS (localhost HTTP allowed).""" + from urllib.parse import urlparse + + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not ( + parsed.scheme == "http" and is_localhost + ): + raise StepValidationError( + f"Catalog URL must use HTTPS (got {parsed.scheme}://). " + "HTTP is only allowed for localhost." + ) + if not parsed.netloc: + raise StepValidationError( + "Catalog URL must be a valid URL with a host." + ) + + def _load_catalog_config( + self, config_path: Path + ) -> list[StepCatalogEntry] | None: + """Load catalog stack configuration from a YAML file.""" + if not config_path.exists(): + return None + try: + data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except (yaml.YAMLError, OSError, UnicodeError) as exc: + raise StepValidationError( + f"Failed to read catalog config {config_path}: {exc}" + ) + catalogs_data = data.get("catalogs", []) + if not catalogs_data: + return None + if not isinstance(catalogs_data, list): + raise StepValidationError( + f"Invalid catalog config: 'catalogs' must be a list, " + f"got {type(catalogs_data).__name__}" + ) + + entries: list[StepCatalogEntry] = [] + for idx, item in enumerate(catalogs_data): + if not isinstance(item, dict): + raise StepValidationError( + f"Invalid catalog entry at index {idx}: " + f"expected a mapping, got {type(item).__name__}" + ) + url = str(item.get("url", "")).strip() + if not url: + continue + self._validate_catalog_url(url) + try: + priority = int(item.get("priority", idx + 1)) + except (TypeError, ValueError): + raise StepValidationError( + f"Invalid priority for catalog " + f"'{item.get('name', idx + 1)}': " + f"expected integer, got {item.get('priority')!r}" + ) + raw_install = item.get("install_allowed", False) + if isinstance(raw_install, str): + install_allowed = raw_install.strip().lower() in ( + "true", + "yes", + "1", + ) + else: + install_allowed = bool(raw_install) + entries.append( + StepCatalogEntry( + url=url, + name=str(item.get("name", f"catalog-{idx + 1}")), + priority=priority, + install_allowed=install_allowed, + description=str(item.get("description", "")), + ) + ) + entries.sort(key=lambda e: e.priority) + if not entries: + raise StepValidationError( + f"Catalog config {config_path} contains {len(catalogs_data)} " + f"entries but none have valid URLs." + ) + return entries + + def get_active_catalogs(self) -> list[StepCatalogEntry]: + """Get the ordered list of active step catalogs.""" + # 1. Environment variable override + env_url = os.environ.get("SPECKIT_STEP_CATALOG_URL", "").strip() + if env_url: + self._validate_catalog_url(env_url) + return [ + StepCatalogEntry( + url=env_url, + name="env-override", + priority=1, + install_allowed=True, + description="From SPECKIT_STEP_CATALOG_URL", + ) + ] + + # 2. Project-level config + project_config = self.project_root / ".specify" / "step-catalogs.yml" + project_entries = self._load_catalog_config(project_config) + if project_entries is not None: + return project_entries + + # 3. User-level config + home = Path.home() + user_config = home / ".specify" / "step-catalogs.yml" + user_entries = self._load_catalog_config(user_config) + if user_entries is not None: + return user_entries + + # 4. Built-in defaults + return [ + StepCatalogEntry( + url=self.DEFAULT_CATALOG_URL, + name="default", + priority=1, + install_allowed=True, + description="Official step types", + ), + StepCatalogEntry( + url=self.COMMUNITY_CATALOG_URL, + name="community", + priority=2, + install_allowed=False, + description="Community-contributed step types (discovery only)", + ), + ] + + # -- Caching ---------------------------------------------------------- + + def _get_cache_paths(self, url: str) -> tuple[Path, Path]: + """Get cache file paths for a URL (hash-based).""" + url_hash = hashlib.sha256(url.encode()).hexdigest()[:16] + cache_file = self.cache_dir / f"step-catalog-{url_hash}.json" + meta_file = self.cache_dir / f"step-catalog-{url_hash}-meta.json" + return cache_file, meta_file + + def _is_url_cache_valid(self, url: str) -> bool: + """Check if cached data for a URL is still fresh.""" + _, meta_file = self._get_cache_paths(url) + if not meta_file.exists(): + return False + try: + with open(meta_file, encoding="utf-8") as f: + meta = json.load(f) + fetched_at = meta.get("fetched_at", 0) + return (time.time() - fetched_at) < self.CACHE_DURATION + except (json.JSONDecodeError, OSError): + return False + + def _fetch_single_catalog( + self, entry: StepCatalogEntry, force_refresh: bool = False + ) -> dict[str, Any]: + """Fetch a single catalog, using cache when possible.""" + cache_file, meta_file = self._get_cache_paths(entry.url) + + if not force_refresh and self._is_url_cache_valid(entry.url): + try: + with open(cache_file, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, OSError): + pass + + from urllib.parse import urlparse + from urllib.request import urlopen + + def _validate_url(url: str) -> None: + parsed = urlparse(url) + is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") + if parsed.scheme != "https" and not ( + parsed.scheme == "http" and is_localhost + ): + raise StepCatalogError( + f"Refusing to fetch catalog from non-HTTPS URL: {url}" + ) + + _validate_url(entry.url) + + try: + with urlopen(entry.url, timeout=30) as resp: # noqa: S310 + _validate_url(resp.geturl()) + data = json.loads(resp.read().decode("utf-8")) + except Exception as exc: + if cache_file.exists(): + try: + with open(cache_file, encoding="utf-8") as f: + return json.load(f) + except (json.JSONDecodeError, ValueError, OSError): + pass + raise StepCatalogError( + f"Failed to fetch catalog from {entry.url}: {exc}" + ) from exc + + if not isinstance(data, dict): + raise StepCatalogError( + f"Catalog from {entry.url} is not a valid JSON object." + ) + + self.cache_dir.mkdir(parents=True, exist_ok=True) + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + with open(meta_file, "w", encoding="utf-8") as f: + json.dump({"url": entry.url, "fetched_at": time.time()}, f) + + return data + + def _get_merged_steps( + self, force_refresh: bool = False + ) -> dict[str, dict[str, Any]]: + """Merge steps from all active catalogs (lower priority number wins).""" + catalogs = self.get_active_catalogs() + merged: dict[str, dict[str, Any]] = {} + fetch_errors = 0 + + for entry in reversed(catalogs): + try: + data = self._fetch_single_catalog(entry, force_refresh) + except StepCatalogError: + fetch_errors += 1 + continue + steps = data.get("steps", {}) + if isinstance(steps, dict): + for step_id, step_data in steps.items(): + if not isinstance(step_data, dict): + continue + step_data["_catalog_name"] = entry.name + step_data["_install_allowed"] = entry.install_allowed + merged[step_id] = step_data + elif isinstance(steps, list): + for step_data in steps: + if not isinstance(step_data, dict): + continue + step_id = step_data.get("id", "") + if step_id: + step_data["_catalog_name"] = entry.name + step_data["_install_allowed"] = entry.install_allowed + merged[step_id] = step_data + if fetch_errors == len(catalogs) and catalogs: + raise StepCatalogError("All configured step catalogs failed to fetch.") + return merged + + # -- Public API ------------------------------------------------------- + + def search( + self, + query: str | None = None, + ) -> list[dict[str, Any]]: + """Search step types across all configured catalogs.""" + merged = self._get_merged_steps() + results: list[dict[str, Any]] = [] + + for step_id, step_data in merged.items(): + step_data.setdefault("id", step_id) + if query: + q = query.lower() + searchable = " ".join( + [ + step_data.get("name", ""), + step_data.get("description", ""), + step_data.get("id", ""), + ] + ).lower() + if q not in searchable: + continue + results.append(step_data) + return results + + def get_step_info(self, step_id: str) -> dict[str, Any] | None: + """Get details for a specific step from the catalog.""" + merged = self._get_merged_steps() + step = merged.get(step_id) + if step: + step.setdefault("id", step_id) + return step + + def get_catalog_configs(self) -> list[dict[str, Any]]: + """Return current catalog configuration as a list of dicts.""" + entries = self.get_active_catalogs() + return [ + { + "name": e.name, + "url": e.url, + "priority": e.priority, + "install_allowed": e.install_allowed, + "description": e.description, + } + for e in entries + ] + + def add_catalog(self, url: str, name: str | None = None) -> None: + """Add a catalog source to the project-level config.""" + self._validate_catalog_url(url) + config_path = self.project_root / ".specify" / "step-catalogs.yml" + + data: dict[str, Any] = {"catalogs": []} + if config_path.exists(): + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + raise StepValidationError( + "Catalog config file is corrupted (expected a mapping)." + ) + data = raw + + catalogs = data.get("catalogs", []) + if not isinstance(catalogs, list): + raise StepValidationError( + "Catalog config 'catalogs' must be a list." + ) + for cat in catalogs: + if isinstance(cat, dict) and cat.get("url") == url: + raise StepValidationError( + f"Catalog URL already configured: {url}" + ) + + max_priority = max( + (cat.get("priority", 0) for cat in catalogs if isinstance(cat, dict)), + default=0, + ) + catalogs.append( + { + "name": name or f"catalog-{len(catalogs) + 1}", + "url": url, + "priority": max_priority + 1, + "install_allowed": True, + "description": "", + } + ) + data["catalogs"] = catalogs + + config_path.parent.mkdir(parents=True, exist_ok=True) + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + def remove_catalog(self, index: int) -> str: + """Remove a catalog source by index (0-based). Returns the removed name.""" + config_path = self.project_root / ".specify" / "step-catalogs.yml" + if not config_path.exists(): + raise StepValidationError("No step catalog config file found.") + + data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + if not isinstance(data, dict): + raise StepValidationError( + "Catalog config file is corrupted (expected a mapping)." + ) + catalogs = data.get("catalogs", []) + if not isinstance(catalogs, list): + raise StepValidationError( + "Catalog config 'catalogs' must be a list." + ) + + if index < 0 or index >= len(catalogs): + raise StepValidationError( + f"Catalog index {index} out of range (0-{len(catalogs) - 1})." + ) + + removed = catalogs.pop(index) + data["catalogs"] = catalogs + + with open(config_path, "w", encoding="utf-8") as f: + yaml.dump(data, f, default_flow_style=False, sort_keys=False, allow_unicode=True) + + if isinstance(removed, dict): + return removed.get("name", f"catalog-{index + 1}") + return f"catalog-{index + 1}" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4c042fc7d5..4e55138e0f 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1843,3 +1843,352 @@ def test_switch_workflow(self, project_dir): assert state.status == RunStatus.COMPLETED assert "do-plan" in state.step_results assert "do-specify" not in state.step_results + + +# ===== Step Registry Tests ===== + +class TestStepRegistryCustom: + """Test StepRegistry operations for custom step types.""" + + def test_add_and_get(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("deploy", {"name": "Deploy", "version": "1.0.0", "type_key": "deploy"}) + + entry = registry.get("deploy") + assert entry is not None + assert entry["name"] == "Deploy" + assert "installed_at" in entry + + def test_remove(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + assert registry.is_installed("deploy") + + registry.remove("deploy") + assert not registry.is_installed("deploy") + + def test_remove_missing_returns_false(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + assert registry.remove("nonexistent") is False + + def test_list(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.add("step-a", {"name": "A", "type_key": "step-a"}) + registry.add("step-b", {"name": "B", "type_key": "step-b"}) + + installed = registry.list() + assert "step-a" in installed + assert "step-b" in installed + + def test_is_installed(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + assert not registry.is_installed("missing") + + registry.add("exists", {"name": "Exists", "type_key": "exists"}) + assert registry.is_installed("exists") + + def test_persistence(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry1 = StepRegistry(project_dir) + registry1.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + + registry2 = StepRegistry(project_dir) + assert registry2.is_installed("deploy") + + def test_corrupted_registry_resets(self, project_dir): + from specify_cli.workflows.catalog import StepRegistry + + registry = StepRegistry(project_dir) + registry.steps_dir.mkdir(parents=True, exist_ok=True) + registry.registry_path.write_text("not json", encoding="utf-8") + + # Loading again should reset + registry2 = StepRegistry(project_dir) + assert registry2.list() == {} + + +# ===== Step Catalog Tests ===== + +class TestStepCatalog: + """Test StepCatalog catalog resolution.""" + + def test_default_catalogs(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 2 + assert entries[0].name == "default" + assert entries[1].name == "community" + + def test_env_var_override(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + monkeypatch.setenv("SPECKIT_STEP_CATALOG_URL", "https://example.com/step-catalog.json") + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "env-override" + assert entries[0].url == "https://example.com/step-catalog.json" + + def test_project_level_config(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + config_path = project_dir / ".specify" / "step-catalogs.yml" + config_path.write_text(yaml.dump({ + "catalogs": [{ + "name": "custom", + "url": "https://example.com/step-catalog.json", + "priority": 1, + "install_allowed": True, + }] + })) + + catalog = StepCatalog(project_dir) + entries = catalog.get_active_catalogs() + assert len(entries) == 1 + assert entries[0].name == "custom" + + def test_validate_url_http_rejected(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + with pytest.raises(StepValidationError, match="HTTPS"): + catalog._validate_catalog_url("http://evil.com/step-catalog.json") + + def test_validate_url_localhost_http_allowed(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + # Should not raise + catalog._validate_catalog_url("http://localhost:8080/step-catalog.json") + + def test_add_catalog(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/new-steps.json", "my-steps") + + config_path = project_dir / ".specify" / "step-catalogs.yml" + assert config_path.exists() + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + assert data["catalogs"][0]["url"] == "https://example.com/new-steps.json" + + def test_add_catalog_duplicate_rejected(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/steps.json") + + with pytest.raises(StepValidationError, match="already configured"): + catalog.add_catalog("https://example.com/steps.json") + + def test_remove_catalog(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/s1.json", "first") + catalog.add_catalog("https://example.com/s2.json", "second") + + removed = catalog.remove_catalog(0) + assert removed == "first" + + config_path = project_dir / ".specify" / "step-catalogs.yml" + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + + def test_remove_catalog_invalid_index(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + catalog.add_catalog("https://example.com/s1.json") + + with pytest.raises(StepValidationError, match="out of range"): + catalog.remove_catalog(5) + + def test_remove_catalog_no_config(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog, StepValidationError + + catalog = StepCatalog(project_dir) + with pytest.raises(StepValidationError, match="No step catalog config file found"): + catalog.remove_catalog(0) + + def test_get_catalog_configs(self, project_dir): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + configs = catalog.get_catalog_configs() + assert len(configs) == 2 + assert configs[0]["name"] == "default" + assert isinstance(configs[0]["install_allowed"], bool) + + def test_search_with_mock_catalog(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + mock_data = { + "schema_version": "1.0", + "steps": { + "deploy": { + "id": "deploy", + "name": "Deploy Step", + "description": "Deploy to production", + "version": "1.0.0", + }, + "notify": { + "id": "notify", + "name": "Notify Step", + "description": "Send notifications", + "version": "1.0.0", + }, + }, + } + + catalog = StepCatalog(project_dir) + monkeypatch.setattr(catalog, "_get_merged_steps", lambda **kw: { + "deploy": dict(mock_data["steps"]["deploy"], _catalog_name="test", _install_allowed=True), + "notify": dict(mock_data["steps"]["notify"], _catalog_name="test", _install_allowed=True), + }) + + results = catalog.search() + assert len(results) == 2 + + results = catalog.search(query="deploy") + assert len(results) == 1 + assert results[0]["id"] == "deploy" + + def test_get_step_info_with_mock_catalog(self, project_dir, monkeypatch): + from specify_cli.workflows.catalog import StepCatalog + + catalog = StepCatalog(project_dir) + monkeypatch.setattr(catalog, "_get_merged_steps", lambda **kw: { + "deploy": { + "id": "deploy", + "name": "Deploy Step", + "version": "1.0.0", + "_catalog_name": "test", + "_install_allowed": True, + }, + }) + + info = catalog.get_step_info("deploy") + assert info is not None + assert info["name"] == "Deploy Step" + + missing = catalog.get_step_info("nonexistent") + assert missing is None + + +# ===== Load Custom Steps Tests ===== + +class TestLoadCustomSteps: + """Test dynamic loading of custom step types from the filesystem.""" + + def test_empty_steps_dir(self, project_dir): + from specify_cli.workflows import load_custom_steps + + loaded = load_custom_steps(project_dir) + assert loaded == [] + + def test_no_steps_dir(self, project_dir): + from specify_cli.workflows import load_custom_steps + + # .specify/workflows/steps does not exist + loaded = load_custom_steps(project_dir) + assert loaded == [] + + def test_load_valid_custom_step(self, project_dir): + from specify_cli.workflows import load_custom_steps, STEP_REGISTRY + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "test-custom" + step_dir.mkdir(parents=True) + + step_yml = """ +schema_version: "1.0" +step: + type_key: "test-custom" + name: "Test Custom Step" + version: "1.0.0" + author: "test" + description: "A test custom step" +""" + (step_dir / "step.yml").write_text(step_yml, encoding="utf-8") + + init_py = """ +from specify_cli.workflows.base import StepBase, StepResult + +class TestCustomStep(StepBase): + type_key = "test-custom" + + def execute(self, config, context): + return StepResult() +""" + (step_dir / "__init__.py").write_text(init_py, encoding="utf-8") + + loaded = load_custom_steps(project_dir) + assert "test-custom" in loaded + assert "test-custom" in STEP_REGISTRY + + def test_skip_missing_step_yml(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "bad-step" + step_dir.mkdir(parents=True) + (step_dir / "__init__.py").write_text("# no step.yml", encoding="utf-8") + + loaded = load_custom_steps(project_dir) + assert "bad-step" not in loaded + + def test_skip_missing_init_py(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "bad-step2" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: bad-step2\n", encoding="utf-8" + ) + + loaded = load_custom_steps(project_dir) + assert "bad-step2" not in loaded + + def test_skip_already_registered(self, project_dir): + from specify_cli.workflows import load_custom_steps + + # "command" is already registered as a built-in step + step_dir = project_dir / ".specify" / "workflows" / "steps" / "command" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: command\n", encoding="utf-8" + ) + (step_dir / "__init__.py").write_text("", encoding="utf-8") + + # Should not raise KeyError; just skip + loaded = load_custom_steps(project_dir) + assert "command" not in loaded + + def test_skip_broken_init_py(self, project_dir): + from specify_cli.workflows import load_custom_steps + + step_dir = project_dir / ".specify" / "workflows" / "steps" / "broken-step" + step_dir.mkdir(parents=True) + (step_dir / "step.yml").write_text( + "step:\n type_key: broken-step\n", encoding="utf-8" + ) + (step_dir / "__init__.py").write_text( + "raise RuntimeError('broken')", encoding="utf-8" + ) + + # Should not propagate exception + loaded = load_custom_steps(project_dir) + assert "broken-step" not in loaded diff --git a/workflows/step-catalog.community.json b/workflows/step-catalog.community.json new file mode 100644 index 0000000000..50a45049f1 --- /dev/null +++ b/workflows/step-catalog.community.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-28T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/step-catalog.community.json", + "steps": {} +} diff --git a/workflows/step-catalog.json b/workflows/step-catalog.json new file mode 100644 index 0000000000..e109bf4edd --- /dev/null +++ b/workflows/step-catalog.json @@ -0,0 +1,6 @@ +{ + "schema_version": "1.0", + "updated_at": "2026-04-28T00:00:00Z", + "catalog_url": "https://raw.githubusercontent.com/github/spec-kit/main/workflows/step-catalog.json", + "steps": {} +} From de5fe2815ca95f162b1222de38f65fc339eb3c61 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Tue, 28 Apr 2026 14:18:38 -0500 Subject: [PATCH 03/11] Potential fix for pull request finding 'An assert statement has a side-effect' Co-authored-by: Copilot Autofix powered by AI <223894421+github-code-quality[bot]@users.noreply.github.com> --- tests/test_workflows.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4e55138e0f..4910521baf 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1875,7 +1875,8 @@ def test_remove_missing_returns_false(self, project_dir): from specify_cli.workflows.catalog import StepRegistry registry = StepRegistry(project_dir) - assert registry.remove("nonexistent") is False + removed = registry.remove("nonexistent") + assert removed is False def test_list(self, project_dir): from specify_cli.workflows.catalog import StepRegistry From f47f695e4b1ddc324829f3b36091dad06afba677 Mon Sep 17 00:00:00 2001 From: Manfred Riem <15701806+mnriem@users.noreply.github.com> Date: Wed, 6 May 2026 14:18:39 -0500 Subject: [PATCH 04/11] Address PR review: path traversal, cache robustness, collision check, failed-to-load display - Add resolve()+relative_to() path traversal guards in workflow_step_add and workflow_step_remove to prevent directory escape via step_id - Harden _is_url_cache_valid in both StepCatalog and WorkflowCatalog to coerce fetched_at to float and catch TypeError/ValueError - Check STEP_REGISTRY and StepRegistry before installing to prevent collisions with built-in step types or already-installed steps - Show 'Custom (installed, failed to load)' section in workflow step list for steps in the registry that failed to load into STEP_REGISTRY --- src/specify_cli/__init__.py | 51 ++++++++++++++++++++++++++-- src/specify_cli/workflows/catalog.py | 8 ++--- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 8fecfb9a80..6f469eef5b 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5387,6 +5387,23 @@ def workflow_step_list(): console.print(f" • {key}") console.print() + # Show custom steps that are installed but failed to load + if specify_dir.exists(): + registry = StepRegistry(project_root) + installed = registry.list() + failed = [ + k for k in sorted(installed.keys()) + if k not in STEP_REGISTRY + ] + if failed: + console.print(" [bold yellow]Custom (installed, failed to load):[/bold yellow]") + for key in failed: + meta = installed.get(key, {}) + name = meta.get("name", key) + version = meta.get("version", "?") + console.print(f" • [dim]{name}[/dim] ({key}) v{version}") + console.print() + if not built_in and not custom: console.print("[yellow]No step types found.[/yellow]") @@ -5427,6 +5444,24 @@ def workflow_step_add( console.print("Direct installation is not enabled for this catalog source.") raise typer.Exit(1) + # Reject step IDs that collide with built-in step types + from .workflows import STEP_REGISTRY as _step_reg + if step_id in _step_reg: + console.print( + f"[red]Error:[/red] Step type '{step_id}' conflicts with a built-in step type" + ) + raise typer.Exit(1) + + # Reject if already installed + registry = StepRegistry(project_root) + if registry.is_installed(step_id): + console.print( + f"[red]Error:[/red] Step type '{step_id}' is already installed. " + "Remove it first with: [cyan]specify workflow step remove " + f"{step_id}[/cyan]" + ) + raise typer.Exit(1) + step_yml_url = info.get("step_yml_url") or info.get("url") if not step_yml_url: console.print(f"[red]Error:[/red] Catalog entry for '{step_id}' has no URL") @@ -5463,7 +5498,13 @@ def _safe_fetch(url: str) -> bytes: raise ValueError(f"Redirect to non-HTTPS URL: {final_url}") return resp.read() - step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + steps_base_dir = (project_root / ".specify" / "workflows" / "steps").resolve() + step_dir = (steps_base_dir / step_id).resolve() + try: + step_dir.relative_to(steps_base_dir) + except ValueError: + console.print(f"[red]Error:[/red] Invalid step id '{step_id}'") + raise typer.Exit(1) step_dir.mkdir(parents=True, exist_ok=True) try: @@ -5548,7 +5589,13 @@ def workflow_step_remove( console.print(f"[red]Error:[/red] Step type '{step_id}' is not installed") raise typer.Exit(1) - step_dir = project_root / ".specify" / "workflows" / "steps" / step_id + steps_base_dir = (project_root / ".specify" / "workflows" / "steps").resolve() + step_dir = (steps_base_dir / step_id).resolve() + try: + step_dir.relative_to(steps_base_dir) + except ValueError: + console.print(f"[red]Error:[/red] Invalid step id '{step_id}'") + raise typer.Exit(1) if step_dir.exists(): import shutil shutil.rmtree(step_dir) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 77f0f25200..1ebb52fdec 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -303,9 +303,9 @@ def _is_url_cache_valid(self, url: str) -> bool: try: with open(meta_file, encoding="utf-8") as f: meta = json.load(f) - fetched_at = meta.get("fetched_at", 0) + fetched_at = float(meta.get("fetched_at", 0)) return (time.time() - fetched_at) < self.CACHE_DURATION - except (json.JSONDecodeError, OSError): + except (json.JSONDecodeError, OSError, TypeError, ValueError): return False def _fetch_single_catalog( @@ -820,9 +820,9 @@ def _is_url_cache_valid(self, url: str) -> bool: try: with open(meta_file, encoding="utf-8") as f: meta = json.load(f) - fetched_at = meta.get("fetched_at", 0) + fetched_at = float(meta.get("fetched_at", 0)) return (time.time() - fetched_at) < self.CACHE_DURATION - except (json.JSONDecodeError, OSError): + except (json.JSONDecodeError, OSError, TypeError, ValueError): return False def _fetch_single_catalog( From e311afbc0deefacd663e64ebae0443796dcaeb4d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 22:46:26 +0000 Subject: [PATCH 05/11] Fix StepRegistry shape validation and StepCatalog empty-YAML handling Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/workflows/catalog.py | 17 ++++++--- tests/test_workflows.py | 54 ++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 5 deletions(-) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 1ebb52fdec..c282d98b0c 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -593,13 +593,20 @@ def __init__(self, project_root: Path) -> None: def _load(self) -> dict[str, Any]: """Load registry from disk or create default.""" + _default: dict[str, Any] = {"schema_version": self.SCHEMA_VERSION, "steps": {}} if self.registry_path.exists(): try: with open(self.registry_path, encoding="utf-8") as f: - return json.load(f) - except (json.JSONDecodeError, ValueError): - return {"schema_version": self.SCHEMA_VERSION, "steps": {}} - return {"schema_version": self.SCHEMA_VERSION, "steps": {}} + data = json.load(f) + # Validate shape: must be a dict with a dict "steps" field + if not isinstance(data, dict): + return _default + if not isinstance(data.get("steps"), dict): + data["steps"] = {} + return data + except (json.JSONDecodeError, ValueError, OSError, UnicodeError): + return _default + return _default def save(self) -> None: """Persist registry to disk.""" @@ -971,7 +978,7 @@ def add_catalog(self, url: str, name: str | None = None) -> None: data: dict[str, Any] = {"catalogs": []} if config_path.exists(): - raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} if not isinstance(raw, dict): raise StepValidationError( "Catalog config file is corrupted (expected a mapping)." diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 4910521baf..9da3204326 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1918,6 +1918,45 @@ def test_corrupted_registry_resets(self, project_dir): registry2 = StepRegistry(project_dir) assert registry2.list() == {} + def test_registry_missing_steps_key_resets(self, project_dir): + """Valid JSON but missing 'steps' key should not crash add/get.""" + from specify_cli.workflows.catalog import StepRegistry + import json as _json + + registry = StepRegistry(project_dir) + registry.steps_dir.mkdir(parents=True, exist_ok=True) + # Valid JSON but 'steps' is not a dict + registry.registry_path.write_text( + _json.dumps({"schema_version": "1.0", "steps": "bad"}), + encoding="utf-8", + ) + + registry2 = StepRegistry(project_dir) + # Should be safe to call add/get without KeyError + assert registry2.list() == {} + registry2.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + assert registry2.is_installed("deploy") + + def test_registry_unreadable_file_resets(self, project_dir): + """OSError reading the registry file should fall back to default.""" + from specify_cli.workflows.catalog import StepRegistry + import json as _json + + registry = StepRegistry(project_dir) + registry.steps_dir.mkdir(parents=True, exist_ok=True) + # Write valid registry first + registry.registry_path.write_text( + _json.dumps({"schema_version": "1.0", "steps": {"existing": {}}}), + encoding="utf-8", + ) + # Make it unreadable + registry.registry_path.chmod(0o000) + try: + registry2 = StepRegistry(project_dir) + assert registry2.list() == {} + finally: + registry.registry_path.chmod(0o644) + # ===== Step Catalog Tests ===== @@ -1987,6 +2026,21 @@ def test_add_catalog(self, project_dir): assert len(data["catalogs"]) == 1 assert data["catalogs"][0]["url"] == "https://example.com/new-steps.json" + def test_add_catalog_empty_yaml_file(self, project_dir): + """An empty YAML config file should be treated as empty, not corrupted.""" + from specify_cli.workflows.catalog import StepCatalog + + config_path = project_dir / ".specify" / "step-catalogs.yml" + config_path.write_text("", encoding="utf-8") + + catalog = StepCatalog(project_dir) + # Should not raise StepValidationError "corrupted" + catalog.add_catalog("https://example.com/steps.json", "my-steps") + + data = yaml.safe_load(config_path.read_text()) + assert len(data["catalogs"]) == 1 + assert data["catalogs"][0]["url"] == "https://example.com/steps.json" + def test_add_catalog_duplicate_rejected(self, project_dir): from specify_cli.workflows.catalog import StepCatalog, StepValidationError From df061e02086871d331dda05036caa8e58787f88a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 22:47:37 +0000 Subject: [PATCH 06/11] Polish: rename _default to default_registry, strengthen unreadable-file test Agent-Logs-Url: https://github.com/github/spec-kit/sessions/0dca6393-f5a9-40de-bb5c-77ba6af033d2 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/workflows/catalog.py | 8 ++++---- tests/test_workflows.py | 4 ++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index c282d98b0c..8f97350ef1 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -593,20 +593,20 @@ def __init__(self, project_root: Path) -> None: def _load(self) -> dict[str, Any]: """Load registry from disk or create default.""" - _default: dict[str, Any] = {"schema_version": self.SCHEMA_VERSION, "steps": {}} + default_registry: dict[str, Any] = {"schema_version": self.SCHEMA_VERSION, "steps": {}} if self.registry_path.exists(): try: with open(self.registry_path, encoding="utf-8") as f: data = json.load(f) # Validate shape: must be a dict with a dict "steps" field if not isinstance(data, dict): - return _default + return default_registry if not isinstance(data.get("steps"), dict): data["steps"] = {} return data except (json.JSONDecodeError, ValueError, OSError, UnicodeError): - return _default - return _default + return default_registry + return default_registry def save(self) -> None: """Persist registry to disk.""" diff --git a/tests/test_workflows.py b/tests/test_workflows.py index 9da3204326..e41198459c 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -1957,6 +1957,10 @@ def test_registry_unreadable_file_resets(self, project_dir): finally: registry.registry_path.chmod(0o644) + # After restoring permissions the registry is fully functional + registry2.add("deploy", {"name": "Deploy", "type_key": "deploy"}) + assert registry2.is_installed("deploy") + # ===== Step Catalog Tests ===== From 3e305202827ec4b8bf26f8b9bbf1e2461de82fc5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 23:03:49 +0000 Subject: [PATCH 07/11] Address PR review: atomic install, hostname validation, cache resilience, no dynamic imports in list/info Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 135 +++++++++++---------------- src/specify_cli/workflows/catalog.py | 28 +++--- tests/test_workflows.py | 2 + 3 files changed, 71 insertions(+), 94 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 6f469eef5b..751d41c4ce 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5346,65 +5346,31 @@ def workflow_step_list(): project_root = Path.cwd() specify_dir = project_root / ".specify" - # Load custom steps if in a spec-kit project - custom_keys: set[str] = set() + # Read installed custom steps from registry only — no dynamic imports + installed: dict = {} if specify_dir.exists(): - from .workflows import load_custom_steps - loaded = load_custom_steps(project_root) - custom_keys.update(loaded) - # Also read registry for metadata registry = StepRegistry(project_root) installed = registry.list() - custom_keys.update(installed.keys()) console.print("\n[bold cyan]Installed Step Types:[/bold cyan]\n") - built_in: list[str] = [] - custom: list[str] = [] - for key in sorted(STEP_REGISTRY.keys()): - if key in custom_keys: - custom.append(key) - else: - built_in.append(key) - + built_in = sorted(k for k in STEP_REGISTRY if k not in installed) if built_in: console.print(" [bold]Built-in:[/bold]") for key in built_in: console.print(f" • {key}") console.print() - if custom: + if installed: console.print(" [bold]Custom (installed):[/bold]") - if specify_dir.exists(): - registry = StepRegistry(project_root) - for key in custom: - meta = registry.get(key) or {} - name = meta.get("name", key) - version = meta.get("version", "?") - console.print(f" • [bold]{name}[/bold] ({key}) v{version}") - else: - for key in custom: - console.print(f" • {key}") + for key in sorted(installed): + meta = installed[key] or {} + name = meta.get("name", key) + version = meta.get("version", "?") + console.print(f" • [bold]{name}[/bold] ({key}) v{version}") console.print() - # Show custom steps that are installed but failed to load - if specify_dir.exists(): - registry = StepRegistry(project_root) - installed = registry.list() - failed = [ - k for k in sorted(installed.keys()) - if k not in STEP_REGISTRY - ] - if failed: - console.print(" [bold yellow]Custom (installed, failed to load):[/bold yellow]") - for key in failed: - meta = installed.get(key, {}) - name = meta.get("name", key) - version = meta.get("version", "?") - console.print(f" • [dim]{name}[/dim] ({key}) v{version}") - console.print() - - if not built_in and not custom: + if not built_in and not installed: console.print("[yellow]No step types found.[/yellow]") if specify_dir.exists(): @@ -5505,48 +5471,55 @@ def _safe_fetch(url: str) -> bytes: except ValueError: console.print(f"[red]Error:[/red] Invalid step id '{step_id}'") raise typer.Exit(1) - step_dir.mkdir(parents=True, exist_ok=True) - try: - step_yml_content = _safe_fetch(step_yml_url) - init_py_content = _safe_fetch(init_url) - except Exception as exc: - import shutil - shutil.rmtree(step_dir, ignore_errors=True) - console.print(f"[red]Error:[/red] Failed to download step files: {exc}") - raise typer.Exit(1) + import shutil + import tempfile - # Validate step.yml + # Download and validate in a temp directory first; only move to the final + # location on success so a transient failure can never corrupt or delete a + # pre-existing directory at step_dir. + tmp_path = Path(tempfile.mkdtemp(prefix="speckit_step_")) + _install_success = False try: - import yaml as _yaml + try: + step_yml_content = _safe_fetch(step_yml_url) + init_py_content = _safe_fetch(init_url) + except Exception as exc: + console.print(f"[red]Error:[/red] Failed to download step files: {exc}") + raise typer.Exit(1) - meta = _yaml.safe_load(step_yml_content.decode("utf-8")) or {} - except Exception as exc: - import shutil - shutil.rmtree(step_dir, ignore_errors=True) - console.print(f"[red]Error:[/red] Invalid step.yml: {exc}") - raise typer.Exit(1) + # Validate step.yml + try: + import yaml as _yaml - step_meta = meta.get("step", {}) - type_key = step_meta.get("type_key", "") - if not type_key: - import shutil - shutil.rmtree(step_dir, ignore_errors=True) - console.print("[red]Error:[/red] step.yml missing 'step.type_key' field") - raise typer.Exit(1) + meta = _yaml.safe_load(step_yml_content.decode("utf-8")) or {} + except Exception as exc: + console.print(f"[red]Error:[/red] Invalid step.yml: {exc}") + raise typer.Exit(1) - if type_key != step_id: - import shutil - shutil.rmtree(step_dir, ignore_errors=True) - console.print( - f"[red]Error:[/red] step.yml type_key ({type_key!r}) does not match " - f"catalog ID ({step_id!r})" - ) - raise typer.Exit(1) + step_meta = meta.get("step", {}) + type_key = step_meta.get("type_key", "") + if not type_key: + console.print("[red]Error:[/red] step.yml missing 'step.type_key' field") + raise typer.Exit(1) - # Write files - (step_dir / "step.yml").write_bytes(step_yml_content) - (step_dir / "__init__.py").write_bytes(init_py_content) + if type_key != step_id: + console.print( + f"[red]Error:[/red] step.yml type_key ({type_key!r}) does not match " + f"catalog ID ({step_id!r})" + ) + raise typer.Exit(1) + + # Write validated files to temp location, then move atomically + (tmp_path / "step.yml").write_bytes(step_yml_content) + (tmp_path / "__init__.py").write_bytes(init_py_content) + + steps_base_dir.mkdir(parents=True, exist_ok=True) + shutil.move(str(tmp_path), str(step_dir)) + _install_success = True + finally: + if not _install_success: + shutil.rmtree(tmp_path, ignore_errors=True) # Register in step registry registry = StepRegistry(project_root) @@ -5659,10 +5632,6 @@ def workflow_step_info( console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") raise typer.Exit(1) - # Load custom steps so registry is up to date - from .workflows import load_custom_steps - load_custom_steps(project_root) - registry = StepRegistry(project_root) installed_meta = registry.get(step_id) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 8f97350ef1..79ca9d1ad2 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -359,11 +359,14 @@ def _validate_catalog_url(url: str) -> None: ) # Write cache - self.cache_dir.mkdir(parents=True, exist_ok=True) - with open(cache_file, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) - with open(meta_file, "w", encoding="utf-8") as f: - json.dump({"url": entry.url, "fetched_at": time.time()}, f) + try: + self.cache_dir.mkdir(parents=True, exist_ok=True) + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + with open(meta_file, "w", encoding="utf-8") as f: + json.dump({"url": entry.url, "fetched_at": time.time()}, f) + except OSError: + pass # Proceed without caching if disk write fails return data @@ -692,7 +695,7 @@ def _validate_catalog_url(self, url: str) -> None: f"Catalog URL must use HTTPS (got {parsed.scheme}://). " "HTTP is only allowed for localhost." ) - if not parsed.netloc: + if not parsed.hostname: raise StepValidationError( "Catalog URL must be a valid URL with a host." ) @@ -880,11 +883,14 @@ def _validate_url(url: str) -> None: f"Catalog from {entry.url} is not a valid JSON object." ) - self.cache_dir.mkdir(parents=True, exist_ok=True) - with open(cache_file, "w", encoding="utf-8") as f: - json.dump(data, f, indent=2) - with open(meta_file, "w", encoding="utf-8") as f: - json.dump({"url": entry.url, "fetched_at": time.time()}, f) + try: + self.cache_dir.mkdir(parents=True, exist_ok=True) + with open(cache_file, "w", encoding="utf-8") as f: + json.dump(data, f, indent=2) + with open(meta_file, "w", encoding="utf-8") as f: + json.dump({"url": entry.url, "fetched_at": time.time()}, f) + except OSError: + pass # Proceed without caching if disk write fails return data diff --git a/tests/test_workflows.py b/tests/test_workflows.py index e41198459c..e467fd09d8 100644 --- a/tests/test_workflows.py +++ b/tests/test_workflows.py @@ -14,6 +14,7 @@ import json import shutil +import sys import tempfile from pathlib import Path @@ -1937,6 +1938,7 @@ def test_registry_missing_steps_key_resets(self, project_dir): registry2.add("deploy", {"name": "Deploy", "type_key": "deploy"}) assert registry2.is_installed("deploy") + @pytest.mark.skipif(sys.platform == "win32", reason="chmod not reliable on Windows") def test_registry_unreadable_file_resets(self, project_dir): """OSError reading the registry file should fall back to default.""" from specify_cli.workflows.catalog import StepRegistry From 1e5f5fdb52a09ccd9e7020dd69647c15f67ec382 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 23:05:26 +0000 Subject: [PATCH 08/11] Fix shutil.move with existing step_dir: remove before move to avoid subdirectory nesting Agent-Logs-Url: https://github.com/github/spec-kit/sessions/3e18fef0-e2e6-4b3e-9e8d-9adb1e5e464e Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 751d41c4ce..0c76168acf 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5515,8 +5515,12 @@ def _safe_fetch(url: str) -> bytes: (tmp_path / "__init__.py").write_bytes(init_py_content) steps_base_dir.mkdir(parents=True, exist_ok=True) + # Remove any pre-existing step_dir before the move; shutil.move would + # otherwise place tmp_path as a *subdirectory* of an existing target. + if step_dir.exists(): + shutil.rmtree(step_dir) shutil.move(str(tmp_path), str(step_dir)) - _install_success = True + _install_success = True # set immediately after move succeeds finally: if not _install_success: shutil.rmtree(tmp_path, ignore_errors=True) From bf0af270ec395ddafcff0146b723ea48f6792b73 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 13:28:33 +0000 Subject: [PATCH 09/11] Call load_custom_steps at execution time; enforce hostname in _safe_fetch and _validate_url Agent-Logs-Url: https://github.com/github/spec-kit/sessions/73865880-fb25-4061-a43e-4e4b4d1c4de6 Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 8 ++++++++ src/specify_cli/workflows/catalog.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 0c76168acf..729d5d9d19 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -4698,12 +4698,14 @@ def workflow_run( ), ): """Run a workflow from an installed ID or local YAML path.""" + from .workflows import load_custom_steps from .workflows.engine import WorkflowEngine project_root = Path.cwd() if not (project_root / ".specify").exists(): console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") raise typer.Exit(1) + load_custom_steps(project_root) engine = WorkflowEngine(project_root) engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") @@ -4765,12 +4767,14 @@ def workflow_resume( run_id: str = typer.Argument(..., help="Run ID to resume"), ): """Resume a paused or failed workflow run.""" + from .workflows import load_custom_steps from .workflows.engine import WorkflowEngine project_root = Path.cwd() if not (project_root / ".specify").exists(): console.print("[red]Error:[/red] Not a spec-kit project (no .specify/ directory)") raise typer.Exit(1) + load_custom_steps(project_root) engine = WorkflowEngine(project_root) engine.on_step_start = lambda sid, label: console.print(f" \u25b8 [{sid}] {label} \u2026") @@ -5454,6 +5458,8 @@ def _safe_fetch(url: str) -> bytes: is_localhost = parsed.hostname in ("localhost", "127.0.0.1", "::1") if parsed.scheme != "https" and not (parsed.scheme == "http" and is_localhost): raise ValueError(f"Refusing to fetch from non-HTTPS URL: {url}") + if not parsed.hostname: + raise ValueError(f"Refusing to fetch from URL with no hostname: {url}") with urlopen(url, timeout=30) as resp: # noqa: S310 final_url = resp.geturl() final_parsed = urlparse(final_url) @@ -5462,6 +5468,8 @@ def _safe_fetch(url: str) -> bytes: final_parsed.scheme == "http" and final_is_localhost ): raise ValueError(f"Redirect to non-HTTPS URL: {final_url}") + if not final_parsed.hostname: + raise ValueError(f"Redirect to URL with no hostname: {final_url}") return resp.read() steps_base_dir = (project_root / ".specify" / "workflows" / "steps").resolve() diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index 79ca9d1ad2..da1e9f3fec 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -334,6 +334,10 @@ def _validate_catalog_url(url: str) -> None: raise WorkflowCatalogError( f"Refusing to fetch catalog from non-HTTPS URL: {url}" ) + if not parsed.hostname: + raise WorkflowCatalogError( + f"Refusing to fetch catalog from URL with no hostname: {url}" + ) _validate_catalog_url(entry.url) @@ -860,6 +864,10 @@ def _validate_url(url: str) -> None: raise StepCatalogError( f"Refusing to fetch catalog from non-HTTPS URL: {url}" ) + if not parsed.hostname: + raise StepCatalogError( + f"Refusing to fetch catalog from URL with no hostname: {url}" + ) _validate_url(entry.url) From c310420bf4cb22a5692763a76e6ef3dfe6df559a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 17:41:54 +0000 Subject: [PATCH 10/11] Wrap YAML parsing in try/except; atomic step install via os.rename() under same fs Agent-Logs-Url: https://github.com/github/spec-kit/sessions/ff915bc5-ec7e-4e6a-b505-35f5795250df Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 36 +++++++++++++++------------- src/specify_cli/workflows/catalog.py | 14 +++++++++-- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 729d5d9d19..33da614504 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5483,11 +5483,19 @@ def _safe_fetch(url: str) -> bytes: import shutil import tempfile - # Download and validate in a temp directory first; only move to the final - # location on success so a transient failure can never corrupt or delete a - # pre-existing directory at step_dir. - tmp_path = Path(tempfile.mkdtemp(prefix="speckit_step_")) - _install_success = False + # Refuse if step_dir already exists (e.g. leftover from a previous failed/manual + # install that wasn't registered). The user should remove it before retrying. + if step_dir.exists(): + console.print( + f"[red]Error:[/red] Step directory already exists at '{step_dir}'. " + f"Remove it manually or use: [cyan]specify workflow step remove {step_id}[/cyan]" + ) + raise typer.Exit(1) + + # Create steps_base_dir now so the staging temp dir is on the same filesystem, + # enabling a truly atomic os.rename() below. + steps_base_dir.mkdir(parents=True, exist_ok=True) + tmp_path = Path(tempfile.mkdtemp(prefix="speckit_step_tmp_", dir=steps_base_dir)) try: try: step_yml_content = _safe_fetch(step_yml_url) @@ -5518,20 +5526,16 @@ def _safe_fetch(url: str) -> bytes: ) raise typer.Exit(1) - # Write validated files to temp location, then move atomically + # Write validated files to the staged temp location, then atomically rename + # into place. Both paths are under steps_base_dir (same filesystem), so + # os.rename() is atomic on POSIX and won't leave a partially-written + # directory at step_dir on failure. (tmp_path / "step.yml").write_bytes(step_yml_content) (tmp_path / "__init__.py").write_bytes(init_py_content) - - steps_base_dir.mkdir(parents=True, exist_ok=True) - # Remove any pre-existing step_dir before the move; shutil.move would - # otherwise place tmp_path as a *subdirectory* of an existing target. - if step_dir.exists(): - shutil.rmtree(step_dir) - shutil.move(str(tmp_path), str(step_dir)) - _install_success = True # set immediately after move succeeds + os.rename(tmp_path, step_dir) finally: - if not _install_success: - shutil.rmtree(tmp_path, ignore_errors=True) + # Clean up if the rename hasn't moved tmp_path yet (i.e. on any failure). + shutil.rmtree(tmp_path, ignore_errors=True) # Register in step registry registry = StepRegistry(project_root) diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index da1e9f3fec..a02198b9d9 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -992,7 +992,12 @@ def add_catalog(self, url: str, name: str | None = None) -> None: data: dict[str, Any] = {"catalogs": []} if config_path.exists(): - raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + try: + raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc: + raise StepValidationError( + f"Catalog config file is unreadable or malformed: {exc}" + ) from exc if not isinstance(raw, dict): raise StepValidationError( "Catalog config file is corrupted (expected a mapping)." @@ -1035,7 +1040,12 @@ def remove_catalog(self, index: int) -> str: if not config_path.exists(): raise StepValidationError("No step catalog config file found.") - data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + try: + data = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {} + except (yaml.YAMLError, OSError, UnicodeDecodeError) as exc: + raise StepValidationError( + f"Catalog config file is unreadable or malformed: {exc}" + ) from exc if not isinstance(data, dict): raise StepValidationError( "Catalog config file is corrupted (expected a mapping)." From bc1e2526b347b50d0b7870facd22e11d96daa4f2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 7 May 2026 18:30:41 +0000 Subject: [PATCH 11/11] Validate YAML root is a dict in _load_catalog_config and workflow_step_add; fix WorkflowCatalog hostname validation Co-authored-by: mnriem <15701806+mnriem@users.noreply.github.com> --- src/specify_cli/__init__.py | 7 +++++++ src/specify_cli/workflows/catalog.py | 7 ++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/src/specify_cli/__init__.py b/src/specify_cli/__init__.py index 33da614504..188828f5f3 100644 --- a/src/specify_cli/__init__.py +++ b/src/specify_cli/__init__.py @@ -5513,7 +5513,14 @@ def _safe_fetch(url: str) -> bytes: console.print(f"[red]Error:[/red] Invalid step.yml: {exc}") raise typer.Exit(1) + if not isinstance(meta, dict): + console.print("[red]Error:[/red] step.yml must be a YAML mapping") + raise typer.Exit(1) + step_meta = meta.get("step", {}) + if not isinstance(step_meta, dict): + console.print("[red]Error:[/red] step.yml 'step' field must be a mapping") + raise typer.Exit(1) type_key = step_meta.get("type_key", "") if not type_key: console.print("[red]Error:[/red] step.yml missing 'step.type_key' field") diff --git a/src/specify_cli/workflows/catalog.py b/src/specify_cli/workflows/catalog.py index a02198b9d9..c20fc020fb 100644 --- a/src/specify_cli/workflows/catalog.py +++ b/src/specify_cli/workflows/catalog.py @@ -166,7 +166,7 @@ def _validate_catalog_url(self, url: str) -> None: f"Catalog URL must use HTTPS (got {parsed.scheme}://). " "HTTP is only allowed for localhost." ) - if not parsed.netloc: + if not parsed.hostname: raise WorkflowValidationError( "Catalog URL must be a valid URL with a host." ) @@ -716,6 +716,11 @@ def _load_catalog_config( raise StepValidationError( f"Failed to read catalog config {config_path}: {exc}" ) + if not isinstance(data, dict): + raise StepValidationError( + f"Invalid catalog config: expected a mapping, " + f"got {type(data).__name__}" + ) catalogs_data = data.get("catalogs", []) if not catalogs_data: return None