Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/microplex_us/pipelines/donor_imputers.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@
from sklearn.ensemble import RandomForestClassifier


def _deduplicate_columns_preserve_first(frame: pd.DataFrame) -> pd.DataFrame:
"""Return a frame with one column per label, keeping the first occurrence."""

if frame.columns.is_unique:
return frame
return frame.loc[:, ~frame.columns.duplicated()].copy()


class ColumnwiseQRFDonorImputer:
"""Columnwise QRF donor imputer, optionally with zero-inflated support."""

Expand Down Expand Up @@ -218,7 +226,10 @@ def fit(
dict.fromkeys(var for var in self.condition_vars if var not in target_set)
)
fit_columns = tuple(dict.fromkeys((*predictor_vars, *target_vars)))
subset = data[list(fit_columns)].replace([np.inf, -np.inf], np.nan).dropna()
unique_data = _deduplicate_columns_preserve_first(data)
subset = (
unique_data[list(fit_columns)].replace([np.inf, -np.inf], np.nan).dropna()
)
if len(subset) < 25:
return self

Expand Down Expand Up @@ -247,17 +258,18 @@ def generate(
conditions: pd.DataFrame,
seed: int | None = None,
) -> pd.DataFrame:
synthetic = conditions.copy().reset_index(drop=True)
synthetic = _deduplicate_columns_preserve_first(conditions).copy()
synthetic = synthetic.reset_index(drop=True)
fitted = next(iter(self._fitted.values()), None)
if fitted is None:
for column in self.target_vars:
for column in dict.fromkeys(self.target_vars):
synthetic[column] = np.nan
return synthetic

prediction_seed = self.seed if seed is None else int(seed)
self._reset_prediction_rngs(fitted, seed=prediction_seed)
preds = fitted.predict(synthetic[list(self._predictor_columns)])
for column in self.target_vars:
for column in self._fitted_columns:
if column in preds.columns:
synthetic[column] = preds[column].to_numpy(dtype=float)
else:
Expand Down
55 changes: 55 additions & 0 deletions tests/pipelines/test_regime_aware_donor_imputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,61 @@ def test_target_predictor_overlap_is_owned_by_sequential_chain(self) -> None:
synthetic[["first_income_leaf", "second_income_leaf"]].notna().all().all()
)

def test_duplicate_input_columns_are_collapsed_before_microimpute(self) -> None:
from microplex_us.pipelines.us import RegimeAwareDonorImputer

rng = np.random.default_rng(2026060602)
n = 300
age = rng.integers(18, 80, size=n).astype(float)
first = rng.normal(loc=age * 300.0, scale=1_000.0, size=n)
second = 0.5 * first + rng.normal(scale=250.0, size=n)
train = pd.DataFrame(
np.column_stack([age, first, first, second]),
columns=[
"age",
"first_income_leaf",
"first_income_leaf",
"second_income_leaf",
],
)
assert not train.columns.is_unique

imputer = RegimeAwareDonorImputer(
condition_vars=["age", "first_income_leaf"],
target_vars=[
"first_income_leaf",
"first_income_leaf",
"second_income_leaf",
],
n_estimators=25,
)
imputer.fit(train)

assert imputer._fitted_columns == (
"first_income_leaf",
"second_income_leaf",
)
fitted = imputer._fitted["first_income_leaf"]
first_bundle = fitted._per_variable["first_income_leaf"]
second_bundle = fitted._per_variable["second_income_leaf"]
assert first_bundle["predictors"] == ["age"]
assert second_bundle["predictors"] == ["age", "first_income_leaf"]

conditions = pd.DataFrame(
np.column_stack([[25.0, 45.0, 65.0], [26.0, 46.0, 66.0]]),
columns=["age", "age"],
)
synthetic = imputer.generate(conditions, seed=20260606)
assert list(synthetic.columns) == [
"age",
"first_income_leaf",
"second_income_leaf",
]
assert synthetic.columns.is_unique
assert (
synthetic[["first_income_leaf", "second_income_leaf"]].notna().all().all()
)

def _fit_generate(
self, n_train: int = 1500, n_gen: int = 2000, seed: int = 0
) -> np.ndarray:
Expand Down
Loading