From 053a0b7d682f61cb0b2ed3e75d14a67df9bac855 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 10:59:36 -0400 Subject: [PATCH 01/13] Add lonely PSU adjust bootstrap, CV on estimates, and weight trimming 8d: Implement lonely_psu="adjust" for survey-aware bootstrap. Singleton PSUs are pooled into a combined pseudo-stratum for weight generation in both multiplier (generate_survey_multiplier_weights_batch) and Rao-Wu (generate_rao_wu_weights) paths. This matches the analytical TSL "adjust" behavior of centering around the global mean. 8e-i: Add coef_var property (SE / |estimate|) to all 12 results classes with CV display in summary() output. Returns NaN when estimate is 0 or SE is non-finite. Used by federal agencies (NCHS) for publication standards (CV < 30%). 8e-ii: Add trim_weights() utility to prep.py for capping extreme survey weights via absolute upper bound or quantile-based threshold. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/__init__.py | 1 + diff_diff/bootstrap_utils.py | 69 ++++- diff_diff/continuous_did_results.py | 16 +- diff_diff/efficient_did_results.py | 16 +- diff_diff/imputation_results.py | 16 +- diff_diff/prep.py | 62 ++++ diff_diff/results.py | 39 +++ diff_diff/stacked_did_results.py | 16 +- diff_diff/staggered_results.py | 38 ++- diff_diff/staggered_triple_diff_results.py | 37 +-- diff_diff/sun_abraham.py | 73 ++++- diff_diff/trop_results.py | 13 + diff_diff/two_stage_results.py | 16 +- tests/test_survey_phase8.py | 321 +++++++++++++++++++++ 14 files changed, 668 insertions(+), 65 deletions(-) diff --git a/diff_diff/__init__.py b/diff_diff/__init__.py index fd0908a3..6425b9fb 100644 --- a/diff_diff/__init__.py +++ b/diff_diff/__init__.py @@ -94,6 +94,7 @@ make_treatment_indicator, rank_control_units, summarize_did_data, + trim_weights, validate_did_data, wide_to_long, ) diff --git a/diff_diff/bootstrap_utils.py b/diff_diff/bootstrap_utils.py index cdf7ee2c..23dffb71 100644 --- a/diff_diff/bootstrap_utils.py +++ b/diff_diff/bootstrap_utils.py @@ -454,11 +454,7 @@ def generate_survey_multiplier_weights_batch( psu = resolved_survey.psu strata = resolved_survey.strata - if resolved_survey.lonely_psu == "adjust": - raise NotImplementedError( - "lonely_psu='adjust' is not yet supported for survey-aware bootstrap. " - "Use lonely_psu='remove' or 'certainty', or use analytical inference." - ) + _lonely_psu = resolved_survey.lonely_psu if psu is None: # Each observation is its own PSU @@ -499,6 +495,7 @@ def generate_survey_multiplier_weights_batch( psu_to_col = {int(p): i for i, p in enumerate(psu_ids)} unique_strata = np.unique(strata) + _singleton_cols = [] # For lonely_psu="adjust" pooling for h in unique_strata: mask_h = strata == h @@ -511,8 +508,12 @@ def generate_survey_multiplier_weights_batch( cols = np.array([psu_to_col[int(p)] for p in psus_in_h]) if n_h < 2: - # Lonely PSU — zero weight (matches remove/certainty behavior) - weights[:, cols] = 0.0 + if _lonely_psu == "adjust": + # Collect for pooled pseudo-stratum processing + _singleton_cols.extend(cols.tolist()) + else: + # remove / certainty — zero weight + weights[:, cols] = 0.0 continue # Generate weights for this stratum @@ -536,6 +537,20 @@ def generate_survey_multiplier_weights_batch( weights[:, cols] = stratum_weights + # Pool singleton PSUs into a pseudo-stratum for "adjust" + if _singleton_cols: + n_pooled = len(_singleton_cols) + if n_pooled >= 2: + pooled_weights = generate_bootstrap_weights_batch_numpy( + n_bootstrap, n_pooled, weight_type, rng + ) + # No FPC scaling for pooled singletons (conservative) + pooled_cols = np.array(_singleton_cols) + weights[:, pooled_cols] = pooled_weights + else: + # Single singleton — variance unidentified, zero weight + weights[:, _singleton_cols[0]] = 0.0 + return weights, psu_ids @@ -570,11 +585,7 @@ def generate_rao_wu_weights( psu = resolved_survey.psu strata = resolved_survey.strata - if resolved_survey.lonely_psu == "adjust": - raise NotImplementedError( - "lonely_psu='adjust' is not yet supported for survey-aware bootstrap. " - "Use lonely_psu='remove' or 'certainty', or use analytical inference." - ) + _lonely_psu_rw = resolved_survey.lonely_psu rescaled = np.zeros(n_obs, dtype=np.float64) @@ -589,14 +600,20 @@ def generate_rao_wu_weights( unique_strata = np.unique(strata) strata_masks = [strata == h for h in unique_strata] + # Collect singleton PSUs for "adjust" pooling + _singleton_info = [] # list of (mask_h, unique_psu_h) tuples + for mask_h in strata_masks: psu_h = obs_psu[mask_h] unique_psu_h = np.unique(psu_h) n_h = len(unique_psu_h) if n_h < 2: - # Census / lonely PSU — keep original weights (zero variance) - rescaled[mask_h] = base_weights[mask_h] + if _lonely_psu_rw == "adjust": + _singleton_info.append((mask_h, unique_psu_h)) + else: + # remove / certainty — keep original weights (zero variance) + rescaled[mask_h] = base_weights[mask_h] continue # Compute resample size @@ -629,6 +646,30 @@ def generate_rao_wu_weights( local_indices = np.array([psu_to_local[int(obs_psu[idx])] for idx in obs_in_h]) rescaled[obs_in_h] = base_weights[obs_in_h] * scale_per_psu[local_indices] + # Pool singleton PSUs into a pseudo-stratum for "adjust" + if _singleton_info: + # Combine all singleton PSUs into one group + pooled_psus = np.concatenate([p for _, p in _singleton_info]) + n_pooled = len(pooled_psus) + + if n_pooled >= 2: + m_pooled = n_pooled - 1 # No FPC for pooled singletons + drawn = rng.choice(n_pooled, size=m_pooled, replace=True) + counts = np.bincount(drawn, minlength=n_pooled) + scale_per_psu = (n_pooled / m_pooled) * counts.astype(np.float64) + + # Build PSU → scale mapping and apply + psu_scale_map = {int(pooled_psus[i]): scale_per_psu[i] for i in range(n_pooled)} + for mask_h, _ in _singleton_info: + obs_in_h = np.where(mask_h)[0] + for idx in obs_in_h: + p = int(obs_psu[idx]) + rescaled[idx] = base_weights[idx] * psu_scale_map.get(p, 1.0) + else: + # Single singleton total — variance unidentified, keep base weights + for mask_h, _ in _singleton_info: + rescaled[mask_h] = base_weights[mask_h] + return rescaled diff --git a/diff_diff/continuous_did_results.py b/diff_diff/continuous_did_results.py index 67a745d4..ccbeeb60 100644 --- a/diff_diff/continuous_did_results.py +++ b/diff_diff/continuous_did_results.py @@ -154,6 +154,15 @@ def __repr__(self) -> str: f"n_periods={len(self.time_periods)})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_att_se) and self.overall_att_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_att_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """Generate formatted summary.""" alpha = alpha or self.alpha @@ -223,10 +232,15 @@ def summary(self, alpha: Optional[float] = None) -> str: f"[{self.overall_att_conf_int[0]:.4f}, {self.overall_att_conf_int[1]:.4f}]", f"{conf_level}% CI for ACRT_glob: " f"[{self.overall_acrt_conf_int[0]:.4f}, {self.overall_acrt_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Dose-response curve summary (first/mid/last points) if len(self.dose_grid) > 0: lines.extend( diff --git a/diff_diff/efficient_did_results.py b/diff_diff/efficient_did_results.py index 4d10c9b3..7b05430d 100644 --- a/diff_diff/efficient_did_results.py +++ b/diff_diff/efficient_did_results.py @@ -172,6 +172,15 @@ def __repr__(self) -> str: f"n_periods={len(self.time_periods)})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """Generate formatted summary of estimation results.""" alpha = alpha or self.alpha @@ -219,10 +228,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Event study effects if self.event_study_effects: lines.extend( diff --git a/diff_diff/imputation_results.py b/diff_diff/imputation_results.py index 3a0465e1..b27f1dd1 100644 --- a/diff_diff/imputation_results.py +++ b/diff_diff/imputation_results.py @@ -152,6 +152,15 @@ def __repr__(self) -> str: f"n_treated_obs={self.n_treated_obs})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -219,10 +228,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Event study effects if self.event_study_effects: lines.extend( diff --git a/diff_diff/prep.py b/diff_diff/prep.py index e74cce14..949e82e2 100644 --- a/diff_diff/prep.py +++ b/diff_diff/prep.py @@ -1225,3 +1225,65 @@ def _suggest_treatment_candidates( # Return top candidates result = result.nlargest(n_candidates, "treatment_candidate_score") return result.reset_index(drop=True) + + +def trim_weights( + data: pd.DataFrame, + weight_col: str, + upper: Optional[float] = None, + quantile: Optional[float] = None, + lower: Optional[float] = None, +) -> pd.DataFrame: + """Trim (winsorize) survey weights to reduce influence of extreme values. + + Caps weights at specified thresholds. Useful for reducing variance from + extreme survey weights before DiD estimation. Federal agencies (e.g., NCHS) + recommend reviewing weights with CV > 30%. + + Parameters + ---------- + data : pd.DataFrame + Input DataFrame. + weight_col : str + Name of the weight column. + upper : float, optional + Absolute upper cap. Weights above this value are set to it. + Mutually exclusive with ``quantile``. + quantile : float, optional + Quantile-based upper cap (e.g., 0.99). Weights above the quantile + value are capped at it. Mutually exclusive with ``upper``. + lower : float, optional + Absolute lower floor. Weights below this value are set to it. + Can be combined with either ``upper`` or ``quantile``. + + Returns + ------- + pd.DataFrame + Copy of data with trimmed weights. + + Raises + ------ + ValueError + If both ``upper`` and ``quantile`` are provided, or if ``weight_col`` + is not in the DataFrame. + """ + if upper is not None and quantile is not None: + raise ValueError("Specify either 'upper' or 'quantile', not both.") + if weight_col not in data.columns: + raise ValueError(f"Column '{weight_col}' not found in DataFrame.") + + result = data.copy() + w = result[weight_col].values.copy() + + if quantile is not None: + if not (0 < quantile < 1): + raise ValueError(f"quantile must be in (0, 1), got {quantile}") + upper = float(np.quantile(w, quantile)) + + if upper is not None: + w = np.minimum(w, upper) + if lower is not None: + w = np.maximum(w, lower) + + result[weight_col] = w + return result diff --git a/diff_diff/results.py b/diff_diff/results.py index bf833192..a9403c60 100644 --- a/diff_diff/results.py +++ b/diff_diff/results.py @@ -104,6 +104,15 @@ def __repr__(self) -> str: f"p={self.p_value:.4f})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.se) and self.se > 0): + return np.nan + if not np.isfinite(self.att) or self.att == 0: + return np.nan + return self.se / abs(self.att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate a formatted summary of the estimation results. @@ -161,6 +170,10 @@ def summary(self, alpha: Optional[float] = None) -> str: ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + # Add significance codes lines.extend( [ @@ -387,6 +400,15 @@ def post_period_effects(self) -> Dict[Any, PeriodEffect]: """Post-period effects only.""" return {p: pe for p, pe in self.period_effects.items() if p in self.post_periods} + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.avg_se) and self.avg_se > 0): + return np.nan + if not np.isfinite(self.avg_att) or self.avg_att == 0: + return np.nan + return self.avg_se / abs(self.avg_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate a formatted summary of the estimation results. @@ -495,6 +517,10 @@ def summary(self, alpha: Optional[float] = None) -> str: ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + # Add significance codes lines.extend( [ @@ -693,6 +719,15 @@ def __repr__(self) -> str: f"p={self.p_value:.4f})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.se) and self.se > 0): + return np.nan + if not np.isfinite(self.att) or self.att == 0: + return np.nan + return self.se / abs(self.att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate a formatted summary of the estimation results. @@ -756,6 +791,10 @@ def summary(self, alpha: Optional[float] = None) -> str: ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + # Show top unit weights if self.unit_weights: sorted_weights = sorted(self.unit_weights.items(), key=lambda x: x[1], reverse=True) diff --git a/diff_diff/stacked_did_results.py b/diff_diff/stacked_did_results.py index 20bfb65a..f6348dfb 100644 --- a/diff_diff/stacked_did_results.py +++ b/diff_diff/stacked_did_results.py @@ -106,6 +106,15 @@ def __repr__(self) -> str: f"n_stacked_obs={self.n_stacked_obs})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -176,10 +185,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Event study effects if self.event_study_effects: lines.extend( diff --git a/diff_diff/staggered_results.py b/diff_diff/staggered_results.py index 946b04d9..a98ae303 100644 --- a/diff_diff/staggered_results.py +++ b/diff_diff/staggered_results.py @@ -140,6 +140,15 @@ def __repr__(self) -> str: f"n_periods={len(self.time_periods)})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -191,10 +200,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "-" * 85, "", f"{conf_level}% Confidence Interval: [{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # EPV diagnostics block (if any cohort has low EPV) if self.epv_diagnostics: low_epv = {k: v for k, v in self.epv_diagnostics.items() if v.get("is_low")} @@ -294,9 +308,7 @@ def epv_summary(self, show_all: bool = False) -> pd.DataFrame: Columns: group, time, epv, n_events, n_params, is_low. """ if not self.epv_diagnostics: - return pd.DataFrame( - columns=["group", "time", "epv", "n_events", "n_params", "is_low"] - ) + return pd.DataFrame(columns=["group", "time", "epv", "n_events", "n_params", "is_low"]) rows = [] for (g, t), diag in sorted(self.epv_diagnostics.items()): if show_all or diag.get("is_low", False): @@ -335,15 +347,15 @@ def to_dataframe(self, level: str = "group_time") -> pd.DataFrame: rows = [] for (g, t), data in self.group_time_effects.items(): row = { - "group": g, - "time": t, - "effect": data["effect"], - "se": data["se"], - "t_stat": data["t_stat"], - "p_value": data["p_value"], - "conf_int_lower": data["conf_int"][0], - "conf_int_upper": data["conf_int"][1], - } + "group": g, + "time": t, + "effect": data["effect"], + "se": data["se"], + "t_stat": data["t_stat"], + "p_value": data["p_value"], + "conf_int_lower": data["conf_int"][0], + "conf_int_upper": data["conf_int"][1], + } if self.epv_diagnostics and (g, t) in self.epv_diagnostics: row["epv"] = self.epv_diagnostics[(g, t)].get("epv") rows.append(row) diff --git a/diff_diff/staggered_triple_diff_results.py b/diff_diff/staggered_triple_diff_results.py index 56514429..dcc1db19 100644 --- a/diff_diff/staggered_triple_diff_results.py +++ b/diff_diff/staggered_triple_diff_results.py @@ -100,6 +100,15 @@ def __repr__(self) -> str: f"n_periods={len(self.time_periods)})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -156,10 +165,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # EPV diagnostics block (if any cohort has low EPV) if self.epv_diagnostics: low_epv = {k: v for k, v in self.epv_diagnostics.items() if v.get("is_low")} @@ -187,11 +201,7 @@ def summary(self, alpha: Optional[float] = None) -> str: # Event study effects if self.event_study_effects: - ci_label = ( - "Simult. CI" - if self.cband_crit_value is not None - else "Pointwise CI" - ) + ci_label = "Simult. CI" if self.cband_crit_value is not None else "Pointwise CI" lines.extend( [ "-" * 85, @@ -270,9 +280,7 @@ def epv_summary(self, show_all: bool = False) -> pd.DataFrame: Columns: group, time, epv, n_events, n_params, is_low. """ if not self.epv_diagnostics: - return pd.DataFrame( - columns=["group", "time", "epv", "n_events", "n_params", "is_low"] - ) + return pd.DataFrame(columns=["group", "time", "epv", "n_events", "n_params", "is_low"]) rows = [] for (g, t), diag in sorted(self.epv_diagnostics.items()): if show_all or diag.get("is_low", False): @@ -323,9 +331,7 @@ def to_dataframe(self, level: str = "group_time") -> pd.DataFrame: elif level == "event_study": if self.event_study_effects is None: - raise ValueError( - "Event study effects not computed. Use aggregate='event_study'." - ) + raise ValueError("Event study effects not computed. Use aggregate='event_study'.") rows = [] for rel_t, data in sorted(self.event_study_effects.items()): cband_ci = data.get("cband_conf_int", (np.nan, np.nan)) @@ -346,9 +352,7 @@ def to_dataframe(self, level: str = "group_time") -> pd.DataFrame: elif level == "group": if self.group_effects is None: - raise ValueError( - "Group effects not computed. Use aggregate='group'." - ) + raise ValueError("Group effects not computed. Use aggregate='group'.") rows = [] for group, data in sorted(self.group_effects.items()): rows.append( @@ -366,8 +370,7 @@ def to_dataframe(self, level: str = "group_time") -> pd.DataFrame: else: raise ValueError( - f"Unknown level: {level}. " - "Use 'group_time', 'event_study', or 'group'." + f"Unknown level: {level}. " "Use 'group_time', 'event_study', or 'group'." ) def to_dict(self) -> Dict[str, Any]: diff --git a/diff_diff/sun_abraham.py b/diff_diff/sun_abraham.py index c93544ab..f52c6f14 100644 --- a/diff_diff/sun_abraham.py +++ b/diff_diff/sun_abraham.py @@ -97,6 +97,15 @@ def __repr__(self) -> str: f"n_rel_periods={n_rel_periods})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -149,10 +158,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Event study effects lines.extend( [ @@ -501,9 +515,7 @@ def fit( if resolved_survey is not None: _validate_unit_constant_survey(data, unit, survey_design) - _uses_replicate_sa = ( - resolved_survey is not None and resolved_survey.uses_replicate_variance - ) + _uses_replicate_sa = resolved_survey is not None and resolved_survey.uses_replicate_variance if _uses_replicate_sa and self.n_bootstrap > 0: raise ValueError( "Cannot use n_bootstrap > 0 with replicate-weight survey designs. " @@ -650,9 +662,16 @@ def _refit_sa(w_r): df_reg_nz = df_reg[nz] if not np.all(nz) else df_reg w_nz = w_r[nz] if not np.all(nz) else w_r ce_r, _, vcov_r, cim_r = self._fit_saturated_regression( - df_reg_nz, outcome, unit, time, first_treat, - treatment_groups, _sa_rel_periods, covariates, - cluster_var, survey_weights=w_nz, + df_reg_nz, + outcome, + unit, + time, + first_treat, + treatment_groups, + _sa_rel_periods, + covariates, + cluster_var, + survey_weights=w_nz, survey_weight_type=survey_weight_type, resolved_survey=None, ) @@ -661,11 +680,25 @@ def _refit_sa(w_r): _wt_col = "_rep_wt" df[_wt_col] = w_r es_r, _ = self._compute_iw_effects( - df, unit, first_treat, treatment_groups, _sa_rel_periods, - ce_r, {}, vcov_r, cim_r, survey_weight_col=_wt_col, + df, + unit, + first_treat, + treatment_groups, + _sa_rel_periods, + ce_r, + {}, + vcov_r, + cim_r, + survey_weight_col=_wt_col, ) att_r, _ = self._compute_overall_att( - df, first_treat, es_r, ce_r, _, vcov_r, cim_r, + df, + first_treat, + es_r, + ce_r, + _, + vcov_r, + cim_r, survey_weight_col=_wt_col, ) results = [att_r] @@ -741,7 +774,9 @@ def _refit_sa(w_r): if _n_valid_rep_sa < resolved_survey.n_replicates: _sa_survey_df = _n_valid_rep_sa - 1 if _n_valid_rep_sa > 1 else 0 if survey_metadata is not None: - survey_metadata.df_survey = _sa_survey_df if _sa_survey_df and _sa_survey_df > 0 else None + survey_metadata.df_survey = ( + _sa_survey_df if _sa_survey_df and _sa_survey_df > 0 else None + ) # Override overall ATT SE overall_se = float(np.sqrt(max(_vcov_sa[0, 0], 0.0))) @@ -769,9 +804,16 @@ def _refit_sa_cohort(w_r): df_reg_nz = df_reg[nz] if not np.all(nz) else df_reg w_nz = w_r[nz] if not np.all(nz) else w_r ce_r, _, _, _ = self._fit_saturated_regression( - df_reg_nz, outcome, unit, time, first_treat, - treatment_groups, _sa_rel_periods, covariates, - cluster_var, survey_weights=w_nz, + df_reg_nz, + outcome, + unit, + time, + first_treat, + treatment_groups, + _sa_rel_periods, + covariates, + cluster_var, + survey_weights=w_nz, survey_weight_type=survey_weight_type, resolved_survey=None, ) @@ -1421,8 +1463,7 @@ def _run_rao_wu_bootstrap( .groupby(df[unit]) .first() .reindex(all_units) - .values - .astype(np.float64) + .values.astype(np.float64) ) strata_unit = None diff --git a/diff_diff/trop_results.py b/diff_diff/trop_results.py index 2123499f..934fef78 100644 --- a/diff_diff/trop_results.py +++ b/diff_diff/trop_results.py @@ -160,6 +160,15 @@ def __repr__(self) -> str: f"p={self.p_value:.4f})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.se) and self.se > 0): + return np.nan + if not np.isfinite(self.att) or self.att == 0: + return np.nan + return self.se / abs(self.att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate a formatted summary of the estimation results. @@ -225,6 +234,10 @@ def summary(self, alpha: Optional[float] = None) -> str: ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + # Add significance codes lines.extend( [ diff --git a/diff_diff/two_stage_results.py b/diff_diff/two_stage_results.py index bb064258..7bd44ace 100644 --- a/diff_diff/two_stage_results.py +++ b/diff_diff/two_stage_results.py @@ -150,6 +150,15 @@ def __repr__(self) -> str: f"n_treated_obs={self.n_treated_obs})" ) + @property + def coef_var(self) -> float: + """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" + if not (np.isfinite(self.overall_se) and self.overall_se > 0): + return np.nan + if not np.isfinite(self.overall_att) or self.overall_att == 0: + return np.nan + return self.overall_se / abs(self.overall_att) + def summary(self, alpha: Optional[float] = None) -> str: """ Generate formatted summary of estimation results. @@ -217,10 +226,15 @@ def summary(self, alpha: Optional[float] = None) -> str: "", f"{conf_level}% Confidence Interval: " f"[{self.overall_conf_int[0]:.4f}, {self.overall_conf_int[1]:.4f}]", - "", ] ) + cv = self.coef_var + if np.isfinite(cv): + lines.append(f"{'CV (SE/|ATT|):':<25} {cv:>10.4f}") + + lines.append("") + # Event study effects if self.event_study_effects: lines.extend( diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 92517afe..7df22655 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -742,3 +742,324 @@ def test_all_singleton_remove_unidentified(self): assert legit_zero == 0 # Meat is zero matrix (unidentified — caller should produce NaN) assert np.all(meat == 0.0) + + +# =========================================================================== +# 8d: Lonely PSU "adjust" in Bootstrap +# =========================================================================== + + +class TestBootstrapLonelyPSUAdjust: + """Tests for lonely_psu='adjust' in survey-aware bootstrap.""" + + def _make_survey_data_with_singletons(self): + """Create staggered panel data with singleton strata for bootstrap tests.""" + np.random.seed(42) + n_units, n_periods = 40, 6 + rows = [] + for i in range(n_units): + ft = 4 if i < 15 else (0 if i >= 30 else 6) + # Stratum 0 has 1 PSU (singleton), strata 1-3 have multiple + if i == 0: + stratum = 0 + elif i < 15: + stratum = 1 + elif i < 30: + stratum = 2 + else: + stratum = 3 + for t in range(1, n_periods + 1): + y = 5.0 + i * 0.02 + t * 0.1 + if ft > 0 and t >= ft: + y += 1.5 + y += np.random.normal(0, 0.3) + rows.append( + { + "unit": i, + "time": t, + "first_treat": ft, + "outcome": y, + "weight": 1.0 + 0.2 * (i % 5), + "stratum": stratum, + "psu": i, + } + ) + return pd.DataFrame(rows) + + def test_multiplier_adjust_no_raise(self): + """Multiplier bootstrap with lonely_psu='adjust' runs without error.""" + from diff_diff import CallawaySantAnna + + data = self._make_survey_data_with_singletons() + sd = SurveyDesign( + weights="weight", + strata="stratum", + psu="psu", + lonely_psu="adjust", + ) + est = CallawaySantAnna(n_bootstrap=50, seed=42) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + + def test_adjust_nonzero_variance(self): + """With 'adjust', singleton strata contribute nonzero bootstrap variance.""" + from diff_diff.bootstrap_utils import generate_survey_multiplier_weights_batch + from diff_diff.survey import ResolvedSurveyDesign + + np.random.seed(42) + n = 20 + # Strata 0 and 1 are singletons (1 PSU each), strata 2 and 3 have multiple + strata = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3]) + psu = np.array([0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]) + + n_unique_psu = len(np.unique(psu)) + resolved_remove = ResolvedSurveyDesign( + weights=np.ones(n), + weight_type="pweight", + strata=strata, + psu=psu, + fpc=None, + n_strata=4, + n_psu=n_unique_psu, + lonely_psu="remove", + ) + resolved_adjust = ResolvedSurveyDesign( + weights=np.ones(n), + weight_type="pweight", + strata=strata, + psu=psu, + fpc=None, + n_strata=4, + n_psu=n_unique_psu, + lonely_psu="adjust", + ) + + rng = np.random.default_rng(42) + w_remove, _ = generate_survey_multiplier_weights_batch( + 100, + resolved_remove, + "rademacher", + rng, + ) + rng = np.random.default_rng(42) + w_adjust, _ = generate_survey_multiplier_weights_batch( + 100, + resolved_adjust, + "rademacher", + rng, + ) + + # With remove, column 0 (singleton PSU) is all zeros + assert np.all(w_remove[:, 0] == 0.0) + # With adjust, column 0 gets nonzero pooled weights + assert not np.all(w_adjust[:, 0] == 0.0) + + def test_adjust_all_singleton(self): + """All singleton strata: adjust pools them into one pseudo-stratum.""" + from diff_diff import CallawaySantAnna + + np.random.seed(99) + n_units, n_periods = 30, 6 + rows = [] + for i in range(n_units): + ft = 4 if i < 10 else (6 if i < 20 else 0) + for t in range(1, n_periods + 1): + y = 5.0 + i * 0.02 + t * 0.1 + if ft > 0 and t >= ft: + y += 1.5 + y += np.random.normal(0, 0.3) + rows.append( + { + "unit": i, + "time": t, + "first_treat": ft, + "outcome": y, + "weight": 1.0, + # Every unit is its own stratum (all singletons) + "stratum": i, + "psu": i, + } + ) + data = pd.DataFrame(rows) + + sd = SurveyDesign( + weights="weight", + strata="stratum", + psu="psu", + lonely_psu="adjust", + ) + est = CallawaySantAnna(n_bootstrap=50, seed=42) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + ) + assert np.isfinite(result.overall_att) + assert np.isfinite(result.overall_se) + assert result.overall_se > 0 + + +# =========================================================================== +# 8e-i: CV on Estimates +# =========================================================================== + + +class TestCoefficientOfVariation: + """Tests for coef_var property on results classes.""" + + def test_coef_var_basic(self): + """Known values: att=2.0, se=0.5 -> CV=0.25.""" + from diff_diff.results import DiDResults + + r = DiDResults( + att=2.0, + se=0.5, + t_stat=4.0, + p_value=0.0001, + conf_int=(1.0, 3.0), + n_obs=100, + n_treated=50, + n_control=50, + ) + assert r.coef_var == pytest.approx(0.25) + + def test_coef_var_zero_estimate(self): + """ATT=0 -> CV=NaN.""" + from diff_diff.results import DiDResults + + r = DiDResults( + att=0.0, + se=0.5, + t_stat=0.0, + p_value=1.0, + conf_int=(-1.0, 1.0), + n_obs=100, + n_treated=50, + n_control=50, + ) + assert np.isnan(r.coef_var) + + def test_coef_var_nan_se(self): + """NaN SE -> CV=NaN.""" + from diff_diff.results import DiDResults + + r = DiDResults( + att=2.0, + se=np.nan, + t_stat=np.nan, + p_value=np.nan, + conf_int=(np.nan, np.nan), + n_obs=100, + n_treated=50, + n_control=50, + ) + assert np.isnan(r.coef_var) + + def test_coef_var_in_summary(self): + """CV appears in summary output.""" + from diff_diff.results import DiDResults + + r = DiDResults( + att=2.0, + se=0.5, + t_stat=4.0, + p_value=0.0001, + conf_int=(1.0, 3.0), + n_obs=100, + n_treated=50, + n_control=50, + ) + assert "CV" in r.summary() + + def test_coef_var_overall_att_pattern(self): + """CV works on overall_att/overall_se pattern (CallawaySantAnna).""" + from diff_diff import CallawaySantAnna + + np.random.seed(42) + n_units, n_periods = 30, 6 + rows = [] + for i in range(n_units): + ft = 4 if i < 10 else (6 if i < 20 else 0) + for t in range(1, n_periods + 1): + y = 5.0 + i * 0.02 + t * 0.1 + if ft > 0 and t >= ft: + y += 1.5 + y += np.random.normal(0, 0.3) + rows.append({"unit": i, "time": t, "first_treat": ft, "outcome": y}) + data = pd.DataFrame(rows) + + result = CallawaySantAnna().fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + ) + cv = result.coef_var + assert np.isfinite(cv) + assert cv == pytest.approx(result.overall_se / abs(result.overall_att)) + + +# =========================================================================== +# 8e-ii: Weight Trimming +# =========================================================================== + + +class TestTrimWeights: + """Tests for trim_weights utility.""" + + def test_trim_upper(self): + """Upper cap trims large weights.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [1.0, 2.0, 5.0, 10.0, 20.0]}) + result = trim_weights(data, "w", upper=5.0) + assert result["w"].max() == 5.0 + assert result["w"].tolist() == [1.0, 2.0, 5.0, 5.0, 5.0] + + def test_trim_quantile(self): + """Quantile-based trimming.""" + from diff_diff.prep import trim_weights + + np.random.seed(42) + data = pd.DataFrame({"w": np.random.exponential(1.0, 1000)}) + result = trim_weights(data, "w", quantile=0.95) + q95 = np.quantile(data["w"].values, 0.95) + assert result["w"].max() == pytest.approx(q95) + + def test_trim_lower(self): + """Lower floor raises small weights.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [0.01, 0.1, 1.0, 5.0]}) + result = trim_weights(data, "w", lower=0.5) + assert result["w"].min() == 0.5 + + def test_trim_returns_copy(self): + """Original DataFrame is not modified.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [1.0, 10.0, 100.0]}) + original_max = data["w"].max() + _ = trim_weights(data, "w", upper=5.0) + assert data["w"].max() == original_max + + def test_trim_upper_and_quantile_raises(self): + """Both upper and quantile raises ValueError.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [1.0, 2.0]}) + with pytest.raises(ValueError, match="upper.*quantile"): + trim_weights(data, "w", upper=5.0, quantile=0.95) From f997ece20a03b1def9aff3c11d4caa6a2c0fe285 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 11:07:03 -0400 Subject: [PATCH 02/13] Add survey support for ImputationDiD pretrends (8e-iii) Thread survey weights through _compute_lead_coefficients(): - Pass survey weights to _iterative_demean() for weighted demeaning - Pass survey weights to solve_ols() for WLS point estimates - Replace cluster-robust VCV with compute_survey_vcov() for design-based inference (strata/PSU/FPC) - Use survey df in safe_inference() for t-distribution critical values - Adjust F-test denominator df in _pretrend_test() to use survey df Persist resolved_survey and survey_weights in _fit_data dict so _pretrend_test() can access them post-fit. Subset resolved_survey to Omega_0 via dataclasses.replace() before passing to lead regression. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/imputation.py | 123 +++++++++++++++++++++++++++++------- tests/test_survey_phase8.py | 74 ++++++++++++++++++++++ 2 files changed, 173 insertions(+), 24 deletions(-) diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 676eda7b..a37d3653 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -236,13 +236,8 @@ def fit( if missing: raise ValueError(f"Missing columns: {missing}") - if self.pretrends and survey_design is not None and aggregate in ("event_study", "all"): - raise NotImplementedError( - "pretrends=True is not yet compatible with survey_design. " - "The pre-period lead regression uses unweighted demeaning, " - "which does not account for survey weights. Use pretrends=False " - "with survey_design for now." - ) + # pretrends + survey_design is supported (Phase 8e-iii): + # survey weights are threaded through _compute_lead_coefficients(). # Create working copy df = data.copy() @@ -734,6 +729,8 @@ def _refit_imp(w_r): "delta_hat": delta_hat, "kept_cov_mask": kept_cov_mask, "survey_design": survey_design, + "resolved_survey": resolved_survey, + "survey_weights": survey_weights, } # Pre-compute cluster psi sums for bootstrap @@ -1724,6 +1721,32 @@ def _aggregate_event_study( if self.horizon_max is not None: pre_rel_times = [h for h in pre_rel_times if abs(h) <= self.horizon_max] if pre_rel_times: + # Build Omega_0 survey subsetting for pretrends + _sw_0_pre = None + _rs_0_pre = None + if survey_weights is not None and resolved_survey is not None: + _sw_0_pre = survey_weights[omega_0_mask.values] + from dataclasses import replace as dc_replace + + _rs_0_pre = dc_replace( + resolved_survey, + weights=resolved_survey.weights[omega_0_mask.values], + strata=( + resolved_survey.strata[omega_0_mask.values] + if resolved_survey.strata is not None + else None + ), + psu=( + resolved_survey.psu[omega_0_mask.values] + if resolved_survey.psu is not None + else None + ), + fpc=( + resolved_survey.fpc[omega_0_mask.values] + if resolved_survey.fpc is not None + else None + ), + ) pre_effects, _, _ = self._compute_lead_coefficients( df_0, outcome, @@ -1735,6 +1758,8 @@ def _aggregate_event_study( pre_rel_times, alpha=self.alpha, balanced_cohorts=balanced_cohorts, + survey_weights_0=_sw_0_pre, + resolved_survey_0=_rs_0_pre, ) event_study_effects.update(pre_effects) @@ -1991,6 +2016,8 @@ def _compute_lead_coefficients( pre_rel_times: List[int], alpha: float = 0.05, balanced_cohorts: Optional[set] = None, + survey_weights_0: Optional[np.ndarray] = None, + resolved_survey_0=None, ) -> Tuple[Dict[int, Dict[str, Any]], np.ndarray, np.ndarray]: """ Compute pre-period lead coefficients via within-transformed OLS (Test 1). @@ -2032,9 +2059,13 @@ def _compute_lead_coefficients( df_0[col_name] = indicator lead_cols.append(col_name) - # Within-transform via iterative demeaning + # Within-transform via iterative demeaning (survey-weighted when present) y_dm = self._iterative_demean( - df_0[outcome].values, df_0[unit].values, df_0[time].values, df_0.index + df_0[outcome].values, + df_0[unit].values, + df_0[time].values, + df_0.index, + weights=survey_weights_0, ) all_x_cols = lead_cols[:] @@ -2044,18 +2075,26 @@ def _compute_lead_coefficients( X_dm = np.column_stack( [ self._iterative_demean( - df_0[col].values, df_0[unit].values, df_0[time].values, df_0.index + df_0[col].values, + df_0[unit].values, + df_0[time].values, + df_0.index, + weights=survey_weights_0, ) for col in all_x_cols ] ) - # OLS with cluster-robust SEs + # OLS with cluster-robust SEs (WLS when survey weights present) cluster_ids = df_0[cluster_var].values + _ols_weights = survey_weights_0 + _ols_weight_type = "pweight" if survey_weights_0 is not None else None try: result = solve_ols( X_dm, y_dm, + weights=_ols_weights, + weight_type=_ols_weight_type, cluster_ids=cluster_ids, return_vcov=True, rank_deficient_action=self.rank_deficient_action, @@ -2086,10 +2125,20 @@ def _compute_lead_coefficients( vcov = result[2] assert vcov is not None + # Replace cluster-robust VCV with survey design-based VCV when present + if resolved_survey_0 is not None: + from diff_diff.survey import compute_survey_vcov + + residuals = y_dm - X_dm @ coefficients + vcov = compute_survey_vcov(X_dm, residuals, resolved_survey_0) + n_leads = len(lead_cols) gamma = coefficients[:n_leads] V_gamma = vcov[:n_leads, :n_leads] + # Use survey df for t-distribution inference when present + _df = resolved_survey_0.df_survey if resolved_survey_0 is not None else None + # Build per-horizon effects effects = {} for j, h in enumerate(pre_rel_times): @@ -2097,7 +2146,7 @@ def _compute_lead_coefficients( se = float(np.sqrt(max(V_gamma[j, j], 0.0))) # n_obs from the lead indicator (respects balanced_cohorts restriction) n_obs = int(df_0[f"_lead_{h}"].sum()) - t_stat, p_value, conf_int = safe_inference(effect, se, alpha=alpha) + t_stat, p_value, conf_int = safe_inference(effect, se, alpha=alpha, df=_df) effects[h] = { "effect": effect, "se": se, @@ -2123,14 +2172,6 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: if self._fit_data is None: raise RuntimeError("Must call fit() before pretrend_test().") - if self._fit_data.get("survey_design") is not None: - raise NotImplementedError( - "pretrend_test() is not yet survey-aware. The pre-trend F-test " - "uses unweighted demeaning and cluster-count degrees of freedom, " - "which do not account for survey weights. Survey-weighted " - "pretrend_test() is planned for future work." - ) - fd = self._fit_data df = fd["df"] outcome = fd["outcome"] @@ -2140,6 +2181,8 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: covariates = fd["covariates"] omega_0_mask = fd["omega_0_mask"] cluster_var = fd["cluster_var"] + resolved_survey = fd.get("resolved_survey") + survey_weights = fd.get("survey_weights") df_0 = df.loc[omega_0_mask].copy() @@ -2181,6 +2224,33 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: "lead_coefficients": {}, } + # Build Omega_0 survey subsetting for pretrends + _sw_0_pt = None + _rs_0_pt = None + if survey_weights is not None and resolved_survey is not None: + _sw_0_pt = survey_weights[omega_0_mask.values] + from dataclasses import replace as dc_replace + + _rs_0_pt = dc_replace( + resolved_survey, + weights=resolved_survey.weights[omega_0_mask.values], + strata=( + resolved_survey.strata[omega_0_mask.values] + if resolved_survey.strata is not None + else None + ), + psu=( + resolved_survey.psu[omega_0_mask.values] + if resolved_survey.psu is not None + else None + ), + fpc=( + resolved_survey.fpc[omega_0_mask.values] + if resolved_survey.fpc is not None + else None + ), + ) + # Use shared lead coefficient computation effects, gamma, V_gamma = self._compute_lead_coefficients( df_0, @@ -2192,6 +2262,8 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: cluster_var, pre_rel_times, alpha=self.alpha, + survey_weights_0=_sw_0_pt, + resolved_survey_0=_rs_0_pt, ) n_leads_actual = len(pre_rel_times) @@ -2204,11 +2276,14 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: except np.linalg.LinAlgError: f_stat = np.nan - # P-value from F distribution - cluster_ids = df_0[cluster_var].values + # P-value from F distribution (survey df when available) if np.isfinite(f_stat) and f_stat >= 0: - n_clusters = len(np.unique(cluster_ids)) - df_denom = max(n_clusters - 1, 1) + if resolved_survey is not None and resolved_survey.df_survey is not None: + df_denom = max(resolved_survey.df_survey, 1) + else: + cluster_ids = df_0[cluster_var].values + n_clusters = len(np.unique(cluster_ids)) + df_denom = max(n_clusters - 1, 1) p_value = float(stats.f.sf(f_stat, n_leads_actual, df_denom)) else: p_value = np.nan diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 7df22655..ea60fc38 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1063,3 +1063,77 @@ def test_trim_upper_and_quantile_raises(self): data = pd.DataFrame({"w": [1.0, 2.0]}) with pytest.raises(ValueError, match="upper.*quantile"): trim_weights(data, "w", upper=5.0, quantile=0.95) + + +# =========================================================================== +# 8e-iii: ImputationDiD Pretrends + Survey +# =========================================================================== + + +class TestImputationPretrendsSurvey: + """Tests for pretrends + survey support in ImputationDiD.""" + + def test_pretrends_survey_no_raise(self, staggered_data): + """ImputationDiD with pretrends=True + survey runs without error.""" + from diff_diff import ImputationDiD + + data = staggered_data + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) + assert result.event_study_effects is not None + # Should have negative-horizon (pre-period) effects + pre_horizons = [h for h in result.event_study_effects if h < 0] + assert len(pre_horizons) > 0 + + def test_pretrends_survey_finite_se(self, staggered_data): + """Pre-period lead SEs are finite with survey design.""" + from diff_diff import ImputationDiD + + data = staggered_data + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) + for h, eff in result.event_study_effects.items(): + if h < -1: # Skip reference period (h=-1) + assert np.isfinite(eff["se"]), f"Non-finite SE at pre-horizon {h}" + assert eff["se"] > 0, f"Zero SE at pre-horizon {h}" + + def test_pretrend_test_survey_no_raise(self, staggered_data): + """pretrend_test() works with survey design.""" + from diff_diff import ImputationDiD + + data = staggered_data + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3) + est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) + pt = est._pretrend_test() + assert "f_stat" in pt + assert "p_value" in pt + if pt["n_leads"] > 0: + assert np.isfinite(pt["f_stat"]) + assert np.isfinite(pt["p_value"]) From 713f261e4259346c70e09708cc4ee9c8fd8d050a Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 13:01:45 -0400 Subject: [PATCH 03/13] Address AI review P1/P2/P3 findings for PR #260 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Use consistent full-design df_survey for both per-lead inference and F-test in ImputationDiD pretrends. Add explicit survey_df parameter to _compute_lead_coefficients() passed from the full resolved_survey. P1: Document bootstrap lonely_psu="adjust" pseudo-stratum pooling in REGISTRY.md with deviation note listing all affected estimators. P1: Add Rao-Wu lonely_psu="adjust" test (direct helper test). P2: Fix stale test_imputation_pretrends_survey_raises in test_pretrends_event_study.py — now expects success. P2: Update REGISTRY.md pretrends+survey note to reflect support. P3: Add trim_weights to __all__ in __init__.py. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/__init__.py | 1 + diff_diff/imputation.py | 11 ++- docs/methodology/REGISTRY.md | 12 ++- tests/test_pretrends_event_study.py | 121 +++++++++++++--------------- tests/test_survey_phase8.py | 38 +++++++++ 5 files changed, 111 insertions(+), 72 deletions(-) diff --git a/diff_diff/__init__.py b/diff_diff/__init__.py index 6425b9fb..1012fa34 100644 --- a/diff_diff/__init__.py +++ b/diff_diff/__init__.py @@ -308,6 +308,7 @@ "make_post_indicator", "wide_to_long", "balance_panel", + "trim_weights", "validate_did_data", "summarize_did_data", "generate_did_data", diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index a37d3653..4804402b 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -1747,6 +1747,10 @@ def _aggregate_event_study( else None ), ) + # Use full-design df for consistent inference + _survey_df_pre = ( + resolved_survey.df_survey if resolved_survey is not None else None + ) pre_effects, _, _ = self._compute_lead_coefficients( df_0, outcome, @@ -1760,6 +1764,7 @@ def _aggregate_event_study( balanced_cohorts=balanced_cohorts, survey_weights_0=_sw_0_pre, resolved_survey_0=_rs_0_pre, + survey_df=_survey_df_pre, ) event_study_effects.update(pre_effects) @@ -2018,6 +2023,7 @@ def _compute_lead_coefficients( balanced_cohorts: Optional[set] = None, survey_weights_0: Optional[np.ndarray] = None, resolved_survey_0=None, + survey_df: Optional[int] = None, ) -> Tuple[Dict[int, Dict[str, Any]], np.ndarray, np.ndarray]: """ Compute pre-period lead coefficients via within-transformed OLS (Test 1). @@ -2136,8 +2142,8 @@ def _compute_lead_coefficients( gamma = coefficients[:n_leads] V_gamma = vcov[:n_leads, :n_leads] - # Use survey df for t-distribution inference when present - _df = resolved_survey_0.df_survey if resolved_survey_0 is not None else None + # Use full-design survey df for t-distribution inference + _df = survey_df # Build per-horizon effects effects = {} @@ -2264,6 +2270,7 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: alpha=self.alpha, survey_weights_0=_sw_0_pt, resolved_survey_0=_rs_0_pt, + survey_df=(resolved_survey.df_survey if resolved_survey is not None else None), ) n_leads_actual = len(pre_rel_times) diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index f6e9882c..df120d9d 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -864,7 +864,7 @@ where `W_it(h) = 1[K_it = h]` are lead indicators, estimated on `Omega_0` only. - Bootstrap does not update pre-period SEs (they are from the lead regression) - When `balance_e` is set, lead indicators are restricted to balanced cohorts; the full Omega_0 sample (including never-treated) is kept for within-transformation - Only affects event study aggregation; overall ATT and group aggregation unchanged -- **Note:** `pretrends=True` with `survey_design` raises `NotImplementedError` because the lead regression uses unweighted demeaning (same limitation as `pretrend_test()`) +- **Note:** `pretrends=True` with `survey_design` is supported. The lead regression uses survey-weighted demeaning, WLS point estimates, and `compute_survey_vcov()` for design-based VCV. The F-test in `pretrend_test()` uses the full-design `df_survey` as denominator df. The Omega_0 subset of `resolved_survey` is used for VCV computation (array alignment), but inference df comes from the full design for consistency. *Edge cases:* - **Unbalanced panels:** FE estimated via iterative alternating projection (Gauss-Seidel), equivalent to OLS with unit+time dummies. Converges in O(max_iter) passes; typically 5-20 iterations for unbalanced panels, 1-2 for balanced. One-pass demeaning is only exact for balanced panels. @@ -2214,9 +2214,13 @@ ContinuousDiD, EfficientDiD): Rescaled weight: `w*_i = w_i * (n_h / m_h) * r_hi` where `r_hi` = count of PSU *i* drawn. - **Note:** FPC enters through the resample size `m_h`, not as a post-hoc scaling factor. When `f_h >= 1` (census stratum), observations keep original weights (zero variance). -- **Note:** Bootstrap paths support `lonely_psu="remove"` and `"certainty"` only. - `lonely_psu="adjust"` raises `NotImplementedError` for survey-aware bootstrap; - use analytical inference for designs requiring `adjust` semantics. +- **Note:** Bootstrap paths support all three `lonely_psu` modes: `"remove"`, `"certainty"`, + and `"adjust"`. For `"adjust"`, singleton PSUs from different strata are pooled into a + combined pseudo-stratum and weights are generated for the pooled group. This is the + bootstrap analogue of the TSL "adjust" behavior (centering around the global mean). + Applies to both multiplier bootstrap (CallawaySantAnna, ImputationDiD, TwoStageDiD, + ContinuousDiD, EfficientDiD) and Rao-Wu bootstrap (SunAbraham, SyntheticDiD, TROP). + FPC scaling is skipped for pooled singletons (conservative). Reference: Rust & Rao (1996). - **Deviation from R:** For the no-FPC case (`m_h = n_h - 1`), this matches R `survey::as.svrepdesign(type="subbootstrap")`. The FPC-adjusted resample size `m_h = round((1-f_h)*(n_h-1))` follows Rao, Wu & Yue (1992) Section 3. diff --git a/tests/test_pretrends_event_study.py b/tests/test_pretrends_event_study.py index 0e059bf0..a0ab8990 100644 --- a/tests/test_pretrends_event_study.py +++ b/tests/test_pretrends_event_study.py @@ -7,7 +7,6 @@ import numpy as np import pandas as pd -import pytest from diff_diff.imputation import ImputationDiD from diff_diff.two_stage import TwoStageDiD @@ -49,10 +48,7 @@ def generate_test_data( effect = treatment_effect * dynamic_mult outcomes = ( - unit_fe_expanded - + time_fe_expanded - + effect * post - + rng.standard_normal(len(units)) * 0.5 + unit_fe_expanded + time_fe_expanded + effect * post + rng.standard_normal(len(units)) * 0.5 ) return pd.DataFrame( @@ -111,9 +107,9 @@ def test_pretrends_coefficients_near_zero(self): if h >= 0 or h == ref_period: continue assert np.isfinite(eff["effect"]), f"h={h}: effect not finite" - assert abs(eff["effect"]) < 3 * eff["se"] + 0.5, ( - f"h={h}: pre-period effect {eff['effect']:.3f} too large" - ) + assert ( + abs(eff["effect"]) < 3 * eff["se"] + 0.5 + ), f"h={h}: pre-period effect {eff['effect']:.3f} too large" def test_pretrends_se_finite_positive(self): """All pre-period horizons have finite, positive SEs.""" @@ -212,9 +208,7 @@ def test_post_treatment_invariance(self): assert h in results_on.event_study_effects eff_off = results_off.event_study_effects[h] eff_on = results_on.event_study_effects[h] - np.testing.assert_allclose( - eff_off["effect"], eff_on["effect"], rtol=1e-10 - ) + np.testing.assert_allclose(eff_off["effect"], eff_on["effect"], rtol=1e-10) np.testing.assert_allclose(eff_off["se"], eff_on["se"], rtol=1e-10) def test_horizon_max_interaction(self): @@ -271,9 +265,7 @@ def test_no_pretreatment_obs_graceful(self): "unit": np.repeat(np.arange(n_units), n_periods), "time": np.tile(np.arange(n_periods), n_units), "outcome": rng.standard_normal(n_units * n_periods), - "first_treat": np.repeat( - np.ones(n_units, dtype=int), n_periods - ), + "first_treat": np.repeat(np.ones(n_units, dtype=int), n_periods), } ) @@ -335,9 +327,9 @@ def test_pretrends_coefficients_near_zero(self): if h >= 0 or h == ref_period: continue assert np.isfinite(eff["effect"]), f"h={h}: effect not finite" - assert abs(eff["effect"]) < 3 * eff["se"] + 0.5, ( - f"h={h}: pre-period effect {eff['effect']:.3f} too large" - ) + assert ( + abs(eff["effect"]) < 3 * eff["se"] + 0.5 + ), f"h={h}: pre-period effect {eff['effect']:.3f} too large" def test_pretrends_se_finite_positive(self): """All pre-period horizons have finite, positive SEs.""" @@ -391,9 +383,7 @@ def test_post_treatment_invariance(self): assert h in results_on.event_study_effects eff_off = results_off.event_study_effects[h] eff_on = results_on.event_study_effects[h] - np.testing.assert_allclose( - eff_off["effect"], eff_on["effect"], rtol=1e-10 - ) + np.testing.assert_allclose(eff_off["effect"], eff_on["effect"], rtol=1e-10) np.testing.assert_allclose(eff_off["se"], eff_on["se"], rtol=1e-10) def test_get_params_includes_pretrends(self): @@ -466,9 +456,7 @@ def test_imputation_pretrends_match_pretrend_test(self): # Every lead coefficient should match the event study pre-period effect for h, coef in lead_coefs.items(): - assert h in results.event_study_effects, ( - f"h={h}: lead coefficient not in event study" - ) + assert h in results.event_study_effects, f"h={h}: lead coefficient not in event study" es_effect = results.event_study_effects[h]["effect"] np.testing.assert_allclose( es_effect, @@ -498,9 +486,7 @@ def test_imputation_survey_weighted_no_covariates(self): rng = np.random.default_rng(42) n_units = data["unit"].nunique() unit_weights = rng.uniform(0.5, 2.0, n_units) - data["weight"] = data["unit"].map( - dict(enumerate(unit_weights)) - ) + data["weight"] = data["unit"].map(dict(enumerate(unit_weights))) sd = SurveyDesign(weights="weight") est = ImputationDiD() @@ -525,9 +511,7 @@ def test_imputation_survey_weighted_event_study(self): rng = np.random.default_rng(42) n_units = data["unit"].nunique() unit_weights = rng.uniform(0.5, 2.0, n_units) - data["weight"] = data["unit"].map( - dict(enumerate(unit_weights)) - ) + data["weight"] = data["unit"].map(dict(enumerate(unit_weights))) sd = SurveyDesign(weights="weight") est = ImputationDiD() @@ -567,9 +551,7 @@ def test_imputation_pretrends_with_covariates(self): ) negative = { - h: v - for h, v in results.event_study_effects.items() - if h < -1 and v["n_obs"] > 0 + h: v for h, v in results.event_study_effects.items() if h < -1 and v["n_obs"] > 0 } assert len(negative) > 0, "Should have pre-period horizons" for h, eff in negative.items(): @@ -595,9 +577,7 @@ def test_imputation_pretrends_bootstrap_post_only(self): ) # With bootstrap - results_boot = ImputationDiD( - pretrends=True, n_bootstrap=50, seed=42 - ).fit( + results_boot = ImputationDiD(pretrends=True, n_bootstrap=50, seed=42).fit( data, outcome="outcome", unit="unit", @@ -613,11 +593,15 @@ def test_imputation_pretrends_bootstrap_post_only(self): eff_nb = results_no_boot.event_study_effects[h] eff_b = results_boot.event_study_effects[h] np.testing.assert_allclose( - eff_nb["effect"], eff_b["effect"], rtol=1e-10, + eff_nb["effect"], + eff_b["effect"], + rtol=1e-10, err_msg=f"h={h}: bootstrap changed pre-period effect", ) np.testing.assert_allclose( - eff_nb["se"], eff_b["se"], rtol=1e-10, + eff_nb["se"], + eff_b["se"], + rtol=1e-10, err_msg=f"h={h}: bootstrap changed pre-period SE", ) @@ -672,19 +656,27 @@ def test_nondefault_index_analytical(self): eff_p = results_perm.event_study_effects[h] eff_g = results_gap.event_study_effects[h] np.testing.assert_allclose( - eff_d["effect"], eff_p["effect"], rtol=1e-10, + eff_d["effect"], + eff_p["effect"], + rtol=1e-10, err_msg=f"h={h}: permuted index changed effect", ) np.testing.assert_allclose( - eff_d["se"], eff_p["se"], rtol=1e-10, + eff_d["se"], + eff_p["se"], + rtol=1e-10, err_msg=f"h={h}: permuted index changed SE", ) np.testing.assert_allclose( - eff_d["effect"], eff_g["effect"], rtol=1e-10, + eff_d["effect"], + eff_g["effect"], + rtol=1e-10, err_msg=f"h={h}: gapped index changed effect", ) np.testing.assert_allclose( - eff_d["se"], eff_g["se"], rtol=1e-10, + eff_d["se"], + eff_g["se"], + rtol=1e-10, err_msg=f"h={h}: gapped index changed SE", ) @@ -695,9 +687,7 @@ def test_nondefault_index_bootstrap(self): """ data = generate_test_data(seed=42) - results_default = ImputationDiD( - pretrends=True, n_bootstrap=50, seed=99 - ).fit( + results_default = ImputationDiD(pretrends=True, n_bootstrap=50, seed=99).fit( data.copy(), outcome="outcome", unit="unit", @@ -709,9 +699,7 @@ def test_nondefault_index_bootstrap(self): rng = np.random.default_rng(42) data_perm = data.copy() data_perm.index = rng.permutation(len(data_perm)) - results_perm = ImputationDiD( - pretrends=True, n_bootstrap=50, seed=99 - ).fit( + results_perm = ImputationDiD(pretrends=True, n_bootstrap=50, seed=99).fit( data_perm, outcome="outcome", unit="unit", @@ -726,16 +714,20 @@ def test_nondefault_index_bootstrap(self): eff_d = results_default.event_study_effects[h] eff_p = results_perm.event_study_effects[h] np.testing.assert_allclose( - eff_d["effect"], eff_p["effect"], rtol=1e-10, + eff_d["effect"], + eff_p["effect"], + rtol=1e-10, err_msg=f"h={h}: permuted index changed effect", ) np.testing.assert_allclose( - eff_d["se"], eff_p["se"], rtol=1e-10, + eff_d["se"], + eff_p["se"], + rtol=1e-10, err_msg=f"h={h}: permuted index changed bootstrap SE", ) - def test_imputation_pretrends_survey_raises(self): - """pretrends=True + survey_design raises NotImplementedError.""" + def test_imputation_pretrends_survey_accepted(self): + """pretrends=True + survey_design is now supported (Phase 8e-iii).""" from diff_diff.survey import SurveyDesign data = generate_test_data(seed=42) @@ -745,18 +737,17 @@ def test_imputation_pretrends_survey_raises(self): sd = SurveyDesign(weights="weight") est = ImputationDiD(pretrends=True) - with pytest.raises( - NotImplementedError, match="pretrends=True is not yet compatible" - ): - est.fit( - data, - outcome="outcome", - unit="unit", - time="time", - first_treat="first_treat", - aggregate="event_study", - survey_design=sd, - ) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + aggregate="event_study", + survey_design=sd, + ) + assert result.event_study_effects is not None + assert np.isfinite(result.overall_att) def test_imputation_pretrends_balance_e(self): """balance_e restricts pre-period lead regression to balanced cohorts.""" @@ -802,9 +793,7 @@ def test_two_stage_pretrends_bootstrap(self): ) negative = { - h: v - for h, v in results.event_study_effects.items() - if h < -1 and v["n_obs"] > 0 + h: v for h, v in results.event_study_effects.items() if h < -1 and v["n_obs"] > 0 } assert len(negative) > 0, "Should have pre-period horizons" for h, eff in negative.items(): diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index ea60fc38..3012c215 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -911,6 +911,44 @@ def test_adjust_all_singleton(self): assert result.overall_se > 0 +# =========================================================================== +class TestRaoWuLonelyPSUAdjust: + """Tests for lonely_psu='adjust' in Rao-Wu bootstrap (SunAbraham consumer).""" + + def test_rao_wu_adjust_direct(self): + """Direct test of generate_rao_wu_weights with lonely_psu='adjust'.""" + from diff_diff.bootstrap_utils import generate_rao_wu_weights + from diff_diff.survey import ResolvedSurveyDesign + + np.random.seed(42) + n = 20 + # Two singleton strata (0, 1), two multi-PSU strata (2, 3) + strata = np.array([0] * 5 + [1] * 5 + [2] * 5 + [3] * 5) + psu = np.array([0] * 5 + [1] * 5 + [2, 3, 4, 5, 6] + [7, 8, 9, 10, 11]) + + resolved = ResolvedSurveyDesign( + weights=np.ones(n), + weight_type="pweight", + strata=strata, + psu=psu, + fpc=None, + n_strata=4, + n_psu=12, + lonely_psu="adjust", + ) + # Run multiple draws to check that singleton weights are not constant + base = resolved.weights.copy() + any_different = False + for seed in range(42, 52): + rng = np.random.default_rng(seed) + rescaled = generate_rao_wu_weights(resolved, rng) + # Singleton PSU weights should vary across draws (pooled resampling) + if not np.allclose(rescaled[:5], base[:5]): + any_different = True + break + assert any_different, "Singleton PSU weights should vary with 'adjust'" + + # =========================================================================== # 8e-i: CV on Estimates # =========================================================================== From 0d916630782a55a4614966e4cb1139cbf35020bf Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 13:14:27 -0400 Subject: [PATCH 04/13] Address P0: reject pretrends + replicate-weight survey designs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replicate-weight survey designs need per-replicate lead regression refits for pretrends, which are not yet implemented. Reject with NotImplementedError for now — analytical survey designs (strata/PSU/FPC) remain supported. Fixes: replicate designs silently falling back to unweighted pretrends, negative-horizon keys contaminating the replicate refit vector, and compute_survey_vcov being called for replicate designs. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/imputation.py | 16 ++++++++++++++-- docs/methodology/REGISTRY.md | 2 +- tests/test_survey_phase8.py | 36 ++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 4804402b..60147723 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -236,8 +236,20 @@ def fit( if missing: raise ValueError(f"Missing columns: {missing}") - # pretrends + survey_design is supported (Phase 8e-iii): - # survey weights are threaded through _compute_lead_coefficients(). + # pretrends + analytical survey is supported (Phase 8e-iii). + # Replicate-weight surveys need per-replicate lead regression refits + # which are not yet implemented — reject that combination. + if ( + self.pretrends + and survey_design is not None + and survey_design.replicate_method is not None + and aggregate in ("event_study", "all") + ): + raise NotImplementedError( + "pretrends=True is not yet compatible with replicate-weight " + "survey designs. Analytical survey designs (strata/PSU/FPC) " + "are supported. Use pretrends=False with replicate weights." + ) # Create working copy df = data.copy() diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index df120d9d..d32a7f7b 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -864,7 +864,7 @@ where `W_it(h) = 1[K_it = h]` are lead indicators, estimated on `Omega_0` only. - Bootstrap does not update pre-period SEs (they are from the lead regression) - When `balance_e` is set, lead indicators are restricted to balanced cohorts; the full Omega_0 sample (including never-treated) is kept for within-transformation - Only affects event study aggregation; overall ATT and group aggregation unchanged -- **Note:** `pretrends=True` with `survey_design` is supported. The lead regression uses survey-weighted demeaning, WLS point estimates, and `compute_survey_vcov()` for design-based VCV. The F-test in `pretrend_test()` uses the full-design `df_survey` as denominator df. The Omega_0 subset of `resolved_survey` is used for VCV computation (array alignment), but inference df comes from the full design for consistency. +- **Note:** `pretrends=True` with analytical `survey_design` (strata/PSU/FPC) is supported. The lead regression uses survey-weighted demeaning, WLS point estimates, and `compute_survey_vcov()` for design-based VCV. The F-test in `pretrend_test()` uses the full-design `df_survey` as denominator df. Replicate-weight survey designs raise `NotImplementedError` with `pretrends=True` because per-replicate lead regression refits are not yet implemented. *Edge cases:* - **Unbalanced panels:** FE estimated via iterative alternating projection (Gauss-Seidel), equivalent to OLS with unit+time dummies. Converges in O(max_iter) passes; typically 5-20 iterations for unbalanced panels, 1-2 for balanced. One-pass demeaning is only exact for balanced panels. diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 3012c215..e676f87b 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1175,3 +1175,39 @@ def test_pretrend_test_survey_no_raise(self, staggered_data): if pt["n_leads"] > 0: assert np.isfinite(pt["f_stat"]) assert np.isfinite(pt["p_value"]) + + def test_pretrends_replicate_raises(self, staggered_data): + """pretrends=True + replicate-weight survey raises NotImplementedError.""" + from diff_diff import ImputationDiD + + data = staggered_data + # Add BRR replicate weights + rng = np.random.RandomState(42) + units = sorted(data["unit"].unique()) + rep_cols = [] + for r in range(10): + signs = rng.choice([-1, 1], size=len(units)) + sign_map = dict(zip(units, signs)) + pert = data["unit"].map(sign_map).values.astype(float) + w_r = data["weight"].values * (1.0 + 0.5 * pert) + w_r = np.maximum(w_r, 0.0) + col = f"brr_{r}" + data[col] = w_r + rep_cols.append(col) + + sd = SurveyDesign( + weights="weight", + replicate_weights=rep_cols, + replicate_method="BRR", + ) + est = ImputationDiD(pretrends=True, horizon_max=3) + with pytest.raises(NotImplementedError, match="replicate"): + est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) From 84143550e5561011c498d5c5b62f800bdc8a21e6 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 13:52:27 -0400 Subject: [PATCH 05/13] Address P0/P1: block replicate pretrend_test, use solve_ols residuals MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0: Block replicate-weight survey designs in _pretrend_test() — the public method was still accessible after a replicate fit, routing through compute_survey_vcov (TSL) instead of replicate variance. P1: Use residuals from solve_ols (result[1]) instead of recomputing y_dm - X_dm @ coefficients in the survey VCV path. solve_ols already handles rank-deficient fits by rebuilding fitted values from kept columns only, so residuals are finite even with NaN coefficients. P2: Add regression test for pretrend_test() rejection on replicate fits. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/imputation.py | 14 +++++++++++++- tests/test_survey_phase8.py | 37 +++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 60147723..3d0bb015 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -2147,7 +2147,10 @@ def _compute_lead_coefficients( if resolved_survey_0 is not None: from diff_diff.survey import compute_survey_vcov - residuals = y_dm - X_dm @ coefficients + # Use residuals from solve_ols (safe for rank-deficient fits: + # solve_ols rebuilds fitted values from kept columns only, + # so residuals are finite even when some coefficients are NaN). + residuals = result[1] vcov = compute_survey_vcov(X_dm, residuals, resolved_survey_0) n_leads = len(lead_cols) @@ -2191,6 +2194,15 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: raise RuntimeError("Must call fit() before pretrend_test().") fd = self._fit_data + resolved_survey = fd.get("resolved_survey") + if resolved_survey is not None and resolved_survey.uses_replicate_variance: + raise NotImplementedError( + "pretrend_test() is not yet supported for replicate-weight " + "survey designs. Per-replicate Equation 9 lead regression " + "refits are not implemented. Use analytical survey designs " + "(strata/PSU/FPC) or call pretrend_test() without survey." + ) + df = fd["df"] outcome = fd["outcome"] unit = fd["unit"] diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index e676f87b..8bbeadf8 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1211,3 +1211,40 @@ def test_pretrends_replicate_raises(self, staggered_data): survey_design=sd, aggregate="event_study", ) + + def test_pretrend_test_replicate_raises(self, staggered_data): + """pretrend_test() with replicate-weight survey raises NotImplementedError.""" + from diff_diff import ImputationDiD + + data = staggered_data + rng = np.random.RandomState(42) + units = sorted(data["unit"].unique()) + rep_cols = [] + for r in range(10): + signs = rng.choice([-1, 1], size=len(units)) + sign_map = dict(zip(units, signs)) + pert = data["unit"].map(sign_map).values.astype(float) + w_r = data["weight"].values * (1.0 + 0.5 * pert) + w_r = np.maximum(w_r, 0.0) + col = f"brr_{r}" + data[col] = w_r + rep_cols.append(col) + + sd = SurveyDesign( + weights="weight", + replicate_weights=rep_cols, + replicate_method="BRR", + ) + # fit() without pretrends succeeds + est = ImputationDiD(horizon_max=3) + est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + ) + # But pretrend_test() should reject replicate designs + with pytest.raises(NotImplementedError, match="replicate"): + est._pretrend_test() From 01e9a8fd48fd42e2e7593c9f74cfe32bc7e9fa5d Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 14:14:35 -0400 Subject: [PATCH 06/13] Address P0/P1/P2: F-test df, rank-deficient survey VCV, coef_var SE=0 P0: Don't coerce nonpositive survey df_survey to 1 in pretrend F-test. Return NaN p-value when df_survey <= 0 (variance unidentified). P1: Reduce to kept (finite-coefficient) columns before calling compute_survey_vcov in _compute_lead_coefficients, then expand VCV back to full matrix with NaN for dropped columns. Prevents singular X'WX on rank-deficient lead/covariate designs. P2: Change coef_var guard from se > 0 to se >= 0 across all 12 results classes. SE=0 with finite nonzero estimate (e.g., FPC census) now correctly returns CV=0.0 instead of NaN. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/continuous_did_results.py | 2 +- diff_diff/efficient_did_results.py | 2 +- diff_diff/imputation.py | 26 +++++++++++++++++----- diff_diff/imputation_results.py | 2 +- diff_diff/results.py | 6 ++--- diff_diff/stacked_did_results.py | 2 +- diff_diff/staggered_results.py | 2 +- diff_diff/staggered_triple_diff_results.py | 2 +- diff_diff/sun_abraham.py | 2 +- diff_diff/trop_results.py | 2 +- diff_diff/two_stage_results.py | 2 +- 11 files changed, 32 insertions(+), 18 deletions(-) diff --git a/diff_diff/continuous_did_results.py b/diff_diff/continuous_did_results.py index ccbeeb60..f87f9c65 100644 --- a/diff_diff/continuous_did_results.py +++ b/diff_diff/continuous_did_results.py @@ -157,7 +157,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_att_se) and self.overall_att_se > 0): + if not (np.isfinite(self.overall_att_se) and self.overall_att_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/efficient_did_results.py b/diff_diff/efficient_did_results.py index 7b05430d..7f86f5a9 100644 --- a/diff_diff/efficient_did_results.py +++ b/diff_diff/efficient_did_results.py @@ -175,7 +175,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 3d0bb015..b6a6582a 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -2147,11 +2147,22 @@ def _compute_lead_coefficients( if resolved_survey_0 is not None: from diff_diff.survey import compute_survey_vcov - # Use residuals from solve_ols (safe for rank-deficient fits: - # solve_ols rebuilds fitted values from kept columns only, - # so residuals are finite even when some coefficients are NaN). + # Use residuals from solve_ols (safe for rank-deficient fits). residuals = result[1] - vcov = compute_survey_vcov(X_dm, residuals, resolved_survey_0) + + # Reduce to kept (finite-coefficient) columns to avoid singular + # X'WX in compute_survey_vcov, then expand back. + kept_mask = np.isfinite(coefficients) + if np.all(kept_mask): + vcov = compute_survey_vcov(X_dm, residuals, resolved_survey_0) + else: + n_full = len(coefficients) + X_kept = X_dm[:, kept_mask] + vcov_kept = compute_survey_vcov(X_kept, residuals, resolved_survey_0) + # Expand back: NaN rows/cols for dropped columns + vcov = np.full((n_full, n_full), np.nan) + kept_idx = np.where(kept_mask)[0] + vcov[np.ix_(kept_idx, kept_idx)] = vcov_kept n_leads = len(lead_cols) gamma = coefficients[:n_leads] @@ -2310,12 +2321,15 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: # P-value from F distribution (survey df when available) if np.isfinite(f_stat) and f_stat >= 0: if resolved_survey is not None and resolved_survey.df_survey is not None: - df_denom = max(resolved_survey.df_survey, 1) + df_denom = resolved_survey.df_survey else: cluster_ids = df_0[cluster_var].values n_clusters = len(np.unique(cluster_ids)) df_denom = max(n_clusters - 1, 1) - p_value = float(stats.f.sf(f_stat, n_leads_actual, df_denom)) + if df_denom <= 0: + p_value = np.nan + else: + p_value = float(stats.f.sf(f_stat, n_leads_actual, df_denom)) else: p_value = np.nan diff --git a/diff_diff/imputation_results.py b/diff_diff/imputation_results.py index b27f1dd1..6031393f 100644 --- a/diff_diff/imputation_results.py +++ b/diff_diff/imputation_results.py @@ -155,7 +155,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/results.py b/diff_diff/results.py index a9403c60..c92a3083 100644 --- a/diff_diff/results.py +++ b/diff_diff/results.py @@ -107,7 +107,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.se) and self.se > 0): + if not (np.isfinite(self.se) and self.se >= 0): return np.nan if not np.isfinite(self.att) or self.att == 0: return np.nan @@ -403,7 +403,7 @@ def post_period_effects(self) -> Dict[Any, PeriodEffect]: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.avg_se) and self.avg_se > 0): + if not (np.isfinite(self.avg_se) and self.avg_se >= 0): return np.nan if not np.isfinite(self.avg_att) or self.avg_att == 0: return np.nan @@ -722,7 +722,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.se) and self.se > 0): + if not (np.isfinite(self.se) and self.se >= 0): return np.nan if not np.isfinite(self.att) or self.att == 0: return np.nan diff --git a/diff_diff/stacked_did_results.py b/diff_diff/stacked_did_results.py index f6348dfb..7a45a316 100644 --- a/diff_diff/stacked_did_results.py +++ b/diff_diff/stacked_did_results.py @@ -109,7 +109,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/staggered_results.py b/diff_diff/staggered_results.py index a98ae303..2cb31d60 100644 --- a/diff_diff/staggered_results.py +++ b/diff_diff/staggered_results.py @@ -143,7 +143,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/staggered_triple_diff_results.py b/diff_diff/staggered_triple_diff_results.py index dcc1db19..bc664d4a 100644 --- a/diff_diff/staggered_triple_diff_results.py +++ b/diff_diff/staggered_triple_diff_results.py @@ -103,7 +103,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/sun_abraham.py b/diff_diff/sun_abraham.py index f52c6f14..bb79052f 100644 --- a/diff_diff/sun_abraham.py +++ b/diff_diff/sun_abraham.py @@ -100,7 +100,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan diff --git a/diff_diff/trop_results.py b/diff_diff/trop_results.py index 934fef78..66d85d8c 100644 --- a/diff_diff/trop_results.py +++ b/diff_diff/trop_results.py @@ -163,7 +163,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.se) and self.se > 0): + if not (np.isfinite(self.se) and self.se >= 0): return np.nan if not np.isfinite(self.att) or self.att == 0: return np.nan diff --git a/diff_diff/two_stage_results.py b/diff_diff/two_stage_results.py index 7bd44ace..6097f05a 100644 --- a/diff_diff/two_stage_results.py +++ b/diff_diff/two_stage_results.py @@ -153,7 +153,7 @@ def __repr__(self) -> str: @property def coef_var(self) -> float: """Coefficient of variation: SE / |overall ATT|. NaN when ATT is 0 or SE non-finite.""" - if not (np.isfinite(self.overall_se) and self.overall_se > 0): + if not (np.isfinite(self.overall_se) and self.overall_se >= 0): return np.nan if not np.isfinite(self.overall_att) or self.overall_att == 0: return np.nan From c0580dfa6661e2822be3ac35b84147122edbb5cc Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 17:15:14 -0400 Subject: [PATCH 07/13] Address P1/P2: skip cluster VCV for survey pretrends, add SE=0 CV test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: Skip cluster_ids in solve_ols when survey VCV will replace the cluster-robust VCV anyway. Prevents errors on single-PSU untreated domains where cluster-robust VCV would fail before survey VCV runs. P2: Add test_coef_var_zero_se regression (SE=0 -> CV=0.0). Defer subpopulation-based Omega_0 survey design to TODO.md as medium priority — current physical subsetting is correct for the common case but can change lonely-PSU/FPC behavior when some PSU/stratum has no untreated observations. Co-Authored-By: Claude Opus 4.6 (1M context) --- TODO.md | 1 + diff_diff/imputation.py | 7 +++++-- tests/test_survey_phase8.py | 16 ++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/TODO.md b/TODO.md index d23caa20..e709f3b2 100644 --- a/TODO.md +++ b/TODO.md @@ -54,6 +54,7 @@ Deferred items from PR reviews that were not addressed before merge. |-------|----------|----|----------| | CallawaySantAnna: consider materializing NaN entries for non-estimable (g,t) cells in group_time_effects dict (currently omitted with consolidated warning); would require updating downstream consumers (event study, balance_e, aggregation) | `staggered.py` | #256 | Low | | ImputationDiD dense `(A0'A0).toarray()` scales O((U+T+K)^2), OOM risk on large panels | `imputation.py` | #141 | Medium (deferred — only triggers when sparse solver fails) | +| ImputationDiD survey pretrends: use subpopulation approach (full design with zeroed weights) instead of physical Omega_0 subsetting for survey VCV. Current approach can change lonely-PSU/FPC behavior when some PSU/stratum has no untreated obs. | `imputation.py` | #260 | Medium | | Multi-absorb weighted demeaning needs iterative alternating projections for N > 1 absorbed FE with survey weights; unweighted multi-absorb also uses single-pass (pre-existing, exact only for balanced panels) | `estimators.py` | #218 | Medium | | Replicate-weight survey df — **Resolved**. `df_survey = rank(replicate_weights) - 1` matching R's `survey::degf()`. For IF paths, `n_valid - 1` when dropped replicates reduce effective count. | `survey.py` | #238 | Resolved | | CallawaySantAnna survey: strata/PSU/FPC — **Resolved**. Aggregated SEs (overall, event study, group) use `compute_survey_if_variance()`. Bootstrap uses PSU-level multiplier weights. | `staggered.py` | #237 | Resolved | diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index b6a6582a..8d2e7780 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -2103,17 +2103,20 @@ def _compute_lead_coefficients( ] ) - # OLS with cluster-robust SEs (WLS when survey weights present) + # OLS for point estimates + VCV. When survey VCV will replace the + # cluster-robust VCV, skip cluster_ids to avoid errors on domains + # with few PSUs (the cluster-robust VCV is discarded anyway). cluster_ids = df_0[cluster_var].values _ols_weights = survey_weights_0 _ols_weight_type = "pweight" if survey_weights_0 is not None else None + _use_survey_vcov = resolved_survey_0 is not None try: result = solve_ols( X_dm, y_dm, weights=_ols_weights, weight_type=_ols_weight_type, - cluster_ids=cluster_ids, + cluster_ids=None if _use_survey_vcov else cluster_ids, return_vcov=True, rank_deficient_action=self.rank_deficient_action, column_names=all_x_cols, diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 8bbeadf8..796ba492 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -989,6 +989,22 @@ def test_coef_var_zero_estimate(self): ) assert np.isnan(r.coef_var) + def test_coef_var_zero_se(self): + """SE=0 with nonzero estimate -> CV=0.0 (e.g., FPC census).""" + from diff_diff.results import DiDResults + + r = DiDResults( + att=2.0, + se=0.0, + t_stat=np.inf, + p_value=0.0, + conf_int=(2.0, 2.0), + n_obs=100, + n_treated=50, + n_control=50, + ) + assert r.coef_var == 0.0 + def test_coef_var_nan_se(self): """NaN SE -> CV=NaN.""" from diff_diff.results import DiDResults From 383b23b725b9ff3376175ba7ceea135184dc7871 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 17:32:33 -0400 Subject: [PATCH 08/13] Implement subpopulation approach for survey pretrends Replace physical Omega_0 subsetting with zero-padded full-design approach: scores and residuals from the lead regression are zero-padded back to full-panel length before calling compute_survey_vcov(), so the full PSU/strata structure is preserved for design-based variance. This follows the standard survey subpopulation convention (Lumley 2004, R survey::subset.survey.design) where out-of-domain observations contribute zero to the estimating equation but retain their design information for correct variance estimation. Co-Authored-By: Claude Opus 4.6 (1M context) --- TODO.md | 2 +- diff_diff/imputation.py | 114 ++++++++++++++++------------------- docs/methodology/REGISTRY.md | 2 +- 3 files changed, 55 insertions(+), 63 deletions(-) diff --git a/TODO.md b/TODO.md index e709f3b2..9c1aad97 100644 --- a/TODO.md +++ b/TODO.md @@ -54,7 +54,7 @@ Deferred items from PR reviews that were not addressed before merge. |-------|----------|----|----------| | CallawaySantAnna: consider materializing NaN entries for non-estimable (g,t) cells in group_time_effects dict (currently omitted with consolidated warning); would require updating downstream consumers (event study, balance_e, aggregation) | `staggered.py` | #256 | Low | | ImputationDiD dense `(A0'A0).toarray()` scales O((U+T+K)^2), OOM risk on large panels | `imputation.py` | #141 | Medium (deferred — only triggers when sparse solver fails) | -| ImputationDiD survey pretrends: use subpopulation approach (full design with zeroed weights) instead of physical Omega_0 subsetting for survey VCV. Current approach can change lonely-PSU/FPC behavior when some PSU/stratum has no untreated obs. | `imputation.py` | #260 | Medium | +| ImputationDiD survey pretrends: subpopulation approach implemented (full design with zero-padded scores). Resolved in #260. | `imputation.py` | #260 | Resolved | | Multi-absorb weighted demeaning needs iterative alternating projections for N > 1 absorbed FE with survey weights; unweighted multi-absorb also uses single-pass (pre-existing, exact only for balanced panels) | `estimators.py` | #218 | Medium | | Replicate-weight survey df — **Resolved**. `df_survey = rank(replicate_weights) - 1` matching R's `survey::degf()`. For IF paths, `n_valid - 1` when dropped replicates reduce effective count. | `survey.py` | #238 | Resolved | | CallawaySantAnna survey: strata/PSU/FPC — **Resolved**. Aggregated SEs (overall, event study, group) use `compute_survey_if_variance()`. Bootstrap uses PSU-level multiplier weights. | `staggered.py` | #237 | Resolved | diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 8d2e7780..1f27abcf 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -1733,33 +1733,16 @@ def _aggregate_event_study( if self.horizon_max is not None: pre_rel_times = [h for h in pre_rel_times if abs(h) <= self.horizon_max] if pre_rel_times: - # Build Omega_0 survey subsetting for pretrends + # Survey pretrends: pass full design (subpopulation approach) _sw_0_pre = None - _rs_0_pre = None + _rs_full_pre = None + _n_full_pre = None + _o0_idx_pre = None if survey_weights is not None and resolved_survey is not None: _sw_0_pre = survey_weights[omega_0_mask.values] - from dataclasses import replace as dc_replace - - _rs_0_pre = dc_replace( - resolved_survey, - weights=resolved_survey.weights[omega_0_mask.values], - strata=( - resolved_survey.strata[omega_0_mask.values] - if resolved_survey.strata is not None - else None - ), - psu=( - resolved_survey.psu[omega_0_mask.values] - if resolved_survey.psu is not None - else None - ), - fpc=( - resolved_survey.fpc[omega_0_mask.values] - if resolved_survey.fpc is not None - else None - ), - ) - # Use full-design df for consistent inference + _rs_full_pre = resolved_survey + _n_full_pre = len(df) + _o0_idx_pre = np.where(omega_0_mask.values)[0] _survey_df_pre = ( resolved_survey.df_survey if resolved_survey is not None else None ) @@ -1775,7 +1758,9 @@ def _aggregate_event_study( alpha=self.alpha, balanced_cohorts=balanced_cohorts, survey_weights_0=_sw_0_pre, - resolved_survey_0=_rs_0_pre, + resolved_survey_full=_rs_full_pre, + n_obs_full=_n_full_pre, + omega_0_indices=_o0_idx_pre, survey_df=_survey_df_pre, ) event_study_effects.update(pre_effects) @@ -2034,7 +2019,9 @@ def _compute_lead_coefficients( alpha: float = 0.05, balanced_cohorts: Optional[set] = None, survey_weights_0: Optional[np.ndarray] = None, - resolved_survey_0=None, + resolved_survey_full=None, + n_obs_full: Optional[int] = None, + omega_0_indices: Optional[np.ndarray] = None, survey_df: Optional[int] = None, ) -> Tuple[Dict[int, Dict[str, Any]], np.ndarray, np.ndarray]: """ @@ -2109,7 +2096,7 @@ def _compute_lead_coefficients( cluster_ids = df_0[cluster_var].values _ols_weights = survey_weights_0 _ols_weight_type = "pweight" if survey_weights_0 is not None else None - _use_survey_vcov = resolved_survey_0 is not None + _use_survey_vcov = resolved_survey_full is not None try: result = solve_ols( X_dm, @@ -2146,26 +2133,45 @@ def _compute_lead_coefficients( vcov = result[2] assert vcov is not None - # Replace cluster-robust VCV with survey design-based VCV when present - if resolved_survey_0 is not None: + # Replace cluster-robust VCV with survey design-based VCV. + # Use the FULL survey design (subpopulation approach): zero-pad + # the Omega_0 scores back to full-panel length so PSU/strata + # structure is preserved for variance estimation. + if resolved_survey_full is not None: from diff_diff.survey import compute_survey_vcov # Use residuals from solve_ols (safe for rank-deficient fits). - residuals = result[1] + residuals_0 = result[1] - # Reduce to kept (finite-coefficient) columns to avoid singular - # X'WX in compute_survey_vcov, then expand back. + # Reduce to kept (finite-coefficient) columns for VCV kept_mask = np.isfinite(coefficients) if np.all(kept_mask): - vcov = compute_survey_vcov(X_dm, residuals, resolved_survey_0) + X_for_vcov = X_dm + res_for_vcov = residuals_0 else: - n_full = len(coefficients) - X_kept = X_dm[:, kept_mask] - vcov_kept = compute_survey_vcov(X_kept, residuals, resolved_survey_0) + X_for_vcov = X_dm[:, kept_mask] + res_for_vcov = residuals_0 + + # Zero-pad to full panel length (subpopulation approach): + # observations outside Omega_0 contribute zero to the score, + # but preserve PSU/strata structure for design-based variance. + n_full_obs = n_obs_full + k_vcov = X_for_vcov.shape[1] + X_full = np.zeros((n_full_obs, k_vcov), dtype=np.float64) + res_full = np.zeros(n_full_obs, dtype=np.float64) + X_full[omega_0_indices] = X_for_vcov + res_full[omega_0_indices] = res_for_vcov + + vcov_kept = compute_survey_vcov(X_full, res_full, resolved_survey_full) + + if not np.all(kept_mask): # Expand back: NaN rows/cols for dropped columns - vcov = np.full((n_full, n_full), np.nan) + n_coef = len(coefficients) + vcov = np.full((n_coef, n_coef), np.nan) kept_idx = np.where(kept_mask)[0] vcov[np.ix_(kept_idx, kept_idx)] = vcov_kept + else: + vcov = vcov_kept n_leads = len(lead_cols) gamma = coefficients[:n_leads] @@ -2268,32 +2274,16 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: "lead_coefficients": {}, } - # Build Omega_0 survey subsetting for pretrends + # Survey pretrends: pass full design (subpopulation approach) _sw_0_pt = None - _rs_0_pt = None + _rs_full_pt = None + _n_full_pt = None + _o0_idx_pt = None if survey_weights is not None and resolved_survey is not None: _sw_0_pt = survey_weights[omega_0_mask.values] - from dataclasses import replace as dc_replace - - _rs_0_pt = dc_replace( - resolved_survey, - weights=resolved_survey.weights[omega_0_mask.values], - strata=( - resolved_survey.strata[omega_0_mask.values] - if resolved_survey.strata is not None - else None - ), - psu=( - resolved_survey.psu[omega_0_mask.values] - if resolved_survey.psu is not None - else None - ), - fpc=( - resolved_survey.fpc[omega_0_mask.values] - if resolved_survey.fpc is not None - else None - ), - ) + _rs_full_pt = resolved_survey + _n_full_pt = len(fd["df"]) + _o0_idx_pt = np.where(omega_0_mask.values)[0] # Use shared lead coefficient computation effects, gamma, V_gamma = self._compute_lead_coefficients( @@ -2307,7 +2297,9 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: pre_rel_times, alpha=self.alpha, survey_weights_0=_sw_0_pt, - resolved_survey_0=_rs_0_pt, + resolved_survey_full=_rs_full_pt, + n_obs_full=_n_full_pt, + omega_0_indices=_o0_idx_pt, survey_df=(resolved_survey.df_survey if resolved_survey is not None else None), ) diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index d32a7f7b..048a6325 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -864,7 +864,7 @@ where `W_it(h) = 1[K_it = h]` are lead indicators, estimated on `Omega_0` only. - Bootstrap does not update pre-period SEs (they are from the lead regression) - When `balance_e` is set, lead indicators are restricted to balanced cohorts; the full Omega_0 sample (including never-treated) is kept for within-transformation - Only affects event study aggregation; overall ATT and group aggregation unchanged -- **Note:** `pretrends=True` with analytical `survey_design` (strata/PSU/FPC) is supported. The lead regression uses survey-weighted demeaning, WLS point estimates, and `compute_survey_vcov()` for design-based VCV. The F-test in `pretrend_test()` uses the full-design `df_survey` as denominator df. Replicate-weight survey designs raise `NotImplementedError` with `pretrends=True` because per-replicate lead regression refits are not yet implemented. +- **Note:** `pretrends=True` with analytical `survey_design` (strata/PSU/FPC) is supported. The lead regression uses survey-weighted demeaning, WLS point estimates, and `compute_survey_vcov()` for design-based VCV. The full survey design is preserved (subpopulation approach): Omega_0 scores are zero-padded back to full-panel length so PSU/strata structure is maintained for variance estimation. The F-test in `pretrend_test()` uses the full-design `df_survey` as denominator df. Replicate-weight survey designs raise `NotImplementedError` with `pretrends=True` because per-replicate lead regression refits are not yet implemented. *Edge cases:* - **Unbalanced panels:** FE estimated via iterative alternating projection (Gauss-Seidel), equivalent to OLS with unit+time dummies. Converges in O(max_iter) passes; typically 5-20 iterations for unbalanced panels, 1-2 for balanced. One-pass demeaning is only exact for balanced panels. From 8fc20baca08dd8758db55d91245951be16b9f532 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 17:47:30 -0400 Subject: [PATCH 09/13] Fix trim_weights NaN poisoning and add regression test Use np.nanquantile instead of np.quantile so NaN weights don't poison the quantile computation and corrupt the entire output column. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep.py | 2 +- tests/test_survey_phase8.py | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/diff_diff/prep.py b/diff_diff/prep.py index 949e82e2..e43b9031 100644 --- a/diff_diff/prep.py +++ b/diff_diff/prep.py @@ -1278,7 +1278,7 @@ def trim_weights( if quantile is not None: if not (0 < quantile < 1): raise ValueError(f"quantile must be in (0, 1), got {quantile}") - upper = float(np.quantile(w, quantile)) + upper = float(np.nanquantile(w, quantile)) if upper is not None: w = np.minimum(w, upper) diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 796ba492..7a6e497a 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1118,6 +1118,17 @@ def test_trim_upper_and_quantile_raises(self): with pytest.raises(ValueError, match="upper.*quantile"): trim_weights(data, "w", upper=5.0, quantile=0.95) + def test_trim_quantile_with_nan(self): + """Quantile trimming with NaN weights doesn't poison the column.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [1.0, 2.0, np.nan, 5.0, 10.0]}) + result = trim_weights(data, "w", quantile=0.9) + # NaN stays NaN, non-NaN values are trimmed + assert np.isnan(result["w"].iloc[2]) + assert np.isfinite(result["w"].iloc[0]) + assert np.isfinite(result["w"].iloc[4]) + # =========================================================================== # 8e-iii: ImputationDiD Pretrends + Survey From 4cbe488a602b100acb5326d03adb61a699d5bcc0 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 19:16:59 -0400 Subject: [PATCH 10/13] Address P2/P3: add always-treated PSU test and update stale docstrings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P2: Add test_pretrends_survey_always_treated_psu — regression test with a PSU/stratum that has mostly treated obs (sparse untreated domain). Exercises the zero-padding subpopulation path. P3: Update docstrings in _compute_lead_coefficients, _pretrend_test, pretrend_test (results), and both bootstrap helper functions to describe survey VCV support and pooled-singleton adjust behavior. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/bootstrap_utils.py | 7 +++++ diff_diff/imputation.py | 8 +++-- diff_diff/imputation_results.py | 3 +- tests/test_survey_phase8.py | 53 +++++++++++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/diff_diff/bootstrap_utils.py b/diff_diff/bootstrap_utils.py index 23dffb71..24c0a30f 100644 --- a/diff_diff/bootstrap_utils.py +++ b/diff_diff/bootstrap_utils.py @@ -433,6 +433,10 @@ def generate_survey_multiplier_weights_batch( is present, weights are scaled by ``sqrt(1 - f_h)`` per stratum so the bootstrap variance matches the TSL variance. + For ``lonely_psu="adjust"``, singleton PSUs from different strata are + pooled into a combined pseudo-stratum and weights are generated for + the pooled group (no FPC scaling on pooled singletons). + Parameters ---------- n_bootstrap : int @@ -568,6 +572,9 @@ def generate_rao_wu_weights( With FPC: ``m_h = max(1, round((1 - f_h) * (n_h - 1)))`` (Rao, Wu & Yue 1992, Section 3). + For ``lonely_psu="adjust"``, singleton PSUs are pooled into a combined + pseudo-stratum and resampled together (no FPC scaling on pooled group). + Parameters ---------- resolved_survey : ResolvedSurveyDesign diff --git a/diff_diff/imputation.py b/diff_diff/imputation.py index 1f27abcf..5fe3eb93 100644 --- a/diff_diff/imputation.py +++ b/diff_diff/imputation.py @@ -2028,7 +2028,10 @@ def _compute_lead_coefficients( Compute pre-period lead coefficients via within-transformed OLS (Test 1). Adds lead indicator dummies W_it(h) = 1[K_it = h] to the untreated - model and estimates their coefficients with cluster-robust SEs. + model and estimates their coefficients. Uses cluster-robust SEs by + default, or design-based survey VCV when ``resolved_survey_full`` + is provided (subpopulation approach: scores zero-padded to full + panel length to preserve PSU/strata structure). The full Omega_0 sample (including never-treated controls) is always used for within-transformation. When balanced_cohorts is provided, @@ -2208,7 +2211,8 @@ def _pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: Run pre-trend test (Equation 9). Adds pre-treatment lead indicators to the Step 1 OLS on Omega_0 - and tests their joint significance via cluster-robust Wald F-test. + and tests their joint significance via Wald F-test (cluster-robust + or design-based survey VCV when survey_design is present). """ if self._fit_data is None: raise RuntimeError("Must call fit() before pretrend_test().") diff --git a/diff_diff/imputation_results.py b/diff_diff/imputation_results.py index 6031393f..870b7271 100644 --- a/diff_diff/imputation_results.py +++ b/diff_diff/imputation_results.py @@ -413,7 +413,8 @@ def pretrend_test(self, n_leads: Optional[int] = None) -> Dict[str, Any]: Run a pre-trend test (Equation 9 of Borusyak et al. 2024). Adds pre-treatment lead indicators to the Step 1 OLS and tests - their joint significance via a cluster-robust Wald F-test. + their joint significance via a Wald F-test (cluster-robust, or + design-based survey VCV when survey_design was provided at fit). Parameters ---------- diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 7a6e497a..616b3b0a 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1203,6 +1203,59 @@ def test_pretrend_test_survey_no_raise(self, staggered_data): assert np.isfinite(pt["f_stat"]) assert np.isfinite(pt["p_value"]) + def test_pretrends_survey_always_treated_psu(self): + """Survey pretrends with a PSU/stratum that has no untreated obs.""" + from diff_diff import ImputationDiD + + np.random.seed(77) + n_periods = 8 + rows = [] + for i in range(40): + if i < 10: + ft = 2 # early cohort — treated in all but period 1 + elif i < 25: + ft = 5 # late cohort + else: + ft = 0 # never-treated + # PSU 0-9 are in stratum 0 (all early-treated → few untreated obs) + # PSU 10-39 are in strata 1-2 (mix of treated/never-treated) + stratum = 0 if i < 10 else (1 if i < 25 else 2) + for t in range(1, n_periods + 1): + y = 5.0 + i * 0.02 + t * 0.1 + if ft > 0 and t >= ft: + y += 1.5 + y += np.random.normal(0, 0.3) + rows.append( + { + "unit": i, + "time": t, + "first_treat": ft, + "outcome": y, + "weight": 1.0 + 0.2 * (i % 5), + "stratum": stratum, + "psu": i, + } + ) + data = pd.DataFrame(rows) + + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) + # Should produce finite results even though stratum 0 has + # mostly treated PSUs (sparse untreated domain) + assert np.isfinite(result.overall_att) + pre = [h for h in result.event_study_effects if h < -1] + for h in pre: + assert np.isfinite(result.event_study_effects[h]["se"]) + def test_pretrends_replicate_raises(self, staggered_data): """pretrends=True + replicate-weight survey raises NotImplementedError.""" from diff_diff import ImputationDiD From 5bec03c543fc08bd398a33cbf3e874e48df33ca1 Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 19:36:37 -0400 Subject: [PATCH 11/13] Address P1/P2/P3: single-singleton warning, zero-Omega_0 test, REGISTRY P1: Emit UserWarning when lonely_psu="adjust" has only 1 singleton stratum total in both multiplier and Rao-Wu bootstrap. Singleton gets zero variance (same as remove) since pooling requires 2+ singletons. Matches R survey package behavior. P2: Fix test_pretrends_survey_always_treated_psu to use first_treat=1 so stratum 0 has genuinely zero untreated rows, exercising the zero-padding branch. P2: Add single-singleton warning tests for both bootstrap families. P3: Update REGISTRY bullets at lines 860/863 to mention design-based survey VCV alongside cluster-robust. Document single-singleton bootstrap fallback behavior. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/bootstrap_utils.py | 26 +++++++++++- docs/methodology/REGISTRY.md | 9 +++-- tests/test_survey_phase8.py | 76 +++++++++++++++++++++++++++++++++++- 3 files changed, 104 insertions(+), 7 deletions(-) diff --git a/diff_diff/bootstrap_utils.py b/diff_diff/bootstrap_utils.py index 24c0a30f..d160d80c 100644 --- a/diff_diff/bootstrap_utils.py +++ b/diff_diff/bootstrap_utils.py @@ -552,7 +552,18 @@ def generate_survey_multiplier_weights_batch( pooled_cols = np.array(_singleton_cols) weights[:, pooled_cols] = pooled_weights else: - # Single singleton — variance unidentified, zero weight + # Single singleton — cannot pool, zero weight (matches R survey + # package behavior where adjust with one singleton = remove). + import warnings + + warnings.warn( + "lonely_psu='adjust' with only 1 singleton stratum in " + "bootstrap: singleton PSU contributes zero variance " + "(same as 'remove'). At least 2 singleton strata are " + "needed for pooled pseudo-stratum bootstrap.", + UserWarning, + stacklevel=3, + ) weights[:, _singleton_cols[0]] = 0.0 return weights, psu_ids @@ -673,7 +684,18 @@ def generate_rao_wu_weights( p = int(obs_psu[idx]) rescaled[idx] = base_weights[idx] * psu_scale_map.get(p, 1.0) else: - # Single singleton total — variance unidentified, keep base weights + # Single singleton — cannot pool, keep base weights (matches R + # survey package: adjust with one singleton = remove). + import warnings + + warnings.warn( + "lonely_psu='adjust' with only 1 singleton stratum in " + "bootstrap: singleton PSU contributes zero variance " + "(same as 'remove'). At least 2 singleton strata are " + "needed for pooled pseudo-stratum bootstrap.", + UserWarning, + stacklevel=2, + ) for mask_h, _ in _singleton_info: rescaled[mask_h] = base_weights[mask_h] diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 048a6325..bf1caeb2 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -857,10 +857,10 @@ Pre-period coefficients reuse the existing pre-trend test machinery (BJS Equatio Y_it = alpha_i + beta_t [+ X'_it * delta] + sum_h gamma_h * W_it(h) + epsilon_it ``` where `W_it(h) = 1[K_it = h]` are lead indicators, estimated on `Omega_0` only. -- `gamma_h` are the pre-period event study coefficients with cluster-robust SEs +- `gamma_h` are the pre-period event study coefficients (cluster-robust SEs by default; design-based survey VCV when analytical `survey_design` is present) - Under parallel trends (Assumption 1), `gamma_h = 0` for all `h < -anticipation` - Reference period `h = -1 - anticipation` is the omitted category (normalized to zero) -- SEs from cluster-robust Wald variance (consistent with `pretrend_test()`) +- SEs from cluster-robust Wald variance by default; design-based when survey present (consistent with `pretrend_test()`) - Bootstrap does not update pre-period SEs (they are from the lead regression) - When `balance_e` is set, lead indicators are restricted to balanced cohorts; the full Omega_0 sample (including never-treated) is kept for within-transformation - Only affects event study aggregation; overall ATT and group aggregation unchanged @@ -2220,7 +2220,10 @@ ContinuousDiD, EfficientDiD): bootstrap analogue of the TSL "adjust" behavior (centering around the global mean). Applies to both multiplier bootstrap (CallawaySantAnna, ImputationDiD, TwoStageDiD, ContinuousDiD, EfficientDiD) and Rao-Wu bootstrap (SunAbraham, SyntheticDiD, TROP). - FPC scaling is skipped for pooled singletons (conservative). Reference: Rust & Rao (1996). + FPC scaling is skipped for pooled singletons (conservative). When only one singleton + stratum exists total, pooling is not possible — the singleton contributes zero bootstrap + variance (same as `remove`), with a `UserWarning` emitted. This matches R's `survey` + package behavior. Reference: Rust & Rao (1996). - **Deviation from R:** For the no-FPC case (`m_h = n_h - 1`), this matches R `survey::as.svrepdesign(type="subbootstrap")`. The FPC-adjusted resample size `m_h = round((1-f_h)*(n_h-1))` follows Rao, Wu & Yue (1992) Section 3. diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 616b3b0a..39c68594 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -911,6 +911,78 @@ def test_adjust_all_singleton(self): assert result.overall_se > 0 +class TestSingleSingletonAdjust: + """Single singleton stratum with adjust: warns and contributes zero variance.""" + + def test_multiplier_single_singleton_warns(self): + """Multiplier bootstrap with 1 singleton warns.""" + from diff_diff.bootstrap_utils import generate_survey_multiplier_weights_batch + from diff_diff.survey import ResolvedSurveyDesign + + np.random.seed(42) + n = 15 + # Stratum 0 has 1 PSU (singleton), strata 1-2 have multiple + strata = np.array([0] * 5 + [1] * 5 + [2] * 5) + psu = np.array([0] * 5 + [1, 2, 3, 4, 5] + [6, 7, 8, 9, 10]) + + resolved = ResolvedSurveyDesign( + weights=np.ones(n), + weight_type="pweight", + strata=strata, + psu=psu, + fpc=None, + n_strata=3, + n_psu=11, + lonely_psu="adjust", + ) + rng = np.random.default_rng(42) + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + weights, _ = generate_survey_multiplier_weights_batch( + 50, + resolved, + "rademacher", + rng, + ) + user_warns = [x for x in w if issubclass(x.category, UserWarning)] + assert any("1 singleton" in str(x.message) for x in user_warns) + # Singleton column is zero (same as remove) + assert np.all(weights[:, 0] == 0.0) + + def test_rao_wu_single_singleton_warns(self): + """Rao-Wu with 1 singleton warns.""" + from diff_diff.bootstrap_utils import generate_rao_wu_weights + from diff_diff.survey import ResolvedSurveyDesign + + np.random.seed(42) + n = 15 + strata = np.array([0] * 5 + [1] * 5 + [2] * 5) + psu = np.array([0] * 5 + [1, 2, 3, 4, 5] + [6, 7, 8, 9, 10]) + + resolved = ResolvedSurveyDesign( + weights=np.ones(n), + weight_type="pweight", + strata=strata, + psu=psu, + fpc=None, + n_strata=3, + n_psu=11, + lonely_psu="adjust", + ) + rng = np.random.default_rng(42) + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + rescaled = generate_rao_wu_weights(resolved, rng) + user_warns = [x for x in w if issubclass(x.category, UserWarning)] + assert any("1 singleton" in str(x.message) for x in user_warns) + # Singleton obs keep base weights (zero variance contribution) + np.testing.assert_allclose(rescaled[:5], np.ones(5)) + + # =========================================================================== class TestRaoWuLonelyPSUAdjust: """Tests for lonely_psu='adjust' in Rao-Wu bootstrap (SunAbraham consumer).""" @@ -1212,12 +1284,12 @@ def test_pretrends_survey_always_treated_psu(self): rows = [] for i in range(40): if i < 10: - ft = 2 # early cohort — treated in all but period 1 + ft = 1 # always-treated: treated from period 1 (zero untreated rows) elif i < 25: ft = 5 # late cohort else: ft = 0 # never-treated - # PSU 0-9 are in stratum 0 (all early-treated → few untreated obs) + # PSU 0-9 are in stratum 0 (all always-treated → zero untreated obs) # PSU 10-39 are in strata 1-2 (mix of treated/never-treated) stratum = 0 if i < 10 else (1 if i < 25 else 2) for t in range(1, n_periods + 1): From f065afe25a025dbb632a0e8dac7c60682bcc0f7f Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 20:53:36 -0400 Subject: [PATCH 12/13] Address P2/P3: trim_weights lower>upper, R-equivalence claim, test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P2: Validate lower <= upper in trim_weights after quantile resolution. Raise ValueError when lower exceeds upper to prevent silent corruption. P3: Reword single-singleton bootstrap fallback as library-specific documented behavior (not R equivalence — R's analytical adjust uses grand-mean centering, but the bootstrap single-singleton analogue is not defined in the literature). Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/bootstrap_utils.py | 8 ++++---- diff_diff/prep.py | 6 ++++++ docs/methodology/REGISTRY.md | 5 +++-- tests/test_survey_phase8.py | 8 ++++++++ 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/diff_diff/bootstrap_utils.py b/diff_diff/bootstrap_utils.py index d160d80c..4d80f4a6 100644 --- a/diff_diff/bootstrap_utils.py +++ b/diff_diff/bootstrap_utils.py @@ -552,8 +552,8 @@ def generate_survey_multiplier_weights_batch( pooled_cols = np.array(_singleton_cols) weights[:, pooled_cols] = pooled_weights else: - # Single singleton — cannot pool, zero weight (matches R survey - # package behavior where adjust with one singleton = remove). + # Single singleton — cannot pool, zero weight (library-specific + # fallback; bootstrap adjust with one singleton = remove). import warnings warnings.warn( @@ -684,8 +684,8 @@ def generate_rao_wu_weights( p = int(obs_psu[idx]) rescaled[idx] = base_weights[idx] * psu_scale_map.get(p, 1.0) else: - # Single singleton — cannot pool, keep base weights (matches R - # survey package: adjust with one singleton = remove). + # Single singleton — cannot pool, keep base weights (library-specific + # fallback; bootstrap adjust with one singleton = remove). import warnings warnings.warn( diff --git a/diff_diff/prep.py b/diff_diff/prep.py index e43b9031..d5429629 100644 --- a/diff_diff/prep.py +++ b/diff_diff/prep.py @@ -1280,6 +1280,12 @@ def trim_weights( raise ValueError(f"quantile must be in (0, 1), got {quantile}") upper = float(np.nanquantile(w, quantile)) + if upper is not None and lower is not None and lower > upper: + raise ValueError( + f"lower ({lower}) must be <= upper ({upper}). " + f"When using quantile, the resolved upper cap may be below lower." + ) + if upper is not None: w = np.minimum(w, upper) if lower is not None: diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index bf1caeb2..554acd28 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2222,8 +2222,9 @@ ContinuousDiD, EfficientDiD): ContinuousDiD, EfficientDiD) and Rao-Wu bootstrap (SunAbraham, SyntheticDiD, TROP). FPC scaling is skipped for pooled singletons (conservative). When only one singleton stratum exists total, pooling is not possible — the singleton contributes zero bootstrap - variance (same as `remove`), with a `UserWarning` emitted. This matches R's `survey` - package behavior. Reference: Rust & Rao (1996). + variance (same as `remove`), with a `UserWarning` emitted. This is a library-specific + documented fallback (R's analytical `adjust` uses grand-mean centering, but the bootstrap + analogue for a single singleton is not defined in the literature). Reference: Rust & Rao (1996). - **Deviation from R:** For the no-FPC case (`m_h = n_h - 1`), this matches R `survey::as.svrepdesign(type="subbootstrap")`. The FPC-adjusted resample size `m_h = round((1-f_h)*(n_h-1))` follows Rao, Wu & Yue (1992) Section 3. diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index 39c68594..b133bd95 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1201,6 +1201,14 @@ def test_trim_quantile_with_nan(self): assert np.isfinite(result["w"].iloc[0]) assert np.isfinite(result["w"].iloc[4]) + def test_trim_lower_exceeds_upper_raises(self): + """lower > upper raises ValueError.""" + from diff_diff.prep import trim_weights + + data = pd.DataFrame({"w": [1.0, 5.0, 10.0]}) + with pytest.raises(ValueError, match="lower.*<=.*upper"): + trim_weights(data, "w", upper=5.0, lower=6.0) + # =========================================================================== # 8e-iii: ImputationDiD Pretrends + Survey From 4fbb82de620ac83364a0a51cb2a5b745815007ea Mon Sep 17 00:00:00 2001 From: igerber Date: Fri, 3 Apr 2026 21:04:14 -0400 Subject: [PATCH 13/13] Address P2/P3: validate trim_weights caps, add survey pretrend coverage P2: Validate upper/lower are finite and non-negative in trim_weights. Reject NaN/negative/inf caps before applying np.minimum/np.maximum. P3: Add survey+covariates pretrend test and survey+bootstrap pretrend test exercising the distinct code paths. Co-Authored-By: Claude Opus 4.6 (1M context) --- diff_diff/prep.py | 7 ++++++ tests/test_survey_phase8.py | 43 +++++++++++++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/diff_diff/prep.py b/diff_diff/prep.py index d5429629..97543576 100644 --- a/diff_diff/prep.py +++ b/diff_diff/prep.py @@ -1280,6 +1280,13 @@ def trim_weights( raise ValueError(f"quantile must be in (0, 1), got {quantile}") upper = float(np.nanquantile(w, quantile)) + # Validate cap values are finite and non-negative + if upper is not None: + if not np.isfinite(upper) or upper < 0: + raise ValueError(f"upper must be finite and >= 0, got {upper}") + if lower is not None: + if not np.isfinite(lower) or lower < 0: + raise ValueError(f"lower must be finite and >= 0, got {lower}") if upper is not None and lower is not None and lower > upper: raise ValueError( f"lower ({lower}) must be <= upper ({upper}). " diff --git a/tests/test_survey_phase8.py b/tests/test_survey_phase8.py index b133bd95..0eeea543 100644 --- a/tests/test_survey_phase8.py +++ b/tests/test_survey_phase8.py @@ -1283,6 +1283,49 @@ def test_pretrend_test_survey_no_raise(self, staggered_data): assert np.isfinite(pt["f_stat"]) assert np.isfinite(pt["p_value"]) + def test_pretrends_survey_with_covariates(self, staggered_data): + """Survey pretrends with covariates uses the survey+covariate path.""" + from diff_diff import ImputationDiD + + data = staggered_data + data["x1"] = np.random.default_rng(42).normal(0, 1, len(data)) + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + covariates=["x1"], + survey_design=sd, + aggregate="event_study", + ) + pre = [h for h in result.event_study_effects if h < -1] + for h in pre: + assert np.isfinite(result.event_study_effects[h]["se"]) + + def test_pretrends_survey_with_bootstrap(self, staggered_data): + """Survey pretrends + bootstrap: bootstrap doesn't overwrite pre-period SEs.""" + from diff_diff import ImputationDiD + + data = staggered_data + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + est = ImputationDiD(pretrends=True, horizon_max=3, n_bootstrap=20, seed=42) + result = est.fit( + data, + outcome="outcome", + unit="unit", + time="time", + first_treat="first_treat", + survey_design=sd, + aggregate="event_study", + ) + pre = [h for h in result.event_study_effects if h < -1] + for h in pre: + # Pre-period SEs come from lead regression, not bootstrap + assert np.isfinite(result.event_study_effects[h]["se"]) + def test_pretrends_survey_always_treated_psu(self): """Survey pretrends with a PSU/stratum that has no untreated obs.""" from diff_diff import ImputationDiD