Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions docs/next-run-plan.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
# Next v8 pipeline run plan

> Superseded for release-candidate builds as of 2026-06-06. PE-US-data PUF
> support clone rebuilds must use `--donor-imputer-backend regime_aware`, which
> routes through MicroImpute chained donor imputations. The older `qrf` and
> `zi_qrf` backends remain useful only for explicit non-release experiments with
> `puf_support_clone_enabled=False`; while PUF support clone is enabled, the
> release-profile config now fails closed (raises `ValueError`) if any backend
> other than `regime_aware` is requested.

## Summary

v7 (2026-04-18 12:19 PM, artifact `live_pe_us_data_rebuild_checkpoint_20260418_microcalibrate_modular`) uses the default `donor_imputer_backend="qrf"`. That path leaves `zero_inflated_vars` empty in `ColumnwiseQRFDonorImputer`, so the imputer fits no zero-classifier and the QRF runs `predict()` over all 3.37 M rows for every target column — including columns that are 99 % zero.

v8 should flip to `--donor-imputer-backend zi_qrf`, which activates the `ZERO_INFLATED_POSITIVE`-whitelist path. On whitelisted columns the imputer fits a `RandomForestClassifier` zero-gate, then only invokes QRF `predict()` on rows the gate sends to the positive branch. On a 97 %-zero column this cuts QRF predict to ~3 % of rows — a large wall-clock win on donor integration.
v8 originally planned to flip to `--donor-imputer-backend zi_qrf`, which activates the `ZERO_INFLATED_POSITIVE`-whitelist path. On whitelisted columns the imputer fits a `RandomForestClassifier` zero-gate, then only invokes QRF `predict()` on rows the gate sends to the positive branch. On a 97 %-zero column this cuts QRF predict to ~3 % of rows — a large wall-clock win on donor integration. That is no longer sufficient for MP/eCPS release candidates because it does not use MicroImpute chained imputations across related donor targets.

## What `zi_qrf` actually covers

Expand Down Expand Up @@ -38,8 +45,8 @@ uv run python -m microplex_us.pipelines.pe_us_data_rebuild_checkpoint \
--targets-db /Users/maxghenis/PolicyEngine/policyengine-us-data-aca-agi-db/policyengine_us_data/storage/calibration/policy_data.db \
--policyengine-us-data-repo /Users/maxghenis/PolicyEngine/policyengine-us-data \
--calibration-backend microcalibrate \
--donor-imputer-backend zi_qrf \
--version-id microcalibrate-zi-qrf-v8 \
--donor-imputer-backend regime_aware \
--version-id microcalibrate-regime-aware-v8 \
--n-synthetic 100000 \
--defer-policyengine-harness \
--defer-policyengine-native-score \
Expand Down
15 changes: 13 additions & 2 deletions src/microplex_us/pipelines/pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def default_policyengine_us_data_rebuild_config(
policyengine_calibration_deferred_stage_min_full_oracle_capped_mean_abs_relative_error=None,
policyengine_calibration_deferred_stage_top_family_count=7,
policyengine_calibration_deferred_stage_top_geography_count=4,
donor_imputer_backend="qrf",
donor_imputer_backend="regime_aware",
donor_imputer_condition_selection="pe_prespecified",
donor_imputer_qrf_zero_threshold=0.05,
donor_imputer_excluded_variables=(),
Expand All @@ -91,7 +91,18 @@ def default_policyengine_us_data_rebuild_config(
),
policyengine_prefer_existing_tax_unit_ids=True,
)
return replace(defaults, **overrides)
config = replace(defaults, **overrides)
if (
config.puf_support_clone_enabled
and config.donor_imputer_backend != "regime_aware"
):
raise ValueError(
"PE-US-data PUF support clone rebuilds require "
"donor_imputer_backend='regime_aware' so release candidates use "
"MicroImpute chained donor imputations. Set "
"puf_support_clone_enabled=False for legacy imputer experiments."
)
return config


def default_policyengine_us_data_rebuild_source_providers(
Expand Down
27 changes: 26 additions & 1 deletion tests/pipelines/test_pe_us_data_rebuild.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() -
)
assert config.policyengine_calibration_deferred_stage_top_family_count == 7
assert config.policyengine_calibration_deferred_stage_top_geography_count == 4
assert config.donor_imputer_backend == "qrf"
assert config.donor_imputer_backend == "regime_aware"
assert config.donor_imputer_condition_selection == "pe_prespecified"
assert config.donor_imputer_excluded_variables == ()
assert config.puf_support_clone_enabled is True
Expand All @@ -101,6 +101,31 @@ def test_default_policyengine_us_data_rebuild_config_uses_incumbent_defaults() -
assert config.cps_asec_source_year == 2022


def test_default_policyengine_us_data_rebuild_config_rejects_legacy_imputer_for_puf_support_clone() -> (
    None
):
    """Building the default config with the ``qrf`` backend must fail.

    The call overrides only ``donor_imputer_backend``; the test expects a
    ``ValueError`` whose message names the PUF support clone requirement and
    the ``regime_aware`` backend.
    """
    caught = None
    try:
        default_policyengine_us_data_rebuild_config(donor_imputer_backend="qrf")
    except ValueError as exc:
        # Keep a reference: the ``as`` name is unbound when the handler exits.
        caught = exc

    if caught is None:
        raise AssertionError("Expected PUF support clone qrf rebuild to fail")

    message = str(caught)
    for expected_fragment in (
        "PUF support clone rebuilds require",
        "donor_imputer_backend='regime_aware'",
    ):
        assert expected_fragment in message


def test_default_policyengine_us_data_rebuild_config_allows_legacy_imputer_when_puf_support_clone_disabled() -> (
    None
):
    """The ``qrf`` backend is accepted once PUF support clone is turned off.

    Both overrides are passed together and must be reflected unchanged on the
    returned config.
    """
    legacy_overrides = {
        "puf_support_clone_enabled": False,
        "donor_imputer_backend": "qrf",
    }

    config = default_policyengine_us_data_rebuild_config(**legacy_overrides)

    assert config.puf_support_clone_enabled is False
    assert config.donor_imputer_backend == "qrf"


def test_default_policyengine_us_data_rebuild_config_respects_calibration_support_override() -> (
None
):
Expand Down
47 changes: 38 additions & 9 deletions tests/pipelines/test_pe_us_data_rebuild_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def test_default_policyengine_us_data_rebuild_checkpoint_config_sets_pe_context(
)
assert config.policyengine_calibration_deferred_stage_top_family_count == 7
assert config.policyengine_calibration_deferred_stage_top_geography_count == 4
assert config.donor_imputer_backend == "qrf"
assert config.donor_imputer_backend == "regime_aware"
assert config.donor_imputer_condition_selection == "pe_prespecified"
assert config.donor_imputer_excluded_variables == ()
assert config.policyengine_baseline_dataset == "/tmp/enhanced_cps_2024.h5"
Expand All @@ -89,6 +89,23 @@ def test_default_policyengine_us_data_rebuild_checkpoint_config_sets_pe_context(
assert config.random_seed == 123


def test_default_policyengine_us_data_rebuild_checkpoint_config_rejects_legacy_imputer_for_puf_support_clone() -> (
    None
):
    """Building the checkpoint config with the ``qrf`` backend must fail.

    The checkpoint factory is called with its baseline dataset and targets DB
    paths plus ``donor_imputer_backend="qrf"``; the test expects a
    ``ValueError`` whose message names the PUF support clone requirement and
    the ``regime_aware`` backend.
    """
    checkpoint_kwargs = {
        "policyengine_baseline_dataset": "/tmp/enhanced_cps_2024.h5",
        "policyengine_targets_db": "/tmp/policy_data.db",
        "donor_imputer_backend": "qrf",
    }

    caught = None
    try:
        default_policyengine_us_data_rebuild_checkpoint_config(**checkpoint_kwargs)
    except ValueError as exc:
        # Keep a reference: the ``as`` name is unbound when the handler exits.
        caught = exc

    if caught is None:
        raise AssertionError("Expected checkpoint qrf rebuild to fail")

    message = str(caught)
    for expected_fragment in (
        "PUF support clone rebuilds require",
        "donor_imputer_backend='regime_aware'",
    ):
        assert expected_fragment in message


def test_default_policyengine_us_data_rebuild_checkpoint_config_preserves_explicit_calibration_scope() -> (
None
):
Expand Down Expand Up @@ -389,9 +406,12 @@ def _install_resume_stage_test_doubles(monkeypatch, artifact_root, captured) ->

class FakeResumePipeline:
def __init__(self, config=None, *, stage_runtime_writer=None):
self.config = config or default_policyengine_us_data_rebuild_checkpoint_config(
policyengine_baseline_dataset="/tmp/enhanced_cps_2024.h5",
policyengine_targets_db="/tmp/policy_data.db",
self.config = (
config
or default_policyengine_us_data_rebuild_checkpoint_config(
policyengine_baseline_dataset="/tmp/enhanced_cps_2024.h5",
policyengine_targets_db="/tmp/policy_data.db",
)
)
self.stage_runtime_writer = stage_runtime_writer
if stage_runtime_writer is not None:
Expand Down Expand Up @@ -811,7 +831,10 @@ def test_stage_resume_preflight_reports_missing_policyengine_bundle_member(
) -> None:
artifact_root = _write_complete_resume_artifact_root(tmp_path / "run-1")
missing_member = (
artifact_root / "stage_artifacts" / "06_policyengine_entities" / "persons.parquet"
artifact_root
/ "stage_artifacts"
/ "06_policyengine_entities"
/ "persons.parquet"
)
missing_member.unlink()

Expand All @@ -822,10 +845,15 @@ def test_stage_resume_preflight_reports_missing_policyengine_bundle_member(

assert not preflight.ok
missing = {item.label: item for item in preflight.missing}
assert "06_policyengine_entities.pre_calibration_policyengine_entity_tables" in missing
assert missing[
"06_policyengine_entities.pre_calibration_policyengine_entity_tables"
].path == missing_member
assert (
"06_policyengine_entities.pre_calibration_policyengine_entity_tables" in missing
)
assert (
missing[
"06_policyengine_entities.pre_calibration_policyengine_entity_tables"
].path
== missing_member
)


def test_run_policyengine_us_data_rebuild_checkpoint_builds_bundle_and_parity(
Expand Down Expand Up @@ -2304,6 +2332,7 @@ def _write_complete_resume_artifact_root(artifact_root: Path) -> Path:
"complete": True,
"lifecycleStatus": "complete",
"requiredOutputs": required_outputs,
"missingRequiredOutputs": [],
"outputs": outputs,
}
)
Expand Down
Loading