Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ hf = [
"huggingface_hub>=0.24",
]
policyengine = [
"microimpute @ git+https://github.com/PolicyEngine/microimpute.git@27d23090dcf04c2c30ae369b0a209e27eb3659f0 ; python_full_version >= '3.12' and python_full_version < '3.15'",
"microimpute==3.1.0; python_full_version >= '3.12' and python_full_version < '3.15'",
"policyengine-us==1.715.2; python_version >= '3.11' and python_version < '3.15'",
"spm-calculator>=0.3.1",
# Standalone tax-unit construction engine (the extraction of eCPS's
Expand Down
89 changes: 49 additions & 40 deletions src/microplex_us/pipelines/donor_imputers.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,13 @@ def generate(


class RegimeAwareDonorImputer:
"""Donor imputer that wraps `microimpute.ZeroInflatedImputer` per column.
"""Donor imputer that wraps one chained `ZeroInflatedImputer` per block.

Each target is fit with an independent `ZeroInflatedImputer`, which
The whole target block is fit with one `ZeroInflatedImputer`, which
auto-detects one of seven regimes (THREE_SIGN / ZI_POSITIVE /
ZI_NEGATIVE / SIGN_ONLY / POSITIVE_ONLY / NEGATIVE_ONLY /
DEGENERATE_ZERO) from the training distribution and composes a
gate classifier + one or two base imputers as appropriate.
DEGENERATE_ZERO) for each target and composes a gate classifier + one or
two base imputers as appropriate.

Key advantages over `ColumnwiseQRFDonorImputer`:

Expand All @@ -157,10 +157,12 @@ class RegimeAwareDonorImputer:
tripartite gate routes to sign-specific base imputers that each
see only one sign of training data.

This class is a thin columnwise adapter: one `ZeroInflatedImputer`
is fit per target, using `microimpute.QRF` as the base. Fit and
generate work column-by-column so memory scales with the single
largest base imputer, not with the total target count.
This class is a thin block adapter: the target order is passed through
to microimpute so target i+1 conditions on the realized imputation for
target i. That preserves cross-target donor relationships such as
income totals, losses, interest, dividends, pensions, and deduction
leaves instead of independently drawing each target from the same
original predictor surface.
"""

def __init__(
Expand All @@ -177,6 +179,7 @@ def __init__(
self.classifier_type = str(classifier_type)
self.seed = int(seed)
self._fitted: dict[str, Any] = {}
self._fitted_columns: tuple[str, ...] = ()
self._regimes: dict[str, str] = {}

def fit(
Expand Down Expand Up @@ -205,28 +208,33 @@ def fit(
from microimpute.models.zero_inflated import ZeroInflatedImputer

self._fitted = {}
self._fitted_columns = ()
self._regimes = {}
for column in self.target_vars:
subset = data[self.condition_vars + [column]].dropna()
if len(subset) < 25:
continue
# base_imputer_kwargs={} because microimpute 2.x's
# ZeroInflatedImputer._fit_base_single already passes
# log_level="ERROR" to the base, and duplicating it here
# raises TypeError. Upstream fix tracked.
wrapper = ZeroInflatedImputer(
base_imputer_class=QRF,
base_imputer_kwargs={},
classifier_type=self.classifier_type,
seed=self.seed,
)
fitted = wrapper.fit(
subset,
predictors=list(self.condition_vars),
imputed_variables=[column],
)
self._fitted[column] = fitted
self._regimes[column] = wrapper.get_regime(column)
subset = (
data[self.condition_vars + self.target_vars]
.replace([np.inf, -np.inf], np.nan)
.dropna()
)
if len(subset) < 25:
return self

wrapper = ZeroInflatedImputer(
base_imputer_class=QRF,
base_imputer_kwargs={},
classifier_type=self.classifier_type,
sequential=True,
seed=self.seed,
)
fitted = wrapper.fit(
subset,
predictors=list(self.condition_vars),
imputed_variables=list(self.target_vars),
)
self._fitted_columns = tuple(self.target_vars)
self._fitted = {column: fitted for column in self._fitted_columns}
self._regimes = {
column: wrapper.get_regime(column) for column in self._fitted_columns
}
return self

def generate(
Expand All @@ -235,19 +243,20 @@ def generate(
seed: int | None = None,
) -> pd.DataFrame:
synthetic = conditions.copy().reset_index(drop=True)
master_seed = self.seed if seed is None else int(seed)
master_rng = np.random.default_rng(master_seed)
fitted = next(iter(self._fitted.values()), None)
if fitted is None:
for column in self.target_vars:
synthetic[column] = np.nan
return synthetic

prediction_seed = self.seed if seed is None else int(seed)
self._reset_prediction_rngs(fitted, seed=prediction_seed)
preds = fitted.predict(synthetic[self.condition_vars])
for column in self.target_vars:
fitted = self._fitted.get(column)
if fitted is None:
if column in preds.columns:
synthetic[column] = preds[column].to_numpy(dtype=float)
else:
synthetic[column] = np.nan
continue
column_seed = int(
master_rng.integers(0, np.iinfo(np.int32).max, dtype=np.int64)
)
self._reset_prediction_rngs(fitted, seed=column_seed)
preds = fitted.predict(synthetic[self.condition_vars])
synthetic[column] = preds[column].to_numpy(dtype=float)
return synthetic

def _reset_prediction_rngs(
Expand Down
91 changes: 85 additions & 6 deletions src/microplex_us/variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

from collections import defaultdict
from collections.abc import Callable, Iterable, Mapping
from dataclasses import dataclass, field
from enum import Enum
Expand Down Expand Up @@ -834,6 +835,66 @@ def restore_dividend_components_from_composition(frame: pd.DataFrame) -> pd.Data
restore_frame=restore_dividend_components_from_composition,
)

DONOR_CHAIN_PRIORITY: tuple[str, ...] = (
"income",
"employment_income",
"employment_income_before_lsr",
"self_employment_income",
"self_employment_income_before_lsr",
"sstb_self_employment_income",
"sstb_self_employment_income_before_lsr",
"partnership_s_corp_income",
"tax_unit_partnership_s_corp_income",
"rental_income",
"rental_income_positive",
"rental_income_negative",
"farm_income",
"farm_operations_income",
"taxable_interest_income",
"tax_exempt_interest_income",
"dividend_income",
"ordinary_dividend_income",
"qualified_dividend_income",
"non_qualified_dividend_income",
DIVIDEND_SHARE_COLUMN,
"short_term_capital_gains",
"long_term_capital_gains",
"net_capital_gains",
"social_security",
"social_security_retirement",
"social_security_disability",
"social_security_survivors",
"social_security_dependents",
"taxable_social_security",
"pension_income",
"taxable_pension_income",
"tax_exempt_pension_income",
"unemployment_compensation",
"taxable_unemployment_compensation",
"health_savings_account_ald",
"self_employed_health_insurance_ald",
"self_employed_pension_contribution_ald",
"self_employed_pension_contributions",
"traditional_401k_contributions",
"roth_401k_contributions",
"traditional_ira_contributions",
"roth_ira_contributions",
"ira_deduction",
"student_loan_interest",
"state_income_tax_paid",
"real_estate_tax_paid",
"mortgage_interest_paid",
"charitable_cash",
"charitable_noncash",
)
_DONOR_CHAIN_PRIORITY_INDEX = {
variable: index for index, variable in enumerate(DONOR_CHAIN_PRIORITY)
}


def _donor_chain_sort_key(variable_name: str) -> tuple[int, str]:
return (_DONOR_CHAIN_PRIORITY_INDEX.get(variable_name, 10_000), variable_name)


def variable_semantic_spec_for(variable_name: str) -> VariableSemanticSpec:
"""Return semantic metadata for one variable."""
Expand Down Expand Up @@ -961,15 +1022,33 @@ def donor_imputation_block_specs(
if set(DIVIDEND_COMPONENT_COLUMNS).issubset(remaining):
block_specs.append(DIVIDEND_DONOR_BLOCK_SPEC)
remaining.difference_update(DIVIDEND_COMPONENT_COLUMNS)
for variable in sorted(remaining):

grouped_variables: dict[tuple[EntityType, tuple[EntityType, ...]], list[str]] = (
defaultdict(list)
)
for variable in sorted(remaining, key=_donor_chain_sort_key):
spec = variable_semantic_spec_for(variable)
grouped_variables[
(spec.native_entity, resolve_condition_entities_for_targets((variable,)))
].append(variable)

for (native_entity, _), variables in sorted(
grouped_variables.items(),
key=lambda item: _donor_chain_sort_key(item[1][0]),
):
block_variables = tuple(sorted(variables, key=_donor_chain_sort_key))
block_specs.append(
DonorImputationBlockSpec(
native_entity=spec.native_entity,
condition_entities=resolve_condition_entities_for_targets((variable,)),
model_variables=(variable,),
restored_variables=(variable,),
match_strategies={variable: spec.donor_match_strategy},
native_entity=native_entity,
condition_entities=resolve_condition_entities_for_targets(
block_variables
),
model_variables=block_variables,
restored_variables=block_variables,
match_strategies={
variable: variable_semantic_spec_for(variable).donor_match_strategy
for variable in block_variables
},
)
)
return tuple(block_specs)
Expand Down
30 changes: 30 additions & 0 deletions tests/pipelines/test_regime_aware_donor_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,36 @@ def test_factory_dispatches_to_regime_aware(self) -> None:
class TestRegimeAwareFitGenerate:
"""Fit/generate contract and tripartite-specific guarantees."""

def test_multi_target_fit_uses_one_chained_zero_inflated_imputer(self) -> None:
from microplex_us.pipelines.us import RegimeAwareDonorImputer

rng = np.random.default_rng(20260606)
n = 300
age = rng.integers(18, 80, size=n).astype(float)
first = rng.normal(loc=age * 400.0, scale=2_000.0, size=n)
second = 0.75 * first + rng.normal(scale=250.0, size=n)
train = pd.DataFrame(
{
"age": age,
"first_income_leaf": first,
"second_income_leaf": second,
}
)

imputer = RegimeAwareDonorImputer(
condition_vars=["age"],
target_vars=["first_income_leaf", "second_income_leaf"],
n_estimators=25,
)
imputer.fit(train)

first_fitted = imputer._fitted["first_income_leaf"]
second_fitted = imputer._fitted["second_income_leaf"]
assert first_fitted is second_fitted

second_bundle = second_fitted._per_variable["second_income_leaf"]
assert second_bundle["predictors"] == ["age", "first_income_leaf"]

def _fit_generate(
self, n_train: int = 1500, n_gen: int = 2000, seed: int = 0
) -> np.ndarray:
Expand Down
7 changes: 4 additions & 3 deletions tests/test_variables.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,7 @@ def test_donor_imputation_blocks_keep_dividends_in_one_composition_block():

assert blocks == (
("dividend_income", "qualified_dividend_share"),
("partnership_s_corp_income",),
("taxable_interest_income",),
("partnership_s_corp_income", "taxable_interest_income"),
)


Expand Down Expand Up @@ -218,7 +217,9 @@ def test_donor_imputation_block_specs_use_zero_inflated_matching_for_sparse_irs_
}
)

by_variable = {spec.model_variables[0]: spec for spec in specs}
by_variable = {
variable_name: spec for spec in specs for variable_name in spec.model_variables
}
for variable_name in (
"health_savings_account_ald",
"self_employed_health_insurance_ald",
Expand Down
10 changes: 7 additions & 3 deletions uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading