From f4d3acdac1f6a5ba2c4b7d3b9322d12bbef637e9 Mon Sep 17 00:00:00 2001 From: Max Ghenis Date: Sat, 6 Jun 2026 07:59:54 +0100 Subject: [PATCH] Chain MicroImpute donor blocks --- pyproject.toml | 2 +- src/microplex_us/pipelines/donor_imputers.py | 89 ++++++++++-------- src/microplex_us/variables.py | 91 +++++++++++++++++-- .../test_regime_aware_donor_imputer.py | 30 ++++++ tests/test_variables.py | 7 +- uv.lock | 10 +- 6 files changed, 176 insertions(+), 53 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 9e9d0f6..856c48b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ hf = [ "huggingface_hub>=0.24", ] policyengine = [ - "microimpute @ git+https://github.com/PolicyEngine/microimpute.git@27d23090dcf04c2c30ae369b0a209e27eb3659f0 ; python_full_version >= '3.12' and python_full_version < '3.15'", + "microimpute==3.1.0; python_full_version >= '3.12' and python_full_version < '3.15'", "policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'", "spm-calculator>=0.3.1", # Standalone tax-unit construction engine (the extraction of eCPS's diff --git a/src/microplex_us/pipelines/donor_imputers.py b/src/microplex_us/pipelines/donor_imputers.py index 724ef20..8e545ea 100644 --- a/src/microplex_us/pipelines/donor_imputers.py +++ b/src/microplex_us/pipelines/donor_imputers.py @@ -138,13 +138,13 @@ def generate( class RegimeAwareDonorImputer: - """Donor imputer that wraps `microimpute.ZeroInflatedImputer` per column. + """Donor imputer that wraps one chained `ZeroInflatedImputer` per block. - Each target is fit with an independent `ZeroInflatedImputer`, which + The whole target block is fit with one `ZeroInflatedImputer`, which auto-detects one of seven regimes (THREE_SIGN / ZI_POSITIVE / ZI_NEGATIVE / SIGN_ONLY / POSITIVE_ONLY / NEGATIVE_ONLY / - DEGENERATE_ZERO) from the training distribution and composes a - gate classifier + one or two base imputers as appropriate. + DEGENERATE_ZERO) for each target and composes a gate classifier + one or + two base imputers as appropriate. Key advantages over `ColumnwiseQRFDonorImputer`: @@ -157,10 +157,12 @@ class RegimeAwareDonorImputer: tripartite gate routes to sign-specific base imputers that each see only one sign of training data. - This class is a thin columnwise adapter: one `ZeroInflatedImputer` - is fit per target, using `microimpute.QRF` as the base. Fit and - generate work column-by-column so memory scales with the single - largest base imputer, not with the total target count. + This class is a thin block adapter: the target order is passed through + to microimpute so target i+1 conditions on the realized imputation for + target i. That preserves cross-target donor relationships such as + income totals, losses, interest, dividends, pensions, and deduction + leaves instead of independently drawing each target from the same + original predictor surface. """ def __init__( @@ -177,6 +179,7 @@ def __init__( self.classifier_type = str(classifier_type) self.seed = int(seed) self._fitted: dict[str, Any] = {} + self._fitted_columns: tuple[str, ...] = () self._regimes: dict[str, str] = {} def fit( @@ -205,28 +208,33 @@ def fit( from microimpute.models.zero_inflated import ZeroInflatedImputer self._fitted = {} + self._fitted_columns = () self._regimes = {} - for column in self.target_vars: - subset = data[self.condition_vars + [column]].dropna() - if len(subset) < 25: - continue - # base_imputer_kwargs={} because microimpute 2.x's - # ZeroInflatedImputer._fit_base_single already passes - # log_level="ERROR" to the base, and duplicating it here - # raises TypeError. Upstream fix tracked. - wrapper = ZeroInflatedImputer( - base_imputer_class=QRF, - base_imputer_kwargs={}, - classifier_type=self.classifier_type, - seed=self.seed, - ) - fitted = wrapper.fit( - subset, - predictors=list(self.condition_vars), - imputed_variables=[column], - ) - self._fitted[column] = fitted - self._regimes[column] = wrapper.get_regime(column) + subset = ( + data[self.condition_vars + self.target_vars] + .replace([np.inf, -np.inf], np.nan) + .dropna() + ) + if len(subset) < 25: + return self + + wrapper = ZeroInflatedImputer( + base_imputer_class=QRF, + base_imputer_kwargs={}, + classifier_type=self.classifier_type, + sequential=True, + seed=self.seed, + ) + fitted = wrapper.fit( + subset, + predictors=list(self.condition_vars), + imputed_variables=list(self.target_vars), + ) + self._fitted_columns = tuple(self.target_vars) + self._fitted = {column: fitted for column in self._fitted_columns} + self._regimes = { + column: wrapper.get_regime(column) for column in self._fitted_columns + } return self def generate( @@ -235,19 +243,20 @@ def generate( seed: int | None = None, ) -> pd.DataFrame: synthetic = conditions.copy().reset_index(drop=True) - master_seed = self.seed if seed is None else int(seed) - master_rng = np.random.default_rng(master_seed) + fitted = next(iter(self._fitted.values()), None) + if fitted is None: + for column in self.target_vars: + synthetic[column] = np.nan + return synthetic + + prediction_seed = self.seed if seed is None else int(seed) + self._reset_prediction_rngs(fitted, seed=prediction_seed) + preds = fitted.predict(synthetic[self.condition_vars]) for column in self.target_vars: - fitted = self._fitted.get(column) - if fitted is None: + if column in preds.columns: + synthetic[column] = preds[column].to_numpy(dtype=float) + else: synthetic[column] = np.nan - continue - column_seed = int( - master_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64) - ) - self._reset_prediction_rngs(fitted, seed=column_seed) - preds = fitted.predict(synthetic[self.condition_vars]) - synthetic[column] = preds[column].to_numpy(dtype=float) return synthetic def _reset_prediction_rngs( diff --git a/src/microplex_us/variables.py b/src/microplex_us/variables.py index b620f28..68fb7f5 100644 --- a/src/microplex_us/variables.py +++ b/src/microplex_us/variables.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections import defaultdict from collections.abc import Callable, Iterable, Mapping from dataclasses import dataclass, field from enum import Enum @@ -834,6 +835,66 @@ def restore_dividend_components_from_composition(frame: pd.DataFrame) -> pd.Data restore_frame=restore_dividend_components_from_composition, ) +DONOR_CHAIN_PRIORITY: tuple[str, ...] = ( + "income", + "employment_income", + "employment_income_before_lsr", + "self_employment_income", + "self_employment_income_before_lsr", + "sstb_self_employment_income", + "sstb_self_employment_income_before_lsr", + "partnership_s_corp_income", + "tax_unit_partnership_s_corp_income", + "rental_income", + "rental_income_positive", + "rental_income_negative", + "farm_income", + "farm_operations_income", + "taxable_interest_income", + "tax_exempt_interest_income", + "dividend_income", + "ordinary_dividend_income", + "qualified_dividend_income", + "non_qualified_dividend_income", + DIVIDEND_SHARE_COLUMN, + "short_term_capital_gains", + "long_term_capital_gains", + "net_capital_gains", + "social_security", + "social_security_retirement", + "social_security_disability", + "social_security_survivors", + "social_security_dependents", + "taxable_social_security", + "pension_income", + "taxable_pension_income", + "tax_exempt_pension_income", + "unemployment_compensation", + "taxable_unemployment_compensation", + "health_savings_account_ald", + "self_employed_health_insurance_ald", + "self_employed_pension_contribution_ald", + "self_employed_pension_contributions", + "traditional_401k_contributions", + "roth_401k_contributions", + "traditional_ira_contributions", + "roth_ira_contributions", + "ira_deduction", + "student_loan_interest", + "state_income_tax_paid", + "real_estate_tax_paid", + "mortgage_interest_paid", + "charitable_cash", + "charitable_noncash", +) +_DONOR_CHAIN_PRIORITY_INDEX = { + variable: index for index, variable in enumerate(DONOR_CHAIN_PRIORITY) +} + + +def _donor_chain_sort_key(variable_name: str) -> tuple[int, str]: + return (_DONOR_CHAIN_PRIORITY_INDEX.get(variable_name, 10_000), variable_name) + def variable_semantic_spec_for(variable_name: str) -> VariableSemanticSpec: """Return semantic metadata for one variable.""" @@ -961,15 +1022,33 @@ def donor_imputation_block_specs( if set(DIVIDEND_COMPONENT_COLUMNS).issubset(remaining): block_specs.append(DIVIDEND_DONOR_BLOCK_SPEC) remaining.difference_update(DIVIDEND_COMPONENT_COLUMNS) - for variable in sorted(remaining): + + grouped_variables: dict[tuple[EntityType, tuple[EntityType, ...]], list[str]] = ( + defaultdict(list) + ) + for variable in sorted(remaining, key=_donor_chain_sort_key): spec = variable_semantic_spec_for(variable) + grouped_variables[ + (spec.native_entity, resolve_condition_entities_for_targets((variable,))) + ].append(variable) + + for (native_entity, _), variables in sorted( + grouped_variables.items(), + key=lambda item: _donor_chain_sort_key(item[1][0]), + ): + block_variables = tuple(sorted(variables, key=_donor_chain_sort_key)) block_specs.append( DonorImputationBlockSpec( - native_entity=spec.native_entity, - condition_entities=resolve_condition_entities_for_targets((variable,)), - model_variables=(variable,), - restored_variables=(variable,), - match_strategies={variable: spec.donor_match_strategy}, + native_entity=native_entity, + condition_entities=resolve_condition_entities_for_targets( + block_variables + ), + model_variables=block_variables, + restored_variables=block_variables, + match_strategies={ + variable: variable_semantic_spec_for(variable).donor_match_strategy + for variable in block_variables + }, ) ) return tuple(block_specs) diff --git a/tests/pipelines/test_regime_aware_donor_imputer.py b/tests/pipelines/test_regime_aware_donor_imputer.py index 7d4bbe9..fbb8546 100644 --- a/tests/pipelines/test_regime_aware_donor_imputer.py +++ b/tests/pipelines/test_regime_aware_donor_imputer.py @@ -120,6 +120,36 @@ def test_factory_dispatches_to_regime_aware(self) -> None: class TestRegimeAwareFitGenerate: """Fit/generate contract and tripartite-specific guarantees.""" + def test_multi_target_fit_uses_one_chained_zero_inflated_imputer(self) -> None: + from microplex_us.pipelines.us import RegimeAwareDonorImputer + + rng = np.random.default_rng(20260606) + n = 300 + age = rng.integers(18, 80, size=n).astype(float) + first = rng.normal(loc=age * 400.0, scale=2_000.0, size=n) + second = 0.75 * first + rng.normal(scale=250.0, size=n) + train = pd.DataFrame( + { + "age": age, + "first_income_leaf": first, + "second_income_leaf": second, + } + ) + + imputer = RegimeAwareDonorImputer( + condition_vars=["age"], + target_vars=["first_income_leaf", "second_income_leaf"], + n_estimators=25, + ) + imputer.fit(train) + + first_fitted = imputer._fitted["first_income_leaf"] + second_fitted = imputer._fitted["second_income_leaf"] + assert first_fitted is second_fitted + + second_bundle = second_fitted._per_variable["second_income_leaf"] + assert second_bundle["predictors"] == ["age", "first_income_leaf"] + def _fit_generate( self, n_train: int = 1500, n_gen: int = 2000, seed: int = 0 ) -> np.ndarray: diff --git a/tests/test_variables.py b/tests/test_variables.py index 140d002..575a485 100644 --- a/tests/test_variables.py +++ b/tests/test_variables.py @@ -171,8 +171,7 @@ def test_donor_imputation_blocks_keep_dividends_in_one_composition_block(): assert blocks == ( ("dividend_income", "qualified_dividend_share"), - ("partnership_s_corp_income",), - ("taxable_interest_income",), + ("partnership_s_corp_income", "taxable_interest_income"), ) @@ -218,7 +217,9 @@ def test_donor_imputation_block_specs_use_zero_inflated_matching_for_sparse_irs_ } ) - by_variable = {spec.model_variables[0]: spec for spec in specs} + by_variable = { + variable_name: spec for spec in specs for variable_name in spec.model_variables + } for variable_name in ( "health_savings_account_ald", "self_employed_health_insurance_ald", diff --git a/uv.lock b/uv.lock index 0e2c305..f50adf7 100644 --- a/uv.lock +++ b/uv.lock @@ -1124,8 +1124,8 @@ wheels = [ [[package]] name = "microimpute" -version = "2.1.1" -source = { git = "https://github.com/PolicyEngine/microimpute.git?rev=27d23090dcf04c2c30ae369b0a209e27eb3659f0#27d23090dcf04c2c30ae369b0a209e27eb3659f0" } +version = "3.1.0" +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "joblib" }, { name = "numpy" }, @@ -1141,6 +1141,10 @@ dependencies = [ { name = "statsmodels" }, { name = "tqdm" }, ] +sdist = { url = "https://files.pythonhosted.org/packages/71/e4/d9f5f06eaab4cd09d7700190da784a16d7e57e95802db8542658f97bb7ef/microimpute-3.1.0.tar.gz", hash = "sha256:1e7d0f69e99390127755ef10db94cfa5d91029b1075e6a1c51a1cc6168e15336", size = 145868, upload-time = "2026-06-06T06:44:14.005Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/1f/82/1d4ae0872aab0bafdb167bf4de6fa232dd599bc710425e8875e20ab23149/microimpute-3.1.0-py3-none-any.whl", hash = "sha256:4e4fcd21e9c4b1c5928075ba23858b4df47bdecba97fe2ae6b8540027ca68184", size = 127161, upload-time = "2026-06-06T06:44:12.867Z" }, +] [[package]] name = "microplex" @@ -1207,7 +1211,7 @@ requires-dist = [ { name = "h5py", specifier = ">=3.10" }, { name = "huggingface-hub", marker = "extra == 'hf'", specifier = ">=0.24" }, { name = "jupyter-book", marker = "extra == 'docs'", specifier = ">=0.15,<0.16" }, - { name = "microimpute", marker = "python_full_version >= '3.12' and python_full_version < '3.15' and extra == 'policyengine'", git = "https://github.com/PolicyEngine/microimpute.git?rev=27d23090dcf04c2c30ae369b0a209e27eb3659f0" }, + { name = "microimpute", marker = "python_full_version >= '3.12' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==3.1.0" }, { name = "microplex", extras = ["calibrate"], git = "https://github.com/PolicyEngine/microplex.git?rev=1e0627182f9df40aacd7043c96956c2895bf9d30" }, { name = "microunit", marker = "extra == 'policyengine'", specifier = ">=0.1.0" }, { name = "policyengine-us", marker = "python_full_version >= '3.11' and python_full_version < '3.15' and extra == 'policyengine'", specifier = "==1.715.2" },