From f79115fef91fcdaa511416a9e29d00e46110ed22 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 11 Jun 2026 19:53:06 +0000 Subject: [PATCH 1/4] feat: add workflow stage resume --- docs/concepts/workflow-chaining.md | 13 +- .../pages/concepts/workflow-chaining.mdx | 13 +- .../interface/composite_workflow.py | 175 +++++++++++++++++- .../interface/test_composite_workflow.py | 146 ++++++++++++++- plans/workflow-chaining/workflow-chaining.md | 4 +- 5 files changed, 342 insertions(+), 9 deletions(-) diff --git a/docs/concepts/workflow-chaining.md b/docs/concepts/workflow-chaining.md index 2b8c3cc3f..51ead652a 100644 --- a/docs/concepts/workflow-chaining.md +++ b/docs/concepts/workflow-chaining.md @@ -98,10 +98,21 @@ workflow.add_stage("cleanup", cleanup) This is useful for final cleanup, schema transforms, and format-specific export preparation. +## Resume + +Workflow names are durable artifact identities. Reusing the same name with `resume=ResumeMode.IF_POSSIBLE` reuses compatible completed stages, resumes a matching partial stage through `DataDesigner.create(..., resume=ResumeMode.ALWAYS)`, and reruns the first changed or missing stage plus its descendants. + +```python +from data_designer.interface import ResumeMode + +results = workflow.run(resume=ResumeMode.IF_POSSIBLE) +``` + +Use `ResumeMode.ALWAYS` when every reusable stage must match the prior workflow metadata. If a stage changed or its selected output is missing, the workflow raises instead of starting fresh. + ## Current limits - Stages are linear. DAGs, parallel branches, and joins are planned separately. -- Stage-level resume is not implemented yet. - `push_to_hub()` does not support selected processor or callback outputs yet. Use `export()` for the selected workflow output. - `on_success` callbacks are trusted user code. If a callback returns a path, Data Designer reads that path as the next stage input. - The artifact layout is intended for inspection, but it is not yet a stable public contract. diff --git a/fern/versions/latest/pages/concepts/workflow-chaining.mdx b/fern/versions/latest/pages/concepts/workflow-chaining.mdx index 998d0dfd0..ef0b41aeb 100644 --- a/fern/versions/latest/pages/concepts/workflow-chaining.mdx +++ b/fern/versions/latest/pages/concepts/workflow-chaining.mdx @@ -102,10 +102,21 @@ workflow.add_stage("cleanup", cleanup) This is useful for final cleanup, schema transforms, and format-specific export preparation. +## Resume + +Workflow names are durable artifact identities. Reusing the same name with `resume=ResumeMode.IF_POSSIBLE` reuses compatible completed stages, resumes a matching partial stage through `DataDesigner.create(..., resume=ResumeMode.ALWAYS)`, and reruns the first changed or missing stage plus its descendants. + +```python +from data_designer.interface import ResumeMode + +results = workflow.run(resume=ResumeMode.IF_POSSIBLE) +``` + +Use `ResumeMode.ALWAYS` when every reusable stage must match the prior workflow metadata. If a stage changed or its selected output is missing, the workflow raises instead of starting fresh. + ## Current limits - Stages are linear. DAGs, parallel branches, and joins are planned separately. -- Stage-level resume is not implemented yet. - `push_to_hub()` does not support selected processor or callback outputs yet. Use `export()` for the selected workflow output. - `on_success` callbacks are trusted user code. If a callback returns a path, Data Designer reads that path as the next stage input. - The artifact layout is intended for inspection, but it is not yet a stable public contract. diff --git a/packages/data-designer/src/data_designer/interface/composite_workflow.py b/packages/data-designer/src/data_designer/interface/composite_workflow.py index e66ec1bff..1946765a0 100644 --- a/packages/data-designer/src/data_designer/interface/composite_workflow.py +++ b/packages/data-designer/src/data_designer/interface/composite_workflow.py @@ -14,9 +14,11 @@ from typing import TYPE_CHECKING, Any import data_designer.lazy_heavy_imports as lazy +from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.config.base import ProcessorConfig from data_designer.config.config_builder import BuilderConfig, DataDesignerConfigBuilder from data_designer.config.data_designer_config import DataDesignerConfig +from data_designer.config.dataset_metadata import DatasetMetadata from data_designer.config.errors import InvalidFileFormatError from data_designer.config.seed import IndexRange, PartitionBlock, SamplingStrategy from data_designer.config.seed_source import LocalFileSeedSource @@ -24,6 +26,7 @@ from data_designer.config.utils.type_helpers import StrEnum from data_designer.config.version import get_library_version from data_designer.engine.dataset_builders.errors import ArtifactStorageError +from data_designer.engine.storage.artifact_storage import ArtifactStorage, ResumeMode from data_designer.interface.errors import DataDesignerWorkflowError from data_designer.interface.results import ( SUPPORTED_EXPORT_FORMATS, @@ -37,13 +40,15 @@ if TYPE_CHECKING: import pandas as pd - from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.interface.data_designer import DataDesigner logger = logging.getLogger(__name__) OnSuccessCallback = Callable[[Path], Path | str] +WORKFLOW_METADATA_FILENAME = "workflow-metadata.json" +COMPLETED_STAGE_STATUSES = {"completed", "completed_empty"} +RESUMABLE_STAGE_STATUSES = {"running", "failed"} @dataclass(frozen=True) @@ -221,8 +226,8 @@ def add_stage( ) return self - def run(self) -> CompositeWorkflowResults: - """Run all stages from scratch. + def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResults: + """Run all stages, optionally reusing compatible completed stage outputs. Each stage writes a deterministic artifact directory under the parent Data Designer artifact path. Downstream stages are seeded from the @@ -233,6 +238,7 @@ def run(self) -> CompositeWorkflowResults: workflow_path = self._data_designer.artifact_path / self.name workflow_path.mkdir(parents=True, exist_ok=True) + prior_metadata = _read_prior_workflow_metadata(workflow_path, self.name, resume) metadata: dict[str, Any] = { "name": self.name, "library_version": get_library_version(), @@ -245,6 +251,7 @@ def run(self) -> CompositeWorkflowResults: previous_stage_name: str | None = None previous_stage_fingerprint: str | None = None skipped_upstream_stage: str | None = None + force_rerun_downstream = False for index, stage in enumerate(self._stages): stage_dir_name = _stage_dir_name(index, stage.name) @@ -288,7 +295,43 @@ def run(self) -> CompositeWorkflowResults: upstream_fingerprint=previous_stage_fingerprint, ) stage_path = workflow_path / stage_dir_name - if stage_path.exists(): + prior_stage_metadata = _get_prior_stage_metadata(prior_metadata, index, stage, stage_dir_name) + stage_resume = ResumeMode.NEVER + prior_matches = ( + not force_rerun_downstream + and prior_stage_metadata is not None + and prior_stage_metadata.get("fingerprint") == stage_fingerprint + ) + + if prior_matches and _can_skip_prior_stage(stage, prior_stage_metadata): + stage_metadata.update(prior_stage_metadata) + output_seed_path = Path(stage_metadata["output_seed_path"]) + output_records = _count_parquet_records(output_seed_path) + output_result = _stage_result_from_metadata( + workflow_path=workflow_path, + stage=stage, + stage_dir_name=stage_dir_name, + stage_builder=stage_builder, + ) + stage_results[stage.name] = output_result + stage_output_paths[stage.name] = output_seed_path + previous_seed_path = output_seed_path + previous_output_records = output_records + previous_stage_name = stage.name + previous_stage_fingerprint = stage_fingerprint + if stage_metadata["status"] == "completed_empty": + skipped_upstream_stage = stage.name + _write_workflow_metadata(workflow_path, metadata) + continue + + if prior_matches and prior_stage_metadata.get("status") in RESUMABLE_STAGE_STATUSES and stage_path.exists(): + stage_resume = ResumeMode.ALWAYS + elif resume == ResumeMode.ALWAYS and not force_rerun_downstream: + raise DataDesignerWorkflowError( + f"Cannot resume workflow {self.name!r}: stage {stage.name!r} is not reusable." + ) + + if stage_resume == ResumeMode.NEVER and stage_path.exists(): shutil.rmtree(stage_path) stage_metadata.update( @@ -310,11 +353,15 @@ def run(self) -> CompositeWorkflowResults: num_records=num_records, dataset_name=stage_dir_name, artifact_path=workflow_path, + resume=stage_resume, ) actual_records = result.count_records() output_result = result output_source_result = result if stage.output_processors: + output_processor_path = stage_path / "output-processors" + if output_processor_path.exists(): + shutil.rmtree(output_processor_path) output_processor_builder = _output_processor_config_builder( stage_builder=stage_builder, seed_path=result.artifact_storage.final_dataset_path, @@ -368,6 +415,7 @@ def run(self) -> CompositeWorkflowResults: previous_output_records = output_records previous_stage_name = stage.name previous_stage_fingerprint = stage_fingerprint + force_rerun_downstream = True _write_workflow_metadata(workflow_path, metadata) return CompositeWorkflowResults( @@ -378,6 +426,123 @@ def run(self) -> CompositeWorkflowResults: ) +def _read_prior_workflow_metadata( + workflow_path: Path, + workflow_name: str, + resume: ResumeMode, +) -> dict[str, Any] | None: + if resume == ResumeMode.NEVER: + return None + metadata_path = workflow_path / WORKFLOW_METADATA_FILENAME + if not metadata_path.exists(): + if resume == ResumeMode.ALWAYS: + raise DataDesignerWorkflowError(f"Cannot resume workflow {workflow_name!r}: no workflow metadata found.") + return None + try: + metadata = json.loads(metadata_path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + raise DataDesignerWorkflowError( + f"Cannot resume workflow {workflow_name!r}: workflow metadata is corrupt." + ) from exc + except OSError as exc: + raise DataDesignerWorkflowError( + f"Cannot resume workflow {workflow_name!r}: workflow metadata could not be read." + ) from exc + if metadata.get("name") != workflow_name: + raise DataDesignerWorkflowError( + f"Cannot resume workflow {workflow_name!r}: workflow metadata name does not match." + ) + return metadata + + +def _get_prior_stage_metadata( + prior_metadata: dict[str, Any] | None, + index: int, + stage: _WorkflowStage, + stage_dir_name: str, +) -> dict[str, Any] | None: + if prior_metadata is None: + return None + stages = prior_metadata.get("stages") + if not isinstance(stages, list) or index >= len(stages): + return None + prior_stage = stages[index] + if not isinstance(prior_stage, dict): + return None + if prior_stage.get("name") != stage.name or prior_stage.get("stage_dir") != stage_dir_name: + return None + return prior_stage + + +def _can_skip_prior_stage(stage: _WorkflowStage, prior_stage_metadata: dict[str, Any]) -> bool: + if prior_stage_metadata.get("status") not in COMPLETED_STAGE_STATUSES: + return False + if stage.on_success is not None and stage.on_success_version is None: + return False + output_seed_path = prior_stage_metadata.get("output_seed_path") + if not isinstance(output_seed_path, str) or not output_seed_path: + return False + try: + _count_parquet_records(Path(output_seed_path)) + except DataDesignerWorkflowError: + return False + return True + + +def _stage_result_from_metadata( + *, + workflow_path: Path, + stage: _WorkflowStage, + stage_dir_name: str, + stage_builder: DataDesignerConfigBuilder, +) -> DatasetCreationResults: + main_storage = ArtifactStorage(artifact_path=workflow_path, dataset_name=stage_dir_name, resume=ResumeMode.ALWAYS) + result_storage = main_storage + result_builder = stage_builder + if stage.output_processors: + result_storage = ArtifactStorage( + artifact_path=workflow_path / stage_dir_name, + dataset_name="output-processors", + resume=ResumeMode.ALWAYS, + ) + result_builder = _output_processor_config_builder( + stage_builder=stage_builder, + seed_path=main_storage.final_dataset_path, + output_processors=stage.output_processors, + ) + return DatasetCreationResults( + artifact_storage=result_storage, + analysis=_load_stage_analysis(result_storage), + config_builder=result_builder, + dataset_metadata=DatasetMetadata(), + ) + + +def _load_stage_analysis(artifact_storage: ArtifactStorage) -> Any: + try: + metadata = artifact_storage.read_metadata() + except (FileNotFoundError, json.JSONDecodeError, OSError): + return None + column_statistics = metadata.get("column_statistics") + if not column_statistics: + return None + num_records = metadata.get("actual_num_records") + if num_records is None: + num_records = _count_parquet_records(artifact_storage.final_dataset_path) + try: + return DatasetProfilerResults.model_validate( + { + "num_records": num_records, + "target_num_records": metadata.get("target_num_records", num_records), + "column_statistics": column_statistics, + "side_effect_column_names": metadata.get("side_effect_column_names"), + "column_profiles": metadata.get("column_profiles"), + } + ) + except Exception: + return None + + def _clone_config_builder(config_builder: DataDesignerConfigBuilder) -> DataDesignerConfigBuilder: return DataDesignerConfigBuilder.from_config(BuilderConfig(data_designer=config_builder.build())) @@ -527,7 +692,7 @@ def _parquet_files(path: Path) -> list[Path]: def _write_workflow_metadata(workflow_path: Path, metadata: dict[str, Any]) -> None: - path = workflow_path / "workflow-metadata.json" + path = workflow_path / WORKFLOW_METADATA_FILENAME path.write_text(json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8") diff --git a/packages/data-designer/tests/interface/test_composite_workflow.py b/packages/data-designer/tests/interface/test_composite_workflow.py index 2780e7a13..ff416c8be 100644 --- a/packages/data-designer/tests/interface/test_composite_workflow.py +++ b/packages/data-designer/tests/interface/test_composite_workflow.py @@ -20,7 +20,7 @@ from data_designer.config.seed_source import LocalFileSeedSource from data_designer.config.seed_source_dataframe import DataFrameSeedSource from data_designer.engine.secret_resolver import PlaintextResolver -from data_designer.engine.storage.artifact_storage import ArtifactStorage, BatchStage +from data_designer.engine.storage.artifact_storage import ArtifactStorage, BatchStage, ResumeMode from data_designer.interface.composite_workflow import SkippedStageResult, SkippedStageStatus from data_designer.interface.data_designer import DataDesigner from data_designer.interface.errors import DataDesignerWorkflowError @@ -431,6 +431,150 @@ def test_composite_workflow_clones_stage_builders_on_add( assert [column.name for column in stage_builder.get_column_configs()] == ["category"] +def test_composite_workflow_resume_if_possible_skips_completed_stages( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-skip") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=3) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-skip") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=3) + resumed.add_stage("copy", _copy_builder(stub_model_configs)) + results = resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert create_mock.call_count == 0 + assert results.count_records() == 3 + assert results.load_dataset()["category"].tolist() == ["alpha", "alpha", "alpha"] + + +def test_composite_workflow_resume_if_possible_reruns_changed_stage_only( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-changed") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + sentinel = stub_artifact_path / "resume-changed" / "stage-0-base" / "keep.txt" + sentinel.write_text("keep", encoding="utf-8") + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-changed") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + resumed.add_stage("copy", _expression_builder(stub_model_configs, "category_copy", "{{ category }} v2")) + resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-1-copy"] + assert sentinel.exists() + + +def test_composite_workflow_resume_if_possible_missing_callback_output_reruns_descendants( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + + def keep_first(stage_path: Path) -> Path: + df = lazy.pd.read_parquet(stage_path / "parquet-files") + output_path = stage_path / "callback-outputs" / "first-row" + output_path.mkdir(parents=True) + df.head(1).to_parquet(output_path / "data.parquet", index=False) + return output_path + + workflow = data_designer.compose_workflow(name="resume-callback") + workflow.add_stage( + "base", + _category_builder(stub_model_configs), + num_records=3, + on_success=keep_first, + on_success_version="first-row", + ) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + callback_output = stub_artifact_path / "resume-callback" / "stage-0-base" / "callback-outputs" / "first-row" + for parquet_file in callback_output.glob("*.parquet"): + parquet_file.unlink() + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-callback") + resumed.add_stage( + "base", + _category_builder(stub_model_configs), + num_records=3, + on_success=keep_first, + on_success_version="first-row", + ) + resumed.add_stage("copy", _copy_builder(stub_model_configs)) + resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] + + +def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-partial") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + metadata_path = stub_artifact_path / "resume-partial" / "workflow-metadata.json" + metadata = json.loads(metadata_path.read_text(encoding="utf-8")) + metadata["stages"][0]["status"] = "running" + metadata_path.write_text(json.dumps(metadata), encoding="utf-8") + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-partial") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + resumed.add_stage("copy", _copy_builder(stub_model_configs)) + resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] + assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER] + + +def test_composite_workflow_resume_always_rejects_changed_stage( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-always") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-always") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + resumed.add_stage("copy", _expression_builder(stub_model_configs, "category_copy", "{{ category }} v2")) + with pytest.raises(DataDesignerWorkflowError, match="not reusable"): + resumed.run(resume=ResumeMode.ALWAYS) + + assert create_mock.call_count == 0 + + def test_composite_workflow_runs_three_real_async_stages( tmp_path: Path, stub_model_providers: list[ModelProvider], diff --git a/plans/workflow-chaining/workflow-chaining.md b/plans/workflow-chaining/workflow-chaining.md index dc7358511..b5a7bbff5 100644 --- a/plans/workflow-chaining/workflow-chaining.md +++ b/plans/workflow-chaining/workflow-chaining.md @@ -401,7 +401,7 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 - Add `compose_workflow(name: str)` factory method on `DataDesigner`. - Tests: multi-stage runs, explode/filter via callbacks, num_records defaulting, duplicate stage-name rejection, artifact layout, throttle reuse across stages. -**Status after PR #636:** Implemented `CompositeWorkflow`, `compose_workflow()`, `to_config_builder()`, disk handoff, stage metadata, `acreate()`, shared throttle manager reuse, explicit stage artifact roots, cloned stage builders, concurrent-safe seed reader/resource-provider handling, seeded processor-only configs, stage output processors, and stage output selection. Still deferred: stage-level resume, DAG branches, `allow_resize` removal, config bundles, and broader first-class artifact seeding. +**Status after PR #636:** Implemented `CompositeWorkflow`, `compose_workflow()`, `to_config_builder()`, disk handoff, stage metadata, `acreate()`, shared throttle manager reuse, explicit stage artifact roots, cloned stage builders, concurrent-safe seed reader/resource-provider handling, seeded processor-only configs, stage output processors, and stage output selection. Still deferred after #636: stage-level resume, DAG branches, `allow_resize` removal, config bundles, and broader first-class artifact seeding. ### Sidecar: `acreate()` on `DataDesigner` (independent of chaining v1) @@ -433,6 +433,8 @@ result_2 = data_designer.create(config_2, num_records=200) # explode: 50 -> 200 - For invalidated stages, clear or replace the deterministic stage directory before starting fresh so `ArtifactStorage` does not timestamp away from the workflow layout. - Depends on artifact layout from phase 1. +**Status after stage-level resume slice:** Implemented `workflow.run(resume=...)`, compatible completed-stage reuse, matching partial-stage delegation to `DataDesigner.create(..., resume=ResumeMode.ALWAYS)`, downstream invalidation after changed or missing stages, callback output path checks, and docs for `ResumeMode.IF_POSSIBLE` / `ResumeMode.ALWAYS`. Still deferred: DAG branches, `allow_resize` removal, config bundles, and broader first-class artifact seeding. + ### Phase 4: DAG-shaped stages with parallel branches - Extend `add_stage()` with an optional `depends_on=[stage_name, ...]` argument; default keeps the linear behavior. From f302206c433522da6b5bbc11dce86aba26f90e20 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 11 Jun 2026 20:05:27 +0000 Subject: [PATCH 2/4] test: cover workflow resume output processors --- .../interface/test_composite_workflow.py | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/packages/data-designer/tests/interface/test_composite_workflow.py b/packages/data-designer/tests/interface/test_composite_workflow.py index ff416c8be..aea17822d 100644 --- a/packages/data-designer/tests/interface/test_composite_workflow.py +++ b/packages/data-designer/tests/interface/test_composite_workflow.py @@ -455,6 +455,44 @@ def test_composite_workflow_resume_if_possible_skips_completed_stages( assert results.load_dataset()["category"].tolist() == ["alpha", "alpha", "alpha"] +def test_composite_workflow_resume_if_possible_skips_stage_with_output_processors( + tmp_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], +) -> None: + stage = _seeded_builder(stub_model_configs, [{"name": "Ada", "secret": "hidden"}]) + stage.add_column(ExpressionColumnConfig(name="public_name", expr="{{ name }}")) + + data_designer = _real_data_designer(tmp_path / "artifacts", stub_model_providers) + workflow = data_designer.compose_workflow(name="resume-output-processors") + workflow.add_stage( + "base", + stage, + num_records=1, + output_processors=[DropColumnsProcessorConfig(name="drop_secret", column_names=["secret"])], + ) + workflow.add_stage("final", _expression_builder(stub_model_configs, "final", "{{ public_name }} final")) + first = workflow.run() + output_processor_file = first["base"].artifact_storage.final_dataset_path / "batch_00000.parquet" + output_processor_mtime = output_processor_file.stat().st_mtime_ns + + resumed = data_designer.compose_workflow(name="resume-output-processors") + resumed.add_stage( + "base", + stage, + num_records=1, + output_processors=[DropColumnsProcessorConfig(name="drop_secret", column_names=["secret"])], + ) + resumed.add_stage("final", _expression_builder(stub_model_configs, "final", "{{ public_name }} final")) + results = resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert "secret" not in results["base"].load_dataset().columns + assert results.load_dataset().to_dict(orient="records") == [ + {"name": "Ada", "public_name": "Ada", "final": "Ada final"} + ] + assert output_processor_file.stat().st_mtime_ns == output_processor_mtime + + def test_composite_workflow_resume_if_possible_reruns_changed_stage_only( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], From 23c167673897298c383dca6ab4f08a2dd90ae0c1 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Thu, 11 Jun 2026 20:16:45 +0000 Subject: [PATCH 3/4] fix: harden workflow resume metadata --- .../interface/composite_workflow.py | 20 +++- .../interface/test_composite_workflow.py | 108 +++++++++++++++++- 2 files changed, 125 insertions(+), 3 deletions(-) diff --git a/packages/data-designer/src/data_designer/interface/composite_workflow.py b/packages/data-designer/src/data_designer/interface/composite_workflow.py index 1946765a0..c6ccf1e5f 100644 --- a/packages/data-designer/src/data_designer/interface/composite_workflow.py +++ b/packages/data-designer/src/data_designer/interface/composite_workflow.py @@ -6,6 +6,7 @@ import hashlib import json import logging +import os import shutil import time from collections.abc import Callable, ItemsView, Iterator, KeysView @@ -441,14 +442,23 @@ def _read_prior_workflow_metadata( try: metadata = json.loads(metadata_path.read_text(encoding="utf-8")) except json.JSONDecodeError as exc: + if resume != ResumeMode.ALWAYS: + logger.warning("Workflow metadata for %r is corrupt; starting fresh.", workflow_name) + return None raise DataDesignerWorkflowError( f"Cannot resume workflow {workflow_name!r}: workflow metadata is corrupt." ) from exc except OSError as exc: + if resume != ResumeMode.ALWAYS: + logger.warning("Workflow metadata for %r could not be read; starting fresh.", workflow_name) + return None raise DataDesignerWorkflowError( f"Cannot resume workflow {workflow_name!r}: workflow metadata could not be read." ) from exc if metadata.get("name") != workflow_name: + if resume != ResumeMode.ALWAYS: + logger.warning("Workflow metadata for %r has a different name; starting fresh.", workflow_name) + return None raise DataDesignerWorkflowError( f"Cannot resume workflow {workflow_name!r}: workflow metadata name does not match." ) @@ -693,7 +703,15 @@ def _parquet_files(path: Path) -> list[Path]: def _write_workflow_metadata(workflow_path: Path, metadata: dict[str, Any]) -> None: path = workflow_path / WORKFLOW_METADATA_FILENAME - path.write_text(json.dumps(metadata, indent=2, sort_keys=True), encoding="utf-8") + tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") + try: + with tmp_path.open("w", encoding="utf-8") as f: + json.dump(metadata, f, indent=2, sort_keys=True) + f.flush() + os.fsync(f.fileno()) + os.replace(tmp_path, path) + finally: + tmp_path.unlink(missing_ok=True) def _validate_stage_output(output: str) -> None: diff --git a/packages/data-designer/tests/interface/test_composite_workflow.py b/packages/data-designer/tests/interface/test_composite_workflow.py index aea17822d..0f935b168 100644 --- a/packages/data-designer/tests/interface/test_composite_workflow.py +++ b/packages/data-designer/tests/interface/test_composite_workflow.py @@ -493,6 +493,51 @@ def test_composite_workflow_resume_if_possible_skips_stage_with_output_processor assert output_processor_file.stat().st_mtime_ns == output_processor_mtime +def test_composite_workflow_resume_if_possible_preserves_completed_empty_skip( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + + def empty_output(stage_path: Path) -> Path: + output_path = stage_path / "callback-outputs" / "empty" + output_path.mkdir(parents=True) + lazy.pd.DataFrame({"category": []}).to_parquet(output_path / "data.parquet", index=False) + return output_path + + workflow = data_designer.compose_workflow(name="resume-empty") + workflow.add_stage( + "base", + _category_builder(stub_model_configs), + num_records=2, + on_success=empty_output, + on_success_version="empty", + allow_empty=True, + ) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-empty") + resumed.add_stage( + "base", + _category_builder(stub_model_configs), + num_records=2, + on_success=empty_output, + on_success_version="empty", + allow_empty=True, + ) + resumed.add_stage("copy", _copy_builder(stub_model_configs)) + results = resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert create_mock.call_count == 0 + assert isinstance(results["copy"], SkippedStageResult) + assert results["copy"].upstream_stage == "base" + + def test_composite_workflow_resume_if_possible_reruns_changed_stage_only( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], @@ -563,11 +608,37 @@ def keep_first(stage_path: Path) -> Path: assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] -def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage( +def test_composite_workflow_resume_if_possible_corrupt_metadata_starts_fresh( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], stub_model_configs: list[ModelConfig], stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-corrupt") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + metadata_path = stub_artifact_path / "resume-corrupt" / "workflow-metadata.json" + metadata_path.write_text("{", encoding="utf-8") + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-corrupt") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + resumed.add_stage("copy", _copy_builder(stub_model_configs)) + resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] + + +@pytest.mark.parametrize("status", ["running", "failed"]) +def test_composite_workflow_resume_if_possible_delegates_matching_resumable_stage( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, + status: str, ) -> None: data_designer = _data_designer(stub_artifact_path, stub_model_providers) create_mock = _patch_create(data_designer, stub_dataset_profiler_results) @@ -577,7 +648,7 @@ def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage( workflow.run() metadata_path = stub_artifact_path / "resume-partial" / "workflow-metadata.json" metadata = json.loads(metadata_path.read_text(encoding="utf-8")) - metadata["stages"][0]["status"] = "running" + metadata["stages"][0]["status"] = status metadata_path.write_text(json.dumps(metadata), encoding="utf-8") create_mock.reset_mock() @@ -590,6 +661,39 @@ def test_composite_workflow_resume_if_possible_delegates_matching_partial_stage( assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER] +def test_composite_workflow_resume_always_requires_metadata( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + workflow = data_designer.compose_workflow(name="resume-missing") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + + with pytest.raises(DataDesignerWorkflowError, match="no workflow metadata found"): + workflow.run(resume=ResumeMode.ALWAYS) + + +def test_composite_workflow_resume_always_rejects_corrupt_metadata( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-corrupt-always") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.run() + metadata_path = stub_artifact_path / "resume-corrupt-always" / "workflow-metadata.json" + metadata_path.write_text("{", encoding="utf-8") + + resumed = data_designer.compose_workflow(name="resume-corrupt-always") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + with pytest.raises(DataDesignerWorkflowError, match="workflow metadata is corrupt"): + resumed.run(resume=ResumeMode.ALWAYS) + + def test_composite_workflow_resume_always_rejects_changed_stage( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], From b767633d24ac0632c9592e7d0cc6c0ecca2f65d7 Mon Sep 17 00:00:00 2001 From: Andre Manoel Date: Fri, 12 Jun 2026 20:17:33 +0000 Subject: [PATCH 4/4] fix: address workflow resume review feedback --- docs/concepts/workflow-chaining.md | 2 +- .../pages/concepts/workflow-chaining.mdx | 2 +- .../interface/composite_workflow.py | 66 ++++++++++++++--- .../interface/test_composite_workflow.py | 73 ++++++++++++++++++- 4 files changed, 128 insertions(+), 15 deletions(-) diff --git a/docs/concepts/workflow-chaining.md b/docs/concepts/workflow-chaining.md index 51ead652a..4364081bb 100644 --- a/docs/concepts/workflow-chaining.md +++ b/docs/concepts/workflow-chaining.md @@ -108,7 +108,7 @@ from data_designer.interface import ResumeMode results = workflow.run(resume=ResumeMode.IF_POSSIBLE) ``` -Use `ResumeMode.ALWAYS` when every reusable stage must match the prior workflow metadata. If a stage changed or its selected output is missing, the workflow raises instead of starting fresh. +Use `ResumeMode.ALWAYS` for strict resume before the first recovered checkpoint. A changed stage or missing selected output raises instead of starting fresh. If a matching partial stage resumes successfully, descendants are recreated from that stage's current output. ## Current limits diff --git a/fern/versions/latest/pages/concepts/workflow-chaining.mdx b/fern/versions/latest/pages/concepts/workflow-chaining.mdx index ef0b41aeb..c3fec9039 100644 --- a/fern/versions/latest/pages/concepts/workflow-chaining.mdx +++ b/fern/versions/latest/pages/concepts/workflow-chaining.mdx @@ -112,7 +112,7 @@ from data_designer.interface import ResumeMode results = workflow.run(resume=ResumeMode.IF_POSSIBLE) ``` -Use `ResumeMode.ALWAYS` when every reusable stage must match the prior workflow metadata. If a stage changed or its selected output is missing, the workflow raises instead of starting fresh. +Use `ResumeMode.ALWAYS` for strict resume before the first recovered checkpoint. A changed stage or missing selected output raises instead of starting fresh. If a matching partial stage resumes successfully, descendants are recreated from that stage's current output. ## Current limits diff --git a/packages/data-designer/src/data_designer/interface/composite_workflow.py b/packages/data-designer/src/data_designer/interface/composite_workflow.py index c6ccf1e5f..c766865c8 100644 --- a/packages/data-designer/src/data_designer/interface/composite_workflow.py +++ b/packages/data-designer/src/data_designer/interface/composite_workflow.py @@ -9,11 +9,14 @@ import os import shutil import time +import uuid from collections.abc import Callable, ItemsView, Iterator, KeysView from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any +from pydantic import ValidationError + import data_designer.lazy_heavy_imports as lazy from data_designer.config.analysis.dataset_profiler import DatasetProfilerResults from data_designer.config.base import ProcessorConfig @@ -50,6 +53,12 @@ WORKFLOW_METADATA_FILENAME = "workflow-metadata.json" COMPLETED_STAGE_STATUSES = {"completed", "completed_empty"} RESUMABLE_STAGE_STATUSES = {"running", "failed"} +WORKFLOW_PATH_METADATA_KEYS = ( + "seed_path", + "output_seed_path", + "callback_output_path", + "output_processor_output_path", +) @dataclass(frozen=True) @@ -252,6 +261,7 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu previous_stage_name: str | None = None previous_stage_fingerprint: str | None = None skipped_upstream_stage: str | None = None + # A stage that runs or resumes may produce new data, so descendants rebuild from its current output. force_rerun_downstream = False for index, stage in enumerate(self._stages): @@ -304,9 +314,10 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu and prior_stage_metadata.get("fingerprint") == stage_fingerprint ) - if prior_matches and _can_skip_prior_stage(stage, prior_stage_metadata): + if prior_matches and _can_skip_prior_stage(stage, prior_stage_metadata, workflow_path): stage_metadata.update(prior_stage_metadata) - output_seed_path = Path(stage_metadata["output_seed_path"]) + output_seed_path = _resolve_metadata_path(workflow_path, stage_metadata["output_seed_path"]) + _normalize_stage_path_metadata(workflow_path, stage_metadata) output_records = _count_parquet_records(output_seed_path) output_result = _stage_result_from_metadata( workflow_path=workflow_path, @@ -317,7 +328,7 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu stage_results[stage.name] = output_result stage_output_paths[stage.name] = output_seed_path previous_seed_path = output_seed_path - previous_output_records = output_records + previous_output_records = None if stage_metadata["status"] == "completed_empty" else output_records previous_stage_name = stage.name previous_stage_fingerprint = stage_fingerprint if stage_metadata["status"] == "completed_empty": @@ -341,7 +352,11 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu "fingerprint": stage_fingerprint, "num_records_requested": num_records, "seeded_from_stage": previous_stage_name, - "seed_path": str(previous_seed_path) if previous_seed_path is not None else None, + "seed_path": ( + _metadata_path_value(workflow_path, previous_seed_path) + if previous_seed_path is not None + else None + ), "config": stage_config.model_dump(mode="json"), } ) @@ -397,10 +412,14 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu "status": status, "num_records_actual": actual_records, "output_records": output_records, - "output_seed_path": str(output_seed_path), - "callback_output_path": str(callback_output_path) if callback_output_path else None, + "output_seed_path": _metadata_path_value(workflow_path, output_seed_path), + "callback_output_path": ( + _metadata_path_value(workflow_path, callback_output_path) if callback_output_path else None + ), "output_processor_output_path": ( - str(output_result.artifact_storage.base_dataset_path) if stage.output_processors else None + _metadata_path_value(workflow_path, output_result.artifact_storage.base_dataset_path) + if stage.output_processors + else None ), "duration_sec": time.monotonic() - start_time, } @@ -413,7 +432,7 @@ def run(self, *, resume: ResumeMode = ResumeMode.NEVER) -> CompositeWorkflowResu stage_results[stage.name] = output_result stage_output_paths[stage.name] = output_seed_path previous_seed_path = output_seed_path - previous_output_records = output_records + previous_output_records = None if status == "completed_empty" else output_records previous_stage_name = stage.name previous_stage_fingerprint = stage_fingerprint force_rerun_downstream = True @@ -484,7 +503,7 @@ def _get_prior_stage_metadata( return prior_stage -def _can_skip_prior_stage(stage: _WorkflowStage, prior_stage_metadata: dict[str, Any]) -> bool: +def _can_skip_prior_stage(stage: _WorkflowStage, prior_stage_metadata: dict[str, Any], workflow_path: Path) -> bool: if prior_stage_metadata.get("status") not in COMPLETED_STAGE_STATUSES: return False if stage.on_success is not None and stage.on_success_version is None: @@ -493,12 +512,35 @@ def _can_skip_prior_stage(stage: _WorkflowStage, prior_stage_metadata: dict[str, if not isinstance(output_seed_path, str) or not output_seed_path: return False try: - _count_parquet_records(Path(output_seed_path)) + _count_parquet_records(_resolve_metadata_path(workflow_path, output_seed_path)) except DataDesignerWorkflowError: return False return True +def _metadata_path_value(workflow_path: Path, path: Path) -> str: + if path.is_absolute(): + try: + return str(path.relative_to(workflow_path)) + except ValueError: + return str(path) + return str(path) + + +def _resolve_metadata_path(workflow_path: Path, path: str) -> Path: + metadata_path = Path(path) + if metadata_path.is_absolute(): + return metadata_path + return workflow_path / metadata_path + + +def _normalize_stage_path_metadata(workflow_path: Path, stage_metadata: dict[str, Any]) -> None: + for key in WORKFLOW_PATH_METADATA_KEYS: + value = stage_metadata.get(key) + if isinstance(value, str) and value: + stage_metadata[key] = _metadata_path_value(workflow_path, _resolve_metadata_path(workflow_path, value)) + + def _stage_result_from_metadata( *, workflow_path: Path, @@ -549,7 +591,7 @@ def _load_stage_analysis(artifact_storage: ArtifactStorage) -> Any: "column_profiles": metadata.get("column_profiles"), } ) - except Exception: + except ValidationError: return None @@ -703,7 +745,7 @@ def _parquet_files(path: Path) -> list[Path]: def _write_workflow_metadata(workflow_path: Path, metadata: dict[str, Any]) -> None: path = workflow_path / WORKFLOW_METADATA_FILENAME - tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}") + tmp_path = path.with_name(f"{path.name}.tmp.{os.getpid()}.{uuid.uuid4().hex}") try: with tmp_path.open("w", encoding="utf-8") as f: json.dump(metadata, f, indent=2, sort_keys=True) diff --git a/packages/data-designer/tests/interface/test_composite_workflow.py b/packages/data-designer/tests/interface/test_composite_workflow.py index 0f935b168..d2b944901 100644 --- a/packages/data-designer/tests/interface/test_composite_workflow.py +++ b/packages/data-designer/tests/interface/test_composite_workflow.py @@ -4,6 +4,7 @@ from __future__ import annotations import json +import shutil from pathlib import Path from unittest.mock import MagicMock @@ -117,6 +118,19 @@ def _load_workflow_metadata(artifact_path: Path, workflow_name: str) -> dict: return json.loads((artifact_path / workflow_name / "workflow-metadata.json").read_text()) +def _mark_stage_resumable(metadata: dict, index: int, status: str) -> None: + metadata["stages"][index]["status"] = status + for key in ( + "num_records_actual", + "output_records", + "output_seed_path", + "callback_output_path", + "output_processor_output_path", + "duration_sec", + ): + metadata["stages"][index].pop(key, None) + + def test_dataset_creation_results_to_config_builder_columns( stub_model_configs: list[ModelConfig], stub_dataset_profiler_results, @@ -473,6 +487,8 @@ def test_composite_workflow_resume_if_possible_skips_stage_with_output_processor ) workflow.add_stage("final", _expression_builder(stub_model_configs, "final", "{{ public_name }} final")) first = workflow.run() + output_processor_dir = first["base"].artifact_storage.base_dataset_path + output_processor_dir_mtime = output_processor_dir.stat().st_mtime_ns output_processor_file = first["base"].artifact_storage.final_dataset_path / "batch_00000.parquet" output_processor_mtime = output_processor_file.stat().st_mtime_ns @@ -490,9 +506,37 @@ def test_composite_workflow_resume_if_possible_skips_stage_with_output_processor assert results.load_dataset().to_dict(orient="records") == [ {"name": "Ada", "public_name": "Ada", "final": "Ada final"} ] + assert output_processor_dir.stat().st_mtime_ns == output_processor_dir_mtime assert output_processor_file.stat().st_mtime_ns == output_processor_mtime +def test_composite_workflow_resume_if_possible_uses_relative_metadata_paths_after_move( + tmp_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + source_artifacts = tmp_path / "source" / "artifacts" + moved_artifacts = tmp_path / "moved" / "artifacts" + data_designer = _data_designer(source_artifacts, stub_model_providers) + _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-moved") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.run() + metadata = _load_workflow_metadata(source_artifacts, "resume-moved") + assert metadata["stages"][0]["output_seed_path"] == "stage-0-base/parquet-files" + + shutil.copytree(source_artifacts, moved_artifacts) + moved_data_designer = _data_designer(moved_artifacts, stub_model_providers) + create_mock = _patch_create(moved_data_designer, stub_dataset_profiler_results) + resumed = moved_data_designer.compose_workflow(name="resume-moved") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + results = resumed.run(resume=ResumeMode.IF_POSSIBLE) + + assert create_mock.call_count == 0 + assert results.count_records() == 2 + + def test_composite_workflow_resume_if_possible_preserves_completed_empty_skip( stub_artifact_path: Path, stub_model_providers: list[ModelProvider], @@ -648,7 +692,7 @@ def test_composite_workflow_resume_if_possible_delegates_matching_resumable_stag workflow.run() metadata_path = stub_artifact_path / "resume-partial" / "workflow-metadata.json" metadata = json.loads(metadata_path.read_text(encoding="utf-8")) - metadata["stages"][0]["status"] = status + _mark_stage_resumable(metadata, 0, status) metadata_path.write_text(json.dumps(metadata), encoding="utf-8") create_mock.reset_mock() @@ -661,6 +705,33 @@ def test_composite_workflow_resume_if_possible_delegates_matching_resumable_stag assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER] +def test_composite_workflow_resume_always_reruns_descendants_after_partial_stage( + stub_artifact_path: Path, + stub_model_providers: list[ModelProvider], + stub_model_configs: list[ModelConfig], + stub_dataset_profiler_results, +) -> None: + data_designer = _data_designer(stub_artifact_path, stub_model_providers) + create_mock = _patch_create(data_designer, stub_dataset_profiler_results) + workflow = data_designer.compose_workflow(name="resume-always-partial") + workflow.add_stage("base", _category_builder(stub_model_configs), num_records=2) + workflow.add_stage("copy", _copy_builder(stub_model_configs)) + workflow.run() + metadata_path = stub_artifact_path / "resume-always-partial" / "workflow-metadata.json" + metadata = json.loads(metadata_path.read_text(encoding="utf-8")) + _mark_stage_resumable(metadata, 0, "running") + metadata_path.write_text(json.dumps(metadata), encoding="utf-8") + create_mock.reset_mock() + + resumed = data_designer.compose_workflow(name="resume-always-partial") + resumed.add_stage("base", _category_builder(stub_model_configs), num_records=2) + resumed.add_stage("copy", _expression_builder(stub_model_configs, "category_copy", "{{ category }} v2")) + resumed.run(resume=ResumeMode.ALWAYS) + + assert [call.kwargs["dataset_name"] for call in create_mock.call_args_list] == ["stage-0-base", "stage-1-copy"] + assert [call.kwargs["resume"] for call in create_mock.call_args_list] == [ResumeMode.ALWAYS, ResumeMode.NEVER] + + def test_composite_workflow_resume_always_requires_metadata( stub_artifact_path: Path, stub_model_providers: list[ModelProvider],