From 84075d484dd75330e0d137d5a779fc65d22e2af0 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 6 Jun 2026 06:05:00 +0100 Subject: [PATCH 1/2] Preserve PUF tax detail leaves on clone collapse --- src/microplex_us/pipelines/us.py | 165 +++++++++++++++++++++++++-- tests/pipelines/test_us.py | 185 ++++++++++++++++++++++++++++--- 2 files changed, 326 insertions(+), 24 deletions(-) diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 0476b1a..38806b5 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -559,6 +559,36 @@ def _load_microplex_pregnancy_rates(year: int) -> dict[str, float]: PUF_SUPPORT_CLONE_SPECIAL_VARIABLES: tuple[str, ...] = ("weeks_unemployed",) +PUF_SUPPORT_CLONE_IRS_DETAIL_COLLAPSE_VARIABLES: tuple[str, ...] = tuple( + dict.fromkeys( + ( + "taxable_interest_income", + "tax_exempt_interest_income", + "interest_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + "ordinary_dividend_income", + "dividend_income", + "taxable_pension_income", + "tax_exempt_pension_income", + "taxable_private_pension_income", + "tax_exempt_private_pension_income", + "pension_income", + "taxable_unemployment_compensation", + "unemployment_compensation", + ) + + PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES + ) +) + +PUF_SUPPORT_CLONE_COLLAPSE_OVERLAP_VARIABLES: tuple[str, ...] = tuple( + dict.fromkeys( + PUF_SUPPORT_CLONE_IMPUTED_VARIABLES + + PUF_SUPPORT_CLONE_SPECIAL_VARIABLES + + ("wage_income", "dividend_income", "capital_gains") + ) +) + @lru_cache(maxsize=1) def _default_block_geography() -> BlockGeography: @@ -2182,6 +2212,13 @@ class USMicroplexBuildConfig: puf_support_clone_both_halves_override_variables: tuple[str, ...] = ( PUF_SUPPORT_CLONE_OVERRIDDEN_VARIABLES ) + puf_support_clone_collapse_irs_detail_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_IRS_DETAIL_COLLAPSE_VARIABLES + ) + puf_support_clone_collapse_overlap_variables: tuple[str, ...] = ( + PUF_SUPPORT_CLONE_COLLAPSE_OVERLAP_VARIABLES + ) + puf_support_clone_scale_tax_details_to_cps_totals: bool = False puf_support_clone_refresh_cps_only_fields: bool = True puf_support_clone_cps_refresh_variables: tuple[str, ...] = ( PUF_SUPPORT_CLONE_CPS_REFRESH_VARIABLES @@ -6381,16 +6418,36 @@ def _preserve_cps_measured_puf_clone_totals( integrated_variables: Iterable[str], preclone_columns: set[str], ) -> tuple[pd.DataFrame, dict[str, Any]]: - """Anchor copied PUF tax-detail leaves to CPS-reported income totals.""" + """Optionally anchor copied PUF leaves, then keep leaf identities intact.""" integrated_set = set(integrated_variables) result = clone.copy() passthrough_variables: list[str] = [] dividend_scaled = False + scaling_enabled = bool( + self.config.puf_support_clone_scale_tax_details_to_cps_totals + ) + + if not scaling_enabled: + identity_variables = ( + self._reconcile_puf_support_clone_tax_detail_identities( + original=original, + clone=result, + integrated_variables=integrated_set, + ) + ) + return result, { + "enabled": False, + "passthrough_variables": [], + "dividend_components_scaled_to_cps_total": False, + "identity_reconciled_variables": identity_variables, + } for target, aliases in PUF_SUPPORT_CLONE_CPS_DIRECT_PASSTHROUGH_ALIASES.items(): if target not in result.columns or target not in integrated_set: continue - source = next((alias for alias in aliases if alias in original.columns), None) + source = next( + (alias for alias in aliases if alias in original.columns), None + ) if source is None: continue result[target] = ( @@ -6400,9 +6457,11 @@ def _preserve_cps_measured_puf_clone_totals( ) passthrough_variables.append(target) - for components, total_alias, fallback_first_share in ( - PUF_SUPPORT_CLONE_CPS_SPLIT_TOTALS - ): + for ( + components, + total_alias, + fallback_first_share, + ) in PUF_SUPPORT_CLONE_CPS_SPLIT_TOTALS: if total_alias not in original.columns: continue if not all(column in result.columns for column in components): @@ -6435,9 +6494,9 @@ def _preserve_cps_measured_puf_clone_totals( / component_total.loc[positive_component_total] ).clip(lower=0.0, upper=1.0) result[components[0]] = (cps_total * first_share).to_numpy(copy=True) - result[components[1]] = ( - cps_total * (1.0 - first_share) - ).to_numpy(copy=True) + result[components[1]] = (cps_total * (1.0 - first_share)).to_numpy( + copy=True + ) if total_alias == "pension_income": if "taxable_private_pension_income" in result.columns: result["taxable_private_pension_income"] = result[ @@ -6504,11 +6563,79 @@ def _preserve_cps_measured_puf_clone_totals( ] ) + identity_variables = self._reconcile_puf_support_clone_tax_detail_identities( + original=original, + clone=result, + integrated_variables=integrated_set, + ) return result, { + "enabled": True, "passthrough_variables": sorted(set(passthrough_variables)), "dividend_components_scaled_to_cps_total": dividend_scaled, + "identity_reconciled_variables": identity_variables, } + def _reconcile_puf_support_clone_tax_detail_identities( + self, + *, + original: pd.DataFrame, + clone: pd.DataFrame, + integrated_variables: set[str], + ) -> list[str]: + """Keep PUF tax leaves and their parent totals consistent before collapse.""" + reconciled: list[str] = [] + + def numeric(column: str) -> pd.Series: + return pd.to_numeric(clone[column], errors="coerce").fillna(0.0) + + def assign_if_available(column: str, values: pd.Series) -> None: + if column not in clone.columns and column not in original.columns: + return + clone[column] = values.to_numpy(copy=True) + reconciled.append(column) + + interest_components = ("taxable_interest_income", "tax_exempt_interest_income") + if set(interest_components) & integrated_variables and all( + column in clone.columns for column in interest_components + ): + assign_if_available( + "interest_income", + numeric("taxable_interest_income") + + numeric("tax_exempt_interest_income"), + ) + + dividend_components = ( + "qualified_dividend_income", + "non_qualified_dividend_income", + ) + if set(dividend_components) & integrated_variables and all( + column in clone.columns for column in dividend_components + ): + dividend_total = numeric("qualified_dividend_income") + numeric( + "non_qualified_dividend_income" + ) + assign_if_available("ordinary_dividend_income", dividend_total) + assign_if_available("dividend_income", dividend_total) + + pension_components = ("taxable_pension_income", "tax_exempt_pension_income") + if set(pension_components) & integrated_variables and all( + column in clone.columns for column in pension_components + ): + taxable_pension = numeric("taxable_pension_income") + tax_exempt_pension = numeric("tax_exempt_pension_income") + assign_if_available("taxable_private_pension_income", taxable_pension) + assign_if_available("tax_exempt_private_pension_income", tax_exempt_pension) + assign_if_available("pension_income", taxable_pension + tax_exempt_pension) + + if "taxable_unemployment_compensation" in integrated_variables: + if "taxable_unemployment_compensation" in clone.columns: + assign_if_available( + "unemployment_compensation", + numeric("taxable_unemployment_compensation"), + ) + + return sorted(set(reconciled)) + def _finalize_puf_support_clone_frame( self, *, @@ -6590,10 +6717,26 @@ def _finalize_puf_support_clone_frame( passthrough_override = set( cps_passthrough_summary.get("passthrough_variables", ()) ) + identity_override = set( + cps_passthrough_summary.get("identity_reconciled_variables", ()) + ) + irs_detail_override = ( + integrated_set + & set(self.config.puf_support_clone_collapse_irs_detail_variables) + & preclone_columns + ) + overlap_collapse_override = ( + integrated_set + & set(self.config.puf_support_clone_collapse_overlap_variables) + & preclone_columns + ) collapse_candidates = ( (integrated_set - preclone_columns) | both_halves_override | passthrough_override + | identity_override + | irs_detail_override + | overlap_collapse_override ) - set(generated_entity_id_columns) for variable in sorted(collapse_candidates): if variable in clone.columns: @@ -6626,6 +6769,12 @@ def _finalize_puf_support_clone_frame( "overlap_variables": overlap_variables, "donor_only_variables": donor_only_variables, "both_halves_override_variables": sorted(both_halves_override), + "irs_detail_collapse_override_variables": sorted(irs_detail_override) + if output_mode == "collapse_to_scaffold" + else [], + "overlap_collapse_override_variables": sorted(overlap_collapse_override) + if output_mode == "collapse_to_scaffold" + else [], "collapse_copy_variables": collapse_copy_variables, "cps_only_refresh": cps_refresh_summary, "cps_measured_total_passthrough": cps_passthrough_summary, diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index cb9fd34..c12eba3 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -4230,7 +4230,7 @@ def test_finalize_puf_support_clone_can_collapse_donor_only_values_to_cps_rows( assert result["person_is_puf_clone"].tolist() == [0.0, 0.0] assert result["person_id"].tolist() == [10, 20] assert result["household_id"].tolist() == [1, 2] - assert result["self_employment_income"].tolist() == [75.0, 50.0] + assert result["self_employment_income"].tolist() == [-250.0, 500.0] assert result["taxable_interest_income"].tolist() == [10.0, 20.0] assert result["partnership_s_corp_income"].tolist() == [-700.0, 1_200.0] assert result["state_income_tax_paid"].tolist() == [400.0, 50.0] @@ -4240,14 +4240,17 @@ def test_finalize_puf_support_clone_can_collapse_donor_only_values_to_cps_rows( assert summary["emitted_clone_row_count"] == 0 assert summary["final_row_count"] == 2 assert summary["dropped_generated_entity_id_columns"] == ["tax_unit_id"] - assert "self_employment_income" not in summary["collapse_copy_variables"] assert summary["collapse_copy_variables"] == [ "partnership_s_corp_income", + "self_employment_income", "state_income_tax_paid", "taxable_interest_income", ] + assert summary["overlap_collapse_override_variables"] == [ + "self_employment_income", + ] - def test_finalize_puf_support_clone_preserves_cps_measured_income_totals( + def test_finalize_puf_support_clone_preserves_puf_tax_details_by_default( self, ): pipeline = USMicroplexPipeline( @@ -4263,10 +4266,149 @@ def test_finalize_puf_support_clone_preserves_cps_measured_income_totals( "person_id": [10, 20], "household_id": [1, 2], "age": [45, 62], + "employment_income": [30_000.0, 10_000.0], + "self_employment_income": [500.0, 250.0], + "long_term_capital_gains": [1_000.0, 0.0], + "short_term_capital_gains": [100.0, 0.0], + "capital_gains": [1_100.0, 0.0], "interest_income": [3.0, 4.0], # Regression coverage for preclone components: these may exist on # the CPS scaffold already, but PUF-integrated leaves must still - # be scaled back to the CPS measured total. + # survive collapse back to the scaffold rows. + "taxable_interest_income": [3.0, 4.0], + "tax_exempt_interest_income": [3.0, 4.0], + "dividend_income": [10.0, 5.0], + "qualified_dividend_income": [10.0, 5.0], + "non_qualified_dividend_income": [10.0, 5.0], + "pension_income": [100.0, 200.0], + "taxable_pension_income": [100.0, 200.0], + "tax_exempt_pension_income": [100.0, 200.0], + "unemployment_compensation": [100.0, 0.0], + "taxable_unemployment_compensation": [100.0, 0.0], + } + ) + clone = pd.DataFrame( + { + "person_id": [30, 40], + "household_id": [3, 4], + "age": [45, 62], + "employment_income": [90_000.0, 20_000.0], + "self_employment_income": [-4_000.0, 8_000.0], + "long_term_capital_gains": [50_000.0, -1_000.0], + "short_term_capital_gains": [2_500.0, -500.0], + "capital_gains": [52_500.0, -1_500.0], + "taxable_interest_income": [1_000.0, 0.0], + "tax_exempt_interest_income": [500.0, 0.0], + "qualified_dividend_income": [20.0, 0.0], + "non_qualified_dividend_income": [5.0, 0.0], + "ordinary_dividend_income": [25.0, 0.0], + "dividend_income": [25.0, 0.0], + "taxable_pension_income": [90.0, 0.0], + "tax_exempt_pension_income": [10.0, 0.0], + "taxable_unemployment_compensation": [600.0, 700.0], + } + ) + + result, summary = pipeline._finalize_puf_support_clone_frame( + original=original, + imputed_clone=clone, + donor_source_name="irs_soi_puf_2024", + integrated_variables=[ + "taxable_interest_income", + "tax_exempt_interest_income", + "employment_income", + "self_employment_income", + "long_term_capital_gains", + "short_term_capital_gains", + "capital_gains", + "qualified_dividend_income", + "non_qualified_dividend_income", + "taxable_pension_income", + "tax_exempt_pension_income", + "taxable_unemployment_compensation", + ], + preclone_columns=set(original.columns), + donor_seed_columns=set(clone.columns), + donor_observed=set(clone.columns), + ) + + assert result["employment_income"].tolist() == [90_000.0, 20_000.0] + assert result["self_employment_income"].tolist() == [-4_000.0, 8_000.0] + assert result["long_term_capital_gains"].tolist() == [50_000.0, -1_000.0] + assert result["short_term_capital_gains"].tolist() == [2_500.0, -500.0] + assert result["capital_gains"].tolist() == [52_500.0, -1_500.0] + assert result["taxable_interest_income"].tolist() == [1_000.0, 0.0] + assert result["tax_exempt_interest_income"].tolist() == [500.0, 0.0] + assert result["interest_income"].tolist() == [1_500.0, 0.0] + assert result["taxable_unemployment_compensation"].tolist() == [600.0, 700.0] + assert result["unemployment_compensation"].tolist() == [600.0, 700.0] + assert result["dividend_income"].tolist() == [25.0, 0.0] + assert result["ordinary_dividend_income"].tolist() == [25.0, 0.0] + assert result["qualified_dividend_income"].tolist() == [20.0, 0.0] + assert result["non_qualified_dividend_income"].tolist() == [5.0, 0.0] + assert result["taxable_pension_income"].tolist() == [90.0, 0.0] + assert result["tax_exempt_pension_income"].tolist() == [10.0, 0.0] + assert result["pension_income"].tolist() == [100.0, 0.0] + passthrough = summary["cps_measured_total_passthrough"] + assert passthrough["enabled"] is False + assert passthrough["passthrough_variables"] == [] + assert passthrough["dividend_components_scaled_to_cps_total"] is False + assert set(passthrough["identity_reconciled_variables"]) >= { + "dividend_income", + "interest_income", + "ordinary_dividend_income", + "pension_income", + "unemployment_compensation", + } + assert set(summary["collapse_copy_variables"]) >= { + "dividend_income", + "employment_income", + "interest_income", + "long_term_capital_gains", + "non_qualified_dividend_income", + "ordinary_dividend_income", + "pension_income", + "qualified_dividend_income", + "self_employment_income", + "short_term_capital_gains", + "tax_exempt_interest_income", + "tax_exempt_pension_income", + "taxable_interest_income", + "taxable_pension_income", + "taxable_unemployment_compensation", + "unemployment_compensation", + } + assert set(summary["overlap_collapse_override_variables"]) >= { + "capital_gains", + "employment_income", + "long_term_capital_gains", + "self_employment_income", + "short_term_capital_gains", + "tax_exempt_interest_income", + "tax_exempt_pension_income", + "taxable_interest_income", + "taxable_pension_income", + "taxable_unemployment_compensation", + } + + def test_finalize_puf_support_clone_can_scale_tax_details_to_cps_totals( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_output_mode="collapse_to_scaffold", + puf_support_clone_both_halves_override_variables=(), + puf_support_clone_scale_tax_details_to_cps_totals=True, + ) + ) + original = pd.DataFrame( + { + "person_id": [10, 20], + "household_id": [1, 2], + "age": [45, 62], + "interest_income": [3.0, 4.0], "taxable_interest_income": [3.0, 4.0], "tax_exempt_interest_income": [3.0, 4.0], "dividend_income": [10.0, 5.0], @@ -4316,8 +4458,11 @@ def test_finalize_puf_support_clone_preserves_cps_measured_income_totals( assert result["taxable_interest_income"].round(6).tolist() == [2.0, 2.72] assert result["tax_exempt_interest_income"].round(6).tolist() == [1.0, 1.28] + assert result["interest_income"].round(6).tolist() == [3.0, 4.0] assert result["taxable_unemployment_compensation"].tolist() == [100.0, 0.0] + assert result["unemployment_compensation"].tolist() == [100.0, 0.0] assert result["dividend_income"].tolist() == [10.0, 5.0] + assert result["ordinary_dividend_income"].tolist() == [10.0, 5.0] assert result["qualified_dividend_income"].round(6).tolist() == [ 8.0, 3.9, @@ -4334,17 +4479,25 @@ def test_finalize_puf_support_clone_preserves_cps_measured_income_totals( 10.0, 82.0, ] - assert summary["cps_measured_total_passthrough"] == { - "passthrough_variables": [ - "non_qualified_dividend_income", - "qualified_dividend_income", - "tax_exempt_interest_income", - "tax_exempt_pension_income", - "taxable_interest_income", - "taxable_pension_income", - "taxable_unemployment_compensation", - ], - "dividend_components_scaled_to_cps_total": True, + assert result["pension_income"].round(6).tolist() == [100.0, 200.0] + passthrough = summary["cps_measured_total_passthrough"] + assert passthrough["enabled"] is True + assert passthrough["passthrough_variables"] == [ + "non_qualified_dividend_income", + "qualified_dividend_income", + "tax_exempt_interest_income", + "tax_exempt_pension_income", + "taxable_interest_income", + "taxable_pension_income", + "taxable_unemployment_compensation", + ] + assert passthrough["dividend_components_scaled_to_cps_total"] is True + assert set(passthrough["identity_reconciled_variables"]) >= { + "dividend_income", + "interest_income", + "ordinary_dividend_income", + "pension_income", + "unemployment_compensation", } def test_integrate_donor_sources_collapses_puf_support_clone_before_later_donors( @@ -4524,7 +4677,7 @@ def frame_for(name, households, persons, capabilities): assert result.index.tolist() == [0, 1] assert result["person_is_puf_clone"].tolist() == [0.0, 0.0] assert result["hh_weight"].tolist() == [100.0, 200.0] - assert result["self_employment_income"].tolist() == [75.0, 50.0] + assert result["self_employment_income"].tolist() == [-250.0, 500.0] assert result["taxable_interest_income"].tolist() == [10.0, 20.0] assert sorted(result["state_income_tax_paid"].tolist()) == [50.0, 400.0] assert integration["puf_support_clone_summary"]["output_mode"] == ( From ed5f1a1fb25e7813c2f29843d0385e64ffbc40cb Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 6 Jun 2026 07:10:56 +0100 Subject: [PATCH 2/2] Harden eCPS certificate gates and PUF collapse alignment --- .../pipelines/ecps_replacement_comparison.py | 11 ++ .../pipelines/mp300k_artifact_gates.py | 111 +++++++++++++++++- .../pipelines/mp_benchmark_manifest.py | 27 +++++ src/microplex_us/pipelines/us.py | 58 ++++++++- .../test_ecps_replacement_comparison.py | 35 +++++- tests/pipelines/test_mp300k_artifact_gates.py | 104 ++++++++++++++++ tests/pipelines/test_mp300k_gate_inputs.py | 10 ++ tests/pipelines/test_mp_benchmark_manifest.py | 27 +++++ tests/pipelines/test_us.py | 58 +++++++++ 9 files changed, 434 insertions(+), 7 deletions(-) diff --git a/src/microplex_us/pipelines/ecps_replacement_comparison.py b/src/microplex_us/pipelines/ecps_replacement_comparison.py index 0020a71..dcf76cb 100644 --- a/src/microplex_us/pipelines/ecps_replacement_comparison.py +++ b/src/microplex_us/pipelines/ecps_replacement_comparison.py @@ -4,6 +4,7 @@ import argparse import hashlib +import importlib.metadata import json import subprocess from pathlib import Path @@ -526,6 +527,7 @@ def build_sound_ecps_replacement_comparison( baseline_dataset_path=baseline_path, policyengine_targets_db_path=resolved_targets_db, policyengine_us_data_repo=policyengine_us_data_repo, + policyengine_us_version=_installed_policyengine_us_version(), period=period, target_names=target_names, target_scope=target_scope, @@ -1728,6 +1730,7 @@ def _frozen_ecps_baseline_certificate( baseline_dataset_path: Path, policyengine_targets_db_path: Path | None, policyengine_us_data_repo: str | Path | None, + policyengine_us_version: str, period: int, target_names: list[str], target_scope: str, @@ -1787,6 +1790,7 @@ def _frozen_ecps_baseline_certificate( else None ), "policyengine_us_data": _git_repo_descriptor(policyengine_us_data_repo), + "policyengine_us": {"version": str(policyengine_us_version)}, "target_surface": { "target_profile": "pe_native_broad", "target_scope": str(target_scope), @@ -1802,6 +1806,13 @@ def _frozen_ecps_baseline_certificate( } +def _installed_policyengine_us_version() -> str: + try: + return importlib.metadata.version("policyengine-us") + except importlib.metadata.PackageNotFoundError as exc: + raise ValueError("policyengine-us is not installed") from exc + + def _git_repo_descriptor(repo_path: str | Path | None) -> dict[str, Any] | None: if repo_path is None: return None diff --git a/src/microplex_us/pipelines/mp300k_artifact_gates.py b/src/microplex_us/pipelines/mp300k_artifact_gates.py index 36136ab..625c47d 100644 --- a/src/microplex_us/pipelines/mp300k_artifact_gates.py +++ b/src/microplex_us/pipelines/mp300k_artifact_gates.py @@ -113,6 +113,14 @@ "_imputation_source", ) _REQUIRED_BENCHMARK_MANIFEST_EVIDENCE = { + "certificate_type": ( + ("certificate_type",), + ("frozen_ecps_baseline_certificate", "certificate_type"), + ), + "period": ( + ("period",), + ("target_period",), + ), "baseline_dataset.path": ( ("baseline_dataset", "path"), ("baseline_dataset_path",), @@ -136,16 +144,38 @@ ("policyengine_us", "version"), ("policyengine_us_version",), ), + "target_surface.target_profile": ( + ("target_surface", "target_profile"), + ("target_profile",), + ), + "target_surface.target_scope": ( + ("target_surface", "target_scope"), + ("target_scope",), + ("target_scope_filter",), + ), + "target_surface.target_count": ( + ("target_surface", "target_count"), + ("target_count",), + ), + "target_surface.target_names_sha256": ( + ("target_surface", "target_names_sha256"), + ("target_names_sha256",), + ), + "scoring_config.sha256": ( + ("scoring_config", "sha256"), + ("scoring_config_sha256",), + ), "target_db.path": ( ("target_db", "path"), ("target_db_path",), ("targets_db", "path"), - ("policyengine_targets_db",), + ("policyengine_targets_db", "path"), ), "target_db.sha256": ( ("target_db", "sha256"), ("target_db_sha256",), ("targets_db", "sha256"), + ("policyengine_targets_db", "sha256"), ("policyengine_targets_db_sha256",), ), } @@ -238,6 +268,7 @@ def build_mp300k_artifact_gate_report( ecps_comparison_gate = _ecps_comparison_gate( resolved_ecps_comparison, benchmark_evidence=benchmark_evidence, + expected_period=period, ) arch_coverage_gate = _arch_target_coverage_gate( arch_coverage_payload, @@ -987,6 +1018,7 @@ def _ecps_comparison_gate( ecps_comparison_payload: dict[str, Any] | None, *, benchmark_evidence: dict[str, Any] | None = None, + expected_period: int, ) -> dict[str, Any]: if ecps_comparison_payload is None: return _gate( @@ -1022,6 +1054,7 @@ def _ecps_comparison_gate( ecps_comparison_payload, summary, benchmark_evidence=benchmark_evidence, + expected_period=expected_period, ) details.update(contract["details"]) missing_requirements = list(contract["missing_requirements"]) @@ -1101,6 +1134,7 @@ def _ecps_comparison_contract_summary( summary: dict[str, Any], *, benchmark_evidence: dict[str, Any] | None = None, + expected_period: int, ) -> dict[str, Any]: candidate_households = _first_nested_present( payload, @@ -1216,6 +1250,7 @@ def _ecps_comparison_contract_summary( payload, summary, benchmark_evidence=benchmark_evidence, + expected_period=expected_period, ) requirements = { @@ -1255,6 +1290,7 @@ def _frozen_baseline_certificate_summary( summary: dict[str, Any], *, benchmark_evidence: dict[str, Any] | None, + expected_period: int, ) -> dict[str, Any]: certificate = _find_frozen_baseline_certificate(payload) if not isinstance(certificate, dict): @@ -1275,8 +1311,32 @@ def _frozen_baseline_certificate_summary( "actual": schema_version, } ) + certificate_type = certificate.get("certificate_type") + if certificate_type != "frozen_production_ecps_baseline": + mismatches.append( + { + "field": "certificate_type", + "expected": "frozen_production_ecps_baseline", + "actual": certificate_type, + } + ) + certificate_period = certificate.get("period") + try: + certificate_period_int = int(certificate_period) + except (TypeError, ValueError): + certificate_period_int = None + if certificate_period_int != int(expected_period): + mismatches.append( + { + "field": "period", + "expected": int(expected_period), + "actual": certificate_period, + } + ) evidence_values = { + "certificate_type": certificate_type, + "period": certificate_period, "baseline_dataset.sha256": _first_nested_path_value( certificate, ( @@ -1305,6 +1365,28 @@ def _frozen_baseline_certificate_summary( ("policyengine_us_data_commit_sha",), ), ), + "policyengine_us.version": _first_nested_path_value( + certificate, + ( + ("policyengine_us", "version"), + ("policyengine_us_version",), + ), + ), + "target_surface.target_profile": _first_nested_path_value( + certificate, + ( + ("target_surface", "target_profile"), + ("target_profile",), + ), + ), + "target_surface.target_scope": _first_nested_path_value( + certificate, + ( + ("target_surface", "target_scope"), + ("target_scope",), + ("target_scope_filter",), + ), + ), "scoring_config.sha256": _first_nested_path_value( certificate, ( @@ -1356,9 +1438,17 @@ def _frozen_baseline_certificate_summary( ) for evidence_name in ( + "certificate_type", + "period", "baseline_dataset.sha256", "target_db.sha256", "policyengine_us_data.commit", + "policyengine_us.version", + "target_surface.target_profile", + "target_surface.target_scope", + "target_surface.target_count", + "target_surface.target_names_sha256", + "scoring_config.sha256", ): expected_value = (benchmark_evidence or {}).get(evidence_name) certificate_value = evidence_values.get(evidence_name) @@ -1384,7 +1474,10 @@ def _frozen_baseline_certificate_summary( "policyengine_us_data_commit": evidence_values.get( "policyengine_us_data.commit" ), + "policyengine_us_version": evidence_values.get("policyengine_us.version"), "scoring_config_sha256": evidence_values.get("scoring_config.sha256"), + "target_profile": evidence_values.get("target_surface.target_profile"), + "target_scope": evidence_values.get("target_surface.target_scope"), "target_names_sha256": evidence_values.get( "target_surface.target_names_sha256" ), @@ -1422,11 +1515,18 @@ def _certificate_metric(certificate: dict[str, Any], metric_name: str) -> Any: def _valid_certificate_evidence_value(name: str, value: Any) -> bool: - if name == "target_surface.target_count": + if name in {"period", "target_surface.target_count"}: try: return int(value) > 0 except (TypeError, ValueError): return False + if name == "certificate_type": + return value == "frozen_production_ecps_baseline" + if name.endswith(".version") or name in { + "target_surface.target_profile", + "target_surface.target_scope", + }: + return isinstance(value, str) and bool(value) if name.endswith(".sha256"): return ( isinstance(value, str) @@ -2103,6 +2203,13 @@ def _first_nested_path_value( def _valid_benchmark_evidence_value(name: str, value: Any) -> bool: + if name in {"period", "target_surface.target_count"}: + try: + return int(value) > 0 + except (TypeError, ValueError): + return False + if name == "certificate_type": + return value == "frozen_production_ecps_baseline" if not isinstance(value, str) or not value: return False if name.endswith(".sha256"): diff --git a/src/microplex_us/pipelines/mp_benchmark_manifest.py b/src/microplex_us/pipelines/mp_benchmark_manifest.py index 5ff579f..8f46602 100644 --- a/src/microplex_us/pipelines/mp_benchmark_manifest.py +++ b/src/microplex_us/pipelines/mp_benchmark_manifest.py @@ -20,6 +20,11 @@ def build_mp_benchmark_manifest( target_db_path: str | Path, period: int = 2024, target_profile: str = "pe_native_broad", + target_scope: str = "all", + target_count: int, + target_names_sha256: str, + scoring_config_sha256: str, + certificate_type: str = "frozen_production_ecps_baseline", policyengine_us_data_repo: str | Path | None = None, policyengine_us_data_commit: str | None = None, policyengine_us_version: str | None = None, @@ -42,9 +47,18 @@ def build_mp_benchmark_manifest( version = policyengine_us_version or _installed_policyengine_us_version() return { "schema_version": 1, + "certificate_type": str(certificate_type), "generated_at": datetime.now(UTC).isoformat(), "period": int(period), "target_profile": str(target_profile), + "target_scope": str(target_scope), + "target_surface": { + "target_profile": str(target_profile), + "target_scope": str(target_scope), + "target_count": int(target_count), + "target_names_sha256": str(target_names_sha256), + }, + "scoring_config": {"sha256": str(scoring_config_sha256)}, "baseline_dataset": baseline_dataset, "policyengine_us_data": repo_descriptor, "policyengine_us": {"version": version}, @@ -142,6 +156,14 @@ def main(argv: list[str] | None = None) -> int: parser.add_argument("--output-json", required=True) parser.add_argument("--period", type=int, default=2024) parser.add_argument("--target-profile", default="pe_native_broad") + parser.add_argument("--target-scope", default="all") + parser.add_argument("--target-count", type=int, required=True) + parser.add_argument("--target-names-sha256", required=True) + parser.add_argument("--scoring-config-sha256", required=True) + parser.add_argument( + "--certificate-type", + default="frozen_production_ecps_baseline", + ) parser.add_argument("--policyengine-us-data-repo") parser.add_argument("--policyengine-us-data-commit") parser.add_argument("--policyengine-us-version") @@ -161,6 +183,11 @@ def main(argv: list[str] | None = None) -> int: target_db_path=args.target_db, period=args.period, target_profile=args.target_profile, + target_scope=args.target_scope, + target_count=args.target_count, + target_names_sha256=args.target_names_sha256, + scoring_config_sha256=args.scoring_config_sha256, + certificate_type=args.certificate_type, policyengine_us_data_repo=args.policyengine_us_data_repo, policyengine_us_data_commit=args.policyengine_us_data_commit, policyengine_us_version=args.policyengine_us_version, diff --git a/src/microplex_us/pipelines/us.py b/src/microplex_us/pipelines/us.py index 38806b5..f351a75 100644 --- a/src/microplex_us/pipelines/us.py +++ b/src/microplex_us/pipelines/us.py @@ -155,6 +155,7 @@ LOGGER = logging.getLogger(__name__) PUF_SUPPORT_CLONE_FLAG_COLUMN = "person_is_puf_clone" +PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN = "_puf_support_clone_source_row_id" PUF_SUPPORT_CLONE_IMPUTED_VARIABLES: tuple[str, ...] = ( "employment_income", @@ -6250,6 +6251,10 @@ def _ordered_donor_inputs_for_puf_support_clone( def _prepare_puf_support_clone_frame(self, original: pd.DataFrame) -> pd.DataFrame: """Create a zero-stored-weight PUF clone frame from CPS support rows.""" clone = original.copy() + clone[PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN] = np.arange( + len(clone), + dtype=np.int64, + ) structural_id_columns = {"person_id", *ENTITY_ID_COLUMNS.values()} for column in sorted(structural_id_columns & set(clone.columns)): series = clone[column] @@ -6713,7 +6718,52 @@ def _finalize_puf_support_clone_frame( output_mode = self.config.puf_support_clone_output_mode collapse_copy_variables: list[str] = [] + source_row_alignment: dict[str, Any] = { + "enabled": False, + "column": PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, + } if output_mode == "collapse_to_scaffold": + if PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN not in clone.columns: + raise ValueError( + "PUF support clone collapse requires " + f"{PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN}" + ) + source_row_id = pd.to_numeric( + clone[PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN], + errors="coerce", + ) + if source_row_id.isna().any(): + raise ValueError( + "PUF support clone source-row ids must be complete before collapse" + ) + source_row_index = source_row_id.astype(np.int64) + if source_row_index.duplicated().any(): + raise ValueError( + "PUF support clone source-row ids must be unique before collapse" + ) + expected_index = pd.Index(range(len(original)), dtype=np.int64) + if not set(source_row_index.to_numpy()).issubset(set(expected_index)): + raise ValueError( + "PUF support clone source-row ids are outside the CPS scaffold" + ) + aligned_clone = clone.assign( + **{PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN: source_row_index} + ).set_index(PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, drop=False) + aligned_clone = aligned_clone.reindex(expected_index) + if aligned_clone.isna().all(axis=1).any(): + raise ValueError( + "PUF support clone source-row ids do not cover the CPS scaffold" + ) + source_row_alignment = { + "enabled": True, + "column": PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, + "row_count": int(len(aligned_clone)), + "clone_was_reordered": bool( + not source_row_index.reset_index(drop=True).equals( + pd.Series(range(len(source_row_index)), dtype=np.int64) + ) + ), + } passthrough_override = set( cps_passthrough_summary.get("passthrough_variables", ()) ) @@ -6739,12 +6789,14 @@ def _finalize_puf_support_clone_frame( | overlap_collapse_override ) - set(generated_entity_id_columns) for variable in sorted(collapse_candidates): - if variable in clone.columns: - original[variable] = clone[variable].to_numpy(copy=True) + if variable in aligned_clone.columns: + original[variable] = aligned_clone[variable].to_numpy(copy=True) collapse_copy_variables.append(variable) combined = original.reset_index(drop=True) emitted_clone_row_count = 0 else: + if PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN in clone.columns: + clone = clone.drop(columns=[PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN]) for column in sorted(set(clone.columns) - set(original.columns)): original[column] = 0.0 for column in sorted(set(original.columns) - set(clone.columns)): @@ -6778,6 +6830,7 @@ def _finalize_puf_support_clone_frame( "collapse_copy_variables": collapse_copy_variables, "cps_only_refresh": cps_refresh_summary, "cps_measured_total_passthrough": cps_passthrough_summary, + "source_row_alignment": source_row_alignment, "dropped_generated_entity_id_columns": generated_entity_id_columns, "variable_surface": { "ecps_imputed_variables": list(PUF_SUPPORT_CLONE_IMPUTED_VARIABLES), @@ -6828,6 +6881,7 @@ def _integrate_donor_sources( "is_head", "is_spouse", "is_dependent", + PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, } rng = np.random.default_rng(self.config.random_seed) _emit_us_pipeline_progress( diff --git a/tests/pipelines/test_ecps_replacement_comparison.py b/tests/pipelines/test_ecps_replacement_comparison.py index 0b864d3..406bdda 100644 --- a/tests/pipelines/test_ecps_replacement_comparison.py +++ b/tests/pipelines/test_ecps_replacement_comparison.py @@ -37,6 +37,15 @@ ] +@pytest.fixture(autouse=True) +def _pin_policyengine_us_version(monkeypatch): + monkeypatch.setattr( + ecps, + "_installed_policyengine_us_version", + lambda: "1.587.0", + ) + + def _write_minimal_policyengine_dataset( path: Path, *, @@ -378,6 +387,11 @@ def _benchmark_manifest( baseline_dataset = dict(certificate["baseline_dataset"]) target_db = dict(certificate["target_db"]) policyengine_us_data = dict(certificate["policyengine_us_data"]) + policyengine_us = dict(certificate["policyengine_us"]) + certificate_type = certificate["certificate_type"] + period = certificate["period"] + target_surface = dict(certificate["target_surface"]) + scoring_config = {"sha256": certificate["scoring_config"]["sha256"]} else: baseline_dataset = { "path": "/tmp/enhanced_cps_2024.h5", @@ -391,15 +405,29 @@ def _benchmark_manifest( "repo": "PolicyEngine/policyengine-us-data", "commit": "b" * 40, } + policyengine_us = {"version": "1.587.0"} + certificate_type = "frozen_production_ecps_baseline" + period = 2024 + target_surface = { + "target_profile": "pe_native_broad", + "target_scope": "national", + "target_count": 150, + "target_names_sha256": "d" * 64, + } + scoring_config = {"sha256": "e" * 64} path.write_text( json.dumps( { "schema_version": 1, - "period": 2024, - "target_profile": "pe_native_broad", + "certificate_type": certificate_type, + "period": period, + "target_profile": target_surface["target_profile"], + "target_scope": target_surface["target_scope"], + "target_surface": target_surface, + "scoring_config": scoring_config, "baseline_dataset": baseline_dataset, "policyengine_us_data": policyengine_us_data, - "policyengine_us": {"version": "1.587.0"}, + "policyengine_us": policyengine_us, "target_db": target_db, } ) @@ -445,6 +473,7 @@ def test_sound_ecps_replacement_comparison_satisfies_gate_contract( assert certificate["baseline_dataset"]["sha256"] assert certificate["target_db"]["sha256"] assert certificate["policyengine_us_data"]["commit"] + assert certificate["policyengine_us"]["version"] assert ( certificate["baseline_metrics"]["baseline_enhanced_cps_native_loss"] == (summary["baseline_enhanced_cps_native_loss"]) diff --git a/tests/pipelines/test_mp300k_artifact_gates.py b/tests/pipelines/test_mp300k_artifact_gates.py index 9184a09..a06a956 100644 --- a/tests/pipelines/test_mp300k_artifact_gates.py +++ b/tests/pipelines/test_mp300k_artifact_gates.py @@ -112,8 +112,17 @@ def _write_benchmark_manifest(path: Path) -> None: json.dumps( { "schema_version": 1, + "certificate_type": "frozen_production_ecps_baseline", "period": 2024, "target_profile": "pe_native_broad", + "target_scope": "national", + "target_surface": { + "target_profile": "pe_native_broad", + "target_scope": "national", + "target_count": 150, + "target_names_sha256": "d" * 64, + }, + "scoring_config": {"sha256": "e" * 64}, "baseline_dataset": { "path": "/tmp/enhanced_cps_2024.h5", "sha256": "a" * 64, @@ -239,6 +248,7 @@ def _sound_ecps_comparison_payload( "repo": "PolicyEngine/policyengine-us-data", "commit": "b" * 40, }, + "policyengine_us": {"version": "1.587.0"}, "target_surface": { "target_profile": "pe_native_broad", "target_scope": "national", @@ -351,10 +361,17 @@ def test_benchmark_manifest_gate_requires_pinned_release_evidence(tmp_path): assert record["summary"]["status"] == "failed" assert benchmark_gate["status"] == "fail" assert benchmark_gate["details"]["missing_evidence"] == [ + "certificate_type", + "period", "baseline_dataset.path", "baseline_dataset.sha256", "policyengine_us_data.commit", "policyengine_us.version", + "target_surface.target_profile", + "target_surface.target_scope", + "target_surface.target_count", + "target_surface.target_names_sha256", + "scoring_config.sha256", "target_db.path", "target_db.sha256", ] @@ -464,6 +481,93 @@ def test_ecps_comparison_gate_rejects_benchmark_certificate_mismatch(tmp_path): ] +@pytest.mark.parametrize( + ("mutate_certificate", "expected_field"), + [ + ( + lambda certificate: certificate.update( + {"certificate_type": "live_recomputed_ecps_baseline"} + ), + "certificate_type", + ), + (lambda certificate: certificate.update({"period": 2025}), "period"), + ( + lambda certificate: certificate["policyengine_us"].update( + {"version": "1.999.0"} + ), + "policyengine_us.version", + ), + ( + lambda certificate: certificate["target_surface"].update( + {"target_profile": "pe_native_narrow"} + ), + "target_surface.target_profile", + ), + ( + lambda certificate: certificate["target_surface"].update( + {"target_scope": "state"} + ), + "target_surface.target_scope", + ), + ( + lambda certificate: certificate["target_surface"].update( + {"target_count": 149} + ), + "target_surface.target_count", + ), + ( + lambda certificate: certificate["target_surface"].update( + {"target_names_sha256": "f" * 64} + ), + "target_surface.target_names_sha256", + ), + ( + lambda certificate: certificate["scoring_config"].update( + {"sha256": "f" * 64} + ), + "scoring_config.sha256", + ), + ], +) +def test_ecps_comparison_gate_rejects_stale_frozen_surface_certificate( + tmp_path, + mutate_certificate, + expected_field, +): + artifact_dir = tmp_path / "artifact" + artifact_dir.mkdir() + _write_contract_policyengine_dataset(artifact_dir / "candidate.h5") + baseline_dataset = _write_contract_policyengine_dataset(tmp_path / "baseline.h5") + benchmark_manifest = tmp_path / "benchmark_manifest.json" + _write_benchmark_manifest(benchmark_manifest) + _write_artifact_manifest(artifact_dir, baseline_dataset=baseline_dataset) + payload = _sound_ecps_comparison_payload(candidate_loss=0.10) + mutate_certificate(payload["frozen_ecps_baseline_certificate"]) + + report_path = write_mp300k_artifact_gate_report( + artifact_dir, + ecps_comparison_payload=payload, + arch_coverage_payload=_arch_coverage_payload(), + runtime_smoke_payload={"runtime_ratio": 1.0}, + benchmark_manifest_path=benchmark_manifest, + compute_native_scores=False, + update_manifest=False, + ) + + record = json.loads(report_path.read_text()) + ecps_gate = record["gates"]["ecps_comparison"] + certificate_details = ecps_gate["details"]["frozen_ecps_baseline_certificate"] + mismatch_fields = { + mismatch["field"] for mismatch in certificate_details["mismatches"] + } + + assert record["summary"]["status"] == "failed" + assert ecps_gate["status"] == "fail" + assert expected_field in ( + mismatch_fields | set(certificate_details["missing_evidence"]) + ) + + def test_core_benchmark_floor_accepts_aca_enrollment_family_alias(tmp_path): artifact_dir = tmp_path / "artifact" artifact_dir.mkdir() diff --git a/tests/pipelines/test_mp300k_gate_inputs.py b/tests/pipelines/test_mp300k_gate_inputs.py index d8866e3..8ae3329 100644 --- a/tests/pipelines/test_mp300k_gate_inputs.py +++ b/tests/pipelines/test_mp300k_gate_inputs.py @@ -88,8 +88,17 @@ def _write_benchmark_manifest(path: Path) -> None: json.dumps( { "schema_version": 1, + "certificate_type": "frozen_production_ecps_baseline", "period": 2024, "target_profile": "pe_native_broad", + "target_scope": "national", + "target_surface": { + "target_profile": "pe_native_broad", + "target_scope": "national", + "target_count": 150, + "target_names_sha256": "d" * 64, + }, + "scoring_config": {"sha256": "e" * 64}, "baseline_dataset": { "path": "/tmp/enhanced_cps_2024.h5", "sha256": "a" * 64, @@ -199,6 +208,7 @@ def _sound_ecps_comparison_payload() -> dict[str, object]: "repo": "PolicyEngine/policyengine-us-data", "commit": "b" * 40, }, + "policyengine_us": {"version": "1.587.0"}, "target_surface": { "target_profile": "pe_native_broad", "target_scope": "national", diff --git a/tests/pipelines/test_mp_benchmark_manifest.py b/tests/pipelines/test_mp_benchmark_manifest.py index 54a25b7..29fe19b 100644 --- a/tests/pipelines/test_mp_benchmark_manifest.py +++ b/tests/pipelines/test_mp_benchmark_manifest.py @@ -35,13 +35,26 @@ def test_build_mp_benchmark_manifest_pins_release_inputs(tmp_path): target_db_path=target_db, period=2024, target_profile="pe_native_broad", + target_scope="national", + target_count=150, + target_names_sha256="d" * 64, + scoring_config_sha256="e" * 64, policyengine_us_data_commit="b" * 40, policyengine_us_version="1.587.0", ) assert manifest["schema_version"] == 1 + assert manifest["certificate_type"] == "frozen_production_ecps_baseline" assert manifest["period"] == 2024 assert manifest["target_profile"] == "pe_native_broad" + assert manifest["target_scope"] == "national" + assert manifest["target_surface"] == { + "target_profile": "pe_native_broad", + "target_scope": "national", + "target_count": 150, + "target_names_sha256": "d" * 64, + } + assert manifest["scoring_config"] == {"sha256": "e" * 64} assert manifest["baseline_dataset"]["path"] == str(baseline.resolve()) assert manifest["baseline_dataset"]["sha256"] == _sha256(baseline_contents) assert manifest["target_db"]["path"] == str(target_db.resolve()) @@ -67,6 +80,14 @@ def test_main_writes_mp_benchmark_manifest(tmp_path, capsys): "c" * 40, "--policyengine-us-version", "1.587.0", + "--target-scope", + "national", + "--target-count", + "150", + "--target-names-sha256", + "d" * 64, + "--scoring-config-sha256", + "e" * 64, ] ) @@ -110,6 +131,9 @@ def test_dirty_policyengine_us_data_repo_is_rejected_unless_explicit(tmp_path): target_db_path=target_db, policyengine_us_data_repo=repo, policyengine_us_version="1.587.0", + target_count=150, + target_names_sha256="d" * 64, + scoring_config_sha256="e" * 64, ) manifest = build_mp_benchmark_manifest( @@ -117,6 +141,9 @@ def test_dirty_policyengine_us_data_repo_is_rejected_unless_explicit(tmp_path): target_db_path=target_db, policyengine_us_data_repo=repo, policyengine_us_version="1.587.0", + target_count=150, + target_names_sha256="d" * 64, + scoring_config_sha256="e" * 64, allow_dirty_policyengine_us_data=True, ) diff --git a/tests/pipelines/test_us.py b/tests/pipelines/test_us.py index c12eba3..6a91503 100644 --- a/tests/pipelines/test_us.py +++ b/tests/pipelines/test_us.py @@ -4202,6 +4202,7 @@ def test_finalize_puf_support_clone_can_collapse_donor_only_values_to_cps_rows( "person_id": [30, 40], "household_id": [3, 4], "age": [45, 62], + us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN: [0, 1], "self_employment_income": [-250.0, 500.0], "taxable_interest_income": [10.0, 20.0], "partnership_s_corp_income": [-700.0, 1_200.0], @@ -4249,6 +4250,12 @@ def test_finalize_puf_support_clone_can_collapse_donor_only_values_to_cps_rows( assert summary["overlap_collapse_override_variables"] == [ "self_employment_income", ] + assert summary["source_row_alignment"] == { + "enabled": True, + "column": us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, + "row_count": 2, + "clone_was_reordered": False, + } def test_finalize_puf_support_clone_preserves_puf_tax_details_by_default( self, @@ -4292,6 +4299,7 @@ def test_finalize_puf_support_clone_preserves_puf_tax_details_by_default( "person_id": [30, 40], "household_id": [3, 4], "age": [45, 62], + us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN: [0, 1], "employment_income": [90_000.0, 20_000.0], "self_employment_income": [-4_000.0, 8_000.0], "long_term_capital_gains": [50_000.0, -1_000.0], @@ -4390,6 +4398,55 @@ def test_finalize_puf_support_clone_preserves_puf_tax_details_by_default( "taxable_pension_income", "taxable_unemployment_compensation", } + assert summary["source_row_alignment"]["clone_was_reordered"] is False + + def test_finalize_puf_support_clone_aligns_shuffled_clone_by_source_row_id( + self, + ): + pipeline = USMicroplexPipeline( + USMicroplexBuildConfig( + synthesis_backend="seed", + puf_support_clone_enabled=True, + puf_support_clone_output_mode="collapse_to_scaffold", + puf_support_clone_both_halves_override_variables=(), + ) + ) + original = pd.DataFrame( + { + "person_id": [10, 20], + "household_id": [1, 2], + "age": [45, 62], + "self_employment_income": [75.0, 50.0], + } + ) + clone = pd.DataFrame( + { + "person_id": [40, 30], + "household_id": [4, 3], + "age": [62, 45], + us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN: [1, 0], + "self_employment_income": [500.0, -250.0], + } + ) + + result, summary = pipeline._finalize_puf_support_clone_frame( + original=original, + imputed_clone=clone, + donor_source_name="irs_soi_puf_2024", + integrated_variables=["self_employment_income"], + preclone_columns=set(original.columns), + donor_seed_columns=set(clone.columns), + donor_observed=set(clone.columns), + ) + + assert result["person_id"].tolist() == [10, 20] + assert result["self_employment_income"].tolist() == [-250.0, 500.0] + assert summary["source_row_alignment"] == { + "enabled": True, + "column": us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN, + "row_count": 2, + "clone_was_reordered": True, + } def test_finalize_puf_support_clone_can_scale_tax_details_to_cps_totals( self, @@ -4426,6 +4483,7 @@ def test_finalize_puf_support_clone_can_scale_tax_details_to_cps_totals( "person_id": [30, 40], "household_id": [3, 4], "age": [45, 62], + us_pipeline_module.PUF_SUPPORT_CLONE_SOURCE_ROW_ID_COLUMN: [0, 1], "taxable_interest_income": [1_000.0, 0.0], "tax_exempt_interest_income": [500.0, 0.0], "qualified_dividend_income": [20.0, 0.0],