diff --git a/diff_diff/linalg.py b/diff_diff/linalg.py index 6da9fe4c..9b3425cb 100644 --- a/diff_diff/linalg.py +++ b/diff_diff/linalg.py @@ -2372,6 +2372,7 @@ def solve_poisson( tol: float = 1e-8, init_beta: Optional[np.ndarray] = None, rank_deficient_action: str = "warn", + weights: Optional[np.ndarray] = None, ) -> Tuple[np.ndarray, np.ndarray]: """Poisson IRLS (Newton-Raphson with log link). @@ -2389,6 +2390,9 @@ def solve_poisson( log(mean(y)) to improve convergence for large-scale outcomes. rank_deficient_action : {"warn", "error", "silent"} How to handle rank-deficient design matrices. Mirrors solve_ols/solve_logit. + weights : (n,) optional observation weights (e.g. survey sampling weights). + When provided, the weighted pseudo-log-likelihood is maximised: + score = X'(w*(y - mu)), Hessian = X'diag(w*mu)X. Returns ------- @@ -2397,6 +2401,20 @@ def solve_poisson( """ n, k_orig = X.shape + # Validate weights (mirrors solve_logit validation) + if weights is not None: + weights = np.asarray(weights, dtype=np.float64) + if weights.shape != (n,): + raise ValueError(f"weights must have shape ({n},), got {weights.shape}") + if np.any(np.isnan(weights)): + raise ValueError("weights contain NaN values") + if np.any(~np.isfinite(weights)): + raise ValueError("weights contain Inf values") + if np.any(weights < 0): + raise ValueError("weights must be non-negative") + if np.sum(weights) <= 0: + raise ValueError("weights sum to zero — no observations have positive weight") + # Validate rank_deficient_action (same as solve_logit/solve_ols) valid_actions = ("warn", "error", "silent") if rank_deficient_action not in valid_actions: @@ -2425,6 +2443,46 @@ def solve_poisson( X = X[:, kept_cols] n, k = X.shape + + # Validate effective weighted sample when weights have zeros + # (mirrors solve_logit's positive-weight safeguards) + if weights is not None and np.any(weights == 0): + pos_mask = weights > 0 + n_pos = int(np.sum(pos_mask)) + X_eff = X[pos_mask] + eff_rank_info = _detect_rank_deficiency(X_eff) + if len(eff_rank_info[1]) > 0: + n_dropped_eff = len(eff_rank_info[1]) + if rank_deficient_action == "error": + raise ValueError( + f"Effective (positive-weight) sample is rank-deficient: " + f"{n_dropped_eff} linearly dependent column(s). " + f"Cannot identify Poisson model on this subpopulation." + ) + elif rank_deficient_action == "warn": + warnings.warn( + f"Effective (positive-weight) sample is rank-deficient: " + f"dropping {n_dropped_eff} column(s). Poisson estimates " + f"may be unreliable on this subpopulation.", + UserWarning, + stacklevel=2, + ) + eff_dropped = set(int(d) for d in eff_rank_info[1]) + eff_kept = np.array([i for i in range(k) if i not in eff_dropped]) + X = X[:, eff_kept] + if len(dropped_cols) > 0: + kept_cols = kept_cols[eff_kept] + else: + kept_cols = eff_kept + dropped_cols = list(eff_dropped) + n, k = X.shape + if n_pos <= k: + raise ValueError( + f"Only {n_pos} positive-weight observation(s) for " + f"{k} parameters (after rank reduction). " + f"Cannot identify Poisson model." + ) + if init_beta is not None: beta = init_beta[kept_cols].copy() if len(dropped_cols) > 0 else init_beta.copy() else: @@ -2438,8 +2496,12 @@ def solve_poisson( for _ in range(max_iter): eta = np.clip(X @ beta, -500, 500) mu = np.exp(eta) - score = X.T @ (y - mu) # gradient of log-likelihood - hess = X.T @ (mu[:, None] * X) # -Hessian = X'WX, W=diag(mu) + if weights is not None: + score = X.T @ (weights * (y - mu)) + hess = X.T @ ((weights * mu)[:, None] * X) + else: + score = X.T @ (y - mu) + hess = X.T @ (mu[:, None] * X) try: delta = np.linalg.solve(hess + 1e-12 * np.eye(k), score) except np.linalg.LinAlgError: diff --git a/diff_diff/wooldridge.py b/diff_diff/wooldridge.py index f51fb20b..4bc2c0ce 100644 --- a/diff_diff/wooldridge.py +++ b/diff_diff/wooldridge.py @@ -42,6 +42,7 @@ def _compute_weighted_agg( gt_keys: List, gt_vcov: Optional[np.ndarray], alpha: float, + df: Optional[int] = None, ) -> Dict: """Compute simple (overall) weighted average ATT and SE via delta method.""" post_keys = [(g, t) for (g, t) in gt_keys if t >= g] @@ -63,10 +64,54 @@ def _compute_weighted_agg( else: se = float("nan") - t_stat, p_value, conf_int = safe_inference(att, se, alpha=alpha) + t_stat, p_value, conf_int = safe_inference(att, se, alpha=alpha, df=df) return {"att": att, "se": se, "t_stat": t_stat, "p_value": p_value, "conf_int": conf_int} +def _resolve_survey_for_wooldridge(survey_design, sample, cluster_ids, cluster_name): + """Resolve survey design, inject cluster as PSU, recompute metadata. + + Shared helper for all three WooldridgeDiD sub-fitters. Matches the + resolution chain in DifferenceInDifferences.fit() (estimators.py:344-359). + """ + from diff_diff.survey import ( + _resolve_survey_for_fit, + _resolve_effective_cluster, + _inject_cluster_as_psu, + compute_survey_metadata, + ) + + resolved, survey_weights, survey_weight_type, survey_metadata = ( + _resolve_survey_for_fit(survey_design, sample) + ) + if resolved is not None and resolved.uses_replicate_variance: + raise NotImplementedError( + "WooldridgeDiD does not yet support replicate-weight variance. " + "Use TSL (strata/PSU/FPC) instead." + ) + if resolved is not None and resolved.weight_type != "pweight": + raise ValueError( + f"WooldridgeDiD survey support requires weight_type='pweight', " + f"got '{resolved.weight_type}'. The survey variance math " + f"assumes probability weights (pweight)." + ) + if resolved is not None: + effective_cluster = _resolve_effective_cluster( + resolved, cluster_ids, cluster_name + ) + if effective_cluster is not None: + resolved = _inject_cluster_as_psu(resolved, effective_cluster) + if resolved.psu is not None and survey_metadata is not None: + raw_w = ( + sample[survey_design.weights].values.astype(np.float64) + if survey_design.weights + else np.ones(len(sample), dtype=np.float64) + ) + survey_metadata = compute_survey_metadata(resolved, raw_w) + df_inf = resolved.df_survey if resolved is not None else None + return resolved, survey_weights, survey_weight_type, survey_metadata, df_inf + + def _filter_sample( data: pd.DataFrame, unit: str, @@ -329,6 +374,7 @@ def fit( exovar: Optional[List[str]] = None, xtvar: Optional[List[str]] = None, xgvar: Optional[List[str]] = None, + survey_design=None, ) -> WooldridgeDiDResults: """Fit the ETWFE model. See class docstring for parameter details. @@ -343,6 +389,11 @@ def fit( xtvar : time-varying covariates (demeaned within cohort×period cells when ``demean_covariates=True``) xgvar : covariates interacted with each cohort indicator + survey_design : SurveyDesign, optional + Survey design specification for complex survey data. Supports + stratified, clustered, and weighted designs via Taylor Series + Linearization (TSL). Replicate-weight designs raise + ``NotImplementedError``. """ df = data.copy() df[cohort] = df[cohort].fillna(0) @@ -366,6 +417,13 @@ def fit( f"Set n_bootstrap=0 for analytic SEs." ) + # 0c. Reject bootstrap + survey (no survey-aware bootstrap variant) + if self.n_bootstrap > 0 and survey_design is not None: + raise ValueError( + "Bootstrap inference is not supported with survey_design. " + "Set n_bootstrap=0 for analytic survey SEs." + ) + # 1. Filter to analysis sample sample = _filter_sample(df, unit, time, cohort, self.control_group, self.anticipation) @@ -502,6 +560,7 @@ def fit( gt_keys, int_col_names, groups, + survey_design=survey_design, ) elif self.method == "logit": n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 @@ -517,6 +576,7 @@ def fit( int_col_names, groups, n_cov_interact=n_cov_interact, + survey_design=survey_design, ) else: # poisson n_cov_interact = X_cov.shape[1] if X_cov is not None else 0 @@ -532,6 +592,7 @@ def fit( int_col_names, groups, n_cov_interact=n_cov_interact, + survey_design=survey_design, ) self._results = results @@ -561,8 +622,21 @@ def _fit_ols( gt_keys: List[Tuple], int_col_names: List[str], groups: List[Any], + survey_design=None, ) -> WooldridgeDiDResults: """OLS path: within-transform FE, solve_ols, cluster SE.""" + # Reset index so numpy positional indexing matches pandas groupby + sample = sample.reset_index(drop=True) + # Cluster IDs (default: unit level) — needed before survey resolution + cluster_col = self.cluster if self.cluster else unit + cluster_ids = sample[cluster_col].values + + # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= + survey_cluster_ids = cluster_ids if self.cluster else None + resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( + _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) + ) + # 4. Within-transform: absorb unit + time FE all_vars = [outcome] + [f"_x{i}" for i in range(X_design.shape[1])] tmp = sample[[unit, time]].copy() @@ -570,32 +644,60 @@ def _fit_ols( for i in range(X_design.shape[1]): tmp[f"_x{i}"] = X_design[:, i] - # Use uniform weights to trigger iterative alternating projections, - # which is exact for both balanced and unbalanced panels. - # The one-pass formula (y - ȳ_i - ȳ_t + ȳ) is only exact for balanced panels. + # Use iterative alternating projections for demeaning (exact for + # both balanced and unbalanced panels). Survey weights change the + # weighted FWL projection — all columns (treatment interactions + + # covariates) are demeaned together. + wt_weights = survey_weights if survey_weights is not None else np.ones(len(tmp)) + + # Guard: zero-weight unit/time groups cause 0/0 in within_transform + if survey_weights is not None and np.any(survey_weights == 0): + sw_series = pd.Series(survey_weights, index=sample.index) + for grp_col, grp_label in [(unit, "unit"), (time, "time period")]: + grp_sums = sw_series.groupby(sample[grp_col]).sum() + zero_grps = grp_sums[grp_sums == 0].index.tolist() + if zero_grps: + raise ValueError( + f"Survey weights sum to zero for {grp_label}(s) " + f"{zero_grps[:3]}. Cannot compute weighted " + f"within-transformation. Remove zero-weight " + f"{grp_label}s or use non-zero weights." + ) + transformed = within_transform( tmp, all_vars, unit=unit, time=time, suffix="_demeaned", - weights=np.ones(len(tmp)), + weights=wt_weights, ) y = transformed[f"{outcome}_demeaned"].values X_cols = [f"_x{i}_demeaned" for i in range(X_design.shape[1])] X = transformed[X_cols].values - # 5. Cluster IDs (default: unit level) - cluster_col = self.cluster if self.cluster else unit - cluster_ids = sample[cluster_col].values - - # 6. Solve OLS + # 6. Solve OLS (skip cluster-robust vcov when survey will provide TSL vcov) coefs, resids, vcov = solve_ols( X, y, cluster_ids=cluster_ids, - return_vcov=True, + return_vcov=(resolved is None), rank_deficient_action=self.rank_deficient_action, column_names=col_names, + weights=survey_weights, + weight_type=survey_weight_type, ) + # Survey TSL vcov replaces cluster-robust vcov + if resolved is not None: + from diff_diff.survey import compute_survey_vcov + nan_mask_ols = np.isnan(coefs) + if np.any(nan_mask_ols): + kept = ~nan_mask_ols + vcov_kept = compute_survey_vcov(X[:, kept], resids, resolved) + vcov = np.full((len(coefs), len(coefs)), np.nan) + kept_idx = np.where(kept)[0] + vcov[np.ix_(kept_idx, kept_idx)] = vcov_kept + else: + vcov = compute_survey_vcov(X, resids, resolved) + # 7. Extract β_{g,t} and build gt_effects dict gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} @@ -607,7 +709,7 @@ def _fit_ols( continue att = float(coefs[idx]) se = float(np.sqrt(max(vcov[idx, idx], 0.0))) if vcov is not None else float("nan") - t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, @@ -628,7 +730,7 @@ def _fit_ols( # 8. Simple aggregation (always computed) overall = _compute_weighted_agg( - gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha + gt_effects, gt_weights, gt_keys_ordered, gt_vcov, self.alpha, df=df_inf ) # Metadata @@ -652,9 +754,11 @@ def _fit_ols( n_control_units=n_control, alpha=self.alpha, anticipation=self.anticipation, + survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, + _df_survey=df_inf, ) # 9. Optional multiplier bootstrap (overrides analytic SE for overall ATT) @@ -723,6 +827,7 @@ def _fit_logit( int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, + survey_design=None, ) -> WooldridgeDiDResults: """Logit path: cohort + time additive FEs + solve_logit + ASF ATT. @@ -749,10 +854,18 @@ def _fit_logit( cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values + # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= + survey_cluster_ids = cluster_ids if self.cluster else None + resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( + _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) + ) + _has_survey = resolved is not None + beta, probs = solve_logit( X_full, y, rank_deficient_action=self.rank_deficient_action, + weights=survey_weights, ) # solve_logit prepends intercept — beta[0] is intercept, beta[1:] are X_full cols beta_int_cols = beta[1 : n_int + 1] # treatment interaction coefficients @@ -763,34 +876,65 @@ def _fit_logit( beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask - # QMLE sandwich vcov via shared linalg backend + # QMLE sandwich vcov resids = y - probs X_with_intercept = np.column_stack([np.ones(len(y)), X_full]) - if np.any(nan_mask): - # Compute vcov on reduced design (only identified columns) - X_reduced = X_with_intercept[:, kept_beta] - vcov_reduced = compute_robust_vcov( - X_reduced, - resids, - cluster_ids=cluster_ids, - weights=probs * (1 - probs), - weight_type="aweight", - ) - # Expand back to full size with NaN for dropped columns - k_full = len(beta) - vcov_full = np.full((k_full, k_full), np.nan) - kept_idx = np.where(kept_beta)[0] - vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + + if _has_survey: + # X_tilde trick: transform design matrix so compute_survey_vcov + # produces the correct QMLE sandwich for nonlinear models. + # Bread: (X_tilde'WX_tilde)^{-1} = (X'diag(w*V)X)^{-1} + # Scores: w*X_tilde*r_tilde = w*X*(y-mu) + from diff_diff.survey import compute_survey_vcov + V = probs * (1 - probs) + sqrt_V = np.sqrt(np.clip(V, 1e-20, None)) + X_tilde = X_with_intercept * sqrt_V[:, None] + r_tilde = resids / sqrt_V + if np.any(nan_mask): + X_tilde_r = X_tilde[:, kept_beta] + vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) + k_full = len(beta) + vcov_full = np.full((k_full, k_full), np.nan) + kept_idx = np.where(kept_beta)[0] + vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + else: + vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: - vcov_full = compute_robust_vcov( - X_with_intercept, - resids, - cluster_ids=cluster_ids, - weights=probs * (1 - probs), - weight_type="aweight", - ) + # Cluster-robust QMLE sandwich (non-survey path) + if np.any(nan_mask): + X_reduced = X_with_intercept[:, kept_beta] + vcov_reduced = compute_robust_vcov( + X_reduced, + resids, + cluster_ids=cluster_ids, + weights=probs * (1 - probs), + weight_type="aweight", + ) + k_full = len(beta) + vcov_full = np.full((k_full, k_full), np.nan) + kept_idx = np.where(kept_beta)[0] + vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + else: + vcov_full = compute_robust_vcov( + X_with_intercept, + resids, + cluster_ids=cluster_ids, + weights=probs * (1 - probs), + weight_type="aweight", + ) beta = beta_clean + # Survey-weighted averaging helpers for ASF computation + def _avg(a, cell_mask): + if survey_weights is not None: + return float(np.average(a, weights=survey_weights[cell_mask])) + return float(np.mean(a)) + + def _avg_ax0(a, cell_mask): + if survey_weights is not None: + return np.average(a, weights=survey_weights[cell_mask], axis=0) + return np.mean(a, axis=0) + # ASF ATT(g,t) for treated units in each cell gt_effects: Dict[Tuple, Dict] = {} gt_weights: Dict[Tuple, int] = {} @@ -802,6 +946,9 @@ def _fit_logit( if cell_mask.sum() == 0: continue # Skip cells whose interaction coefficient was dropped (rank deficiency) + # Skip cells where all survey weights are zero (non-estimable) + if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: + continue delta = beta_int_cols[idx] if np.isnan(delta): continue @@ -816,26 +963,26 @@ def _fit_logit( x_hat_j = X_with_intercept[cell_mask, coef_pos] delta_total = delta_total + beta[coef_pos] * x_hat_j eta_0 = eta_base - delta_total - att = float(np.mean(_logistic(eta_base) - _logistic(eta_0))) + att = _avg(_logistic(eta_base) - _logistic(eta_0), cell_mask) # Delta method gradient: d(ATT)/d(β) # for nuisance p: mean_i[(Λ'(η_1) - Λ'(η_0)) * X_p] # for cell intercept: mean_i[Λ'(η_1)] # for cell × cov j: mean_i[Λ'(η_1) * x_hat_j] d_diff = _logistic_deriv(eta_base) - _logistic_deriv(eta_0) - grad = np.mean(X_with_intercept[cell_mask] * d_diff[:, None], axis=0) - grad[1 + idx] = float(np.mean(_logistic_deriv(eta_base))) + grad = _avg_ax0(X_with_intercept[cell_mask] * d_diff[:, None], cell_mask) + grad[1 + idx] = _avg(_logistic_deriv(eta_base), cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_with_intercept[cell_mask, coef_pos] - grad[coef_pos] = float(np.mean(_logistic_deriv(eta_base) * x_hat_j)) + grad[coef_pos] = _avg(_logistic_deriv(eta_base) * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) - t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, @@ -864,7 +1011,7 @@ def _fit_logit( overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) - t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha, df=df_inf) overall = { "att": overall_att, "se": overall_se, @@ -874,7 +1021,7 @@ def _fit_logit( } else: overall = _compute_weighted_agg( - gt_effects, gt_weights, gt_keys_ordered, None, self.alpha + gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( @@ -893,9 +1040,11 @@ def _fit_logit( n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, + survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, + _df_survey=df_inf, ) def _fit_poisson( @@ -911,6 +1060,7 @@ def _fit_poisson( int_col_names: List[str], groups: List[Any], n_cov_interact: int = 0, + survey_design=None, ) -> WooldridgeDiDResults: """Poisson path: cohort + time additive FEs + solve_poisson + ASF ATT. @@ -940,7 +1090,18 @@ def _fit_poisson( cluster_col = self.cluster if self.cluster else unit cluster_ids = sample[cluster_col].values - beta, mu_hat = solve_poisson(X_full, y, rank_deficient_action=self.rank_deficient_action) + # Resolve survey design, inject cluster as PSU only when user explicitly set cluster= + survey_cluster_ids = cluster_ids if self.cluster else None + resolved, survey_weights, survey_weight_type, survey_metadata, df_inf = ( + _resolve_survey_for_wooldridge(survey_design, sample, survey_cluster_ids, self.cluster) + ) + _has_survey = resolved is not None + + beta, mu_hat = solve_poisson( + X_full, y, + rank_deficient_action=self.rank_deficient_action, + weights=survey_weights, + ) # Handle rank-deficient designs: compute vcov on reduced design. # Preserve raw interaction coefficients BEFORE zeroing NaN so the @@ -950,34 +1111,63 @@ def _fit_poisson( beta_clean = np.where(nan_mask, 0.0, beta) kept_beta = ~nan_mask - # QMLE sandwich vcov via shared linalg backend + # QMLE sandwich vcov resids = y - mu_hat - if np.any(nan_mask): - X_reduced = X_full[:, kept_beta] - vcov_reduced = compute_robust_vcov( - X_reduced, - resids, - cluster_ids=cluster_ids, - weights=mu_hat, - weight_type="aweight", - ) - k_full = len(beta) - vcov_full = np.full((k_full, k_full), np.nan) - kept_idx = np.where(kept_beta)[0] - vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + + if _has_survey: + # X_tilde trick for nonlinear survey vcov (V = mu for Poisson) + from diff_diff.survey import compute_survey_vcov + sqrt_V = np.sqrt(np.clip(mu_hat, 1e-20, None)) + X_tilde = X_full * sqrt_V[:, None] + r_tilde = resids / sqrt_V + if np.any(nan_mask): + X_tilde_r = X_tilde[:, kept_beta] + vcov_reduced = compute_survey_vcov(X_tilde_r, r_tilde, resolved) + k_full = len(beta) + vcov_full = np.full((k_full, k_full), np.nan) + kept_idx = np.where(kept_beta)[0] + vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + else: + vcov_full = compute_survey_vcov(X_tilde, r_tilde, resolved) else: - vcov_full = compute_robust_vcov( - X_full, - resids, - cluster_ids=cluster_ids, - weights=mu_hat, - weight_type="aweight", - ) + # Cluster-robust QMLE sandwich (non-survey path) + if np.any(nan_mask): + X_reduced = X_full[:, kept_beta] + vcov_reduced = compute_robust_vcov( + X_reduced, + resids, + cluster_ids=cluster_ids, + weights=mu_hat, + weight_type="aweight", + ) + k_full = len(beta) + vcov_full = np.full((k_full, k_full), np.nan) + kept_idx = np.where(kept_beta)[0] + vcov_full[np.ix_(kept_idx, kept_idx)] = vcov_reduced + else: + vcov_full = compute_robust_vcov( + X_full, + resids, + cluster_ids=cluster_ids, + weights=mu_hat, + weight_type="aweight", + ) beta = beta_clean # Treatment interaction coefficients (from cleaned beta for computation) beta_int = beta[1 : 1 + n_int] + # Survey-weighted averaging helpers for ASF computation + def _avg(a, cell_mask): + if survey_weights is not None: + return float(np.average(a, weights=survey_weights[cell_mask])) + return float(np.mean(a)) + + def _avg_ax0(a, cell_mask): + if survey_weights is not None: + return np.average(a, weights=survey_weights[cell_mask], axis=0) + return np.mean(a, axis=0) + # ASF ATT(g,t) for treated units in each cell. # eta_base = X_full @ beta already includes the treatment effect (D_{g,t}=1). # Counterfactual: eta_0 = eta_base - delta (treatment switched off). @@ -995,6 +1185,9 @@ def _fit_poisson( # Use raw coefficients (before NaN->0 zeroing) to detect dropped cells. if np.isnan(beta_int_raw[idx]): continue + # Skip cells where all survey weights are zero (non-estimable) + if survey_weights is not None and np.sum(survey_weights[cell_mask]) == 0: + continue delta = beta_int[idx] if np.isnan(delta): continue @@ -1009,26 +1202,26 @@ def _fit_poisson( eta_0 = eta_base - delta_total mu_1 = np.exp(eta_base) mu_0 = np.exp(eta_0) - att = float(np.mean(mu_1 - mu_0)) + att = _avg(mu_1 - mu_0, cell_mask) # Delta method gradient: # for nuisance p: mean_i[(μ_1 - μ_0) * X_p] # for cell intercept: mean_i[μ_1] # for cell × cov j: mean_i[μ_1 * x_hat_j] diff_mu = mu_1 - mu_0 - grad = np.mean(X_full[cell_mask] * diff_mu[:, None], axis=0) - grad[1 + idx] = float(np.mean(mu_1)) + grad = _avg_ax0(X_full[cell_mask] * diff_mu[:, None], cell_mask) + grad[1 + idx] = _avg(mu_1, cell_mask) for j in range(n_cov_interact): coef_pos = 1 + n_int + idx * n_cov_interact + j if coef_pos < len(beta): x_hat_j = X_full[cell_mask, coef_pos] - grad[coef_pos] = float(np.mean(mu_1 * x_hat_j)) + grad[coef_pos] = _avg(mu_1 * x_hat_j, cell_mask) # Compute SE in reduced parameter space if rank-deficient if np.any(nan_mask): grad_r = grad[kept_beta] se = float(np.sqrt(max(grad_r @ vcov_reduced @ grad_r, 0.0))) else: se = float(np.sqrt(max(grad @ vcov_full @ grad, 0.0))) - t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=df_inf) gt_effects[(g, t)] = { "att": att, "se": se, @@ -1055,7 +1248,7 @@ def _fit_poisson( overall_att = sum(gt_weights[k] * gt_effects[k]["att"] for k in post_keys) / w_total agg_grad = sum((gt_weights[k] / w_total) * gt_grads[k] for k in post_keys) overall_se = float(np.sqrt(max(agg_grad @ _vcov_se @ agg_grad, 0.0))) - t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(overall_att, overall_se, alpha=self.alpha, df=df_inf) overall = { "att": overall_att, "se": overall_se, @@ -1065,7 +1258,7 @@ def _fit_poisson( } else: overall = _compute_weighted_agg( - gt_effects, gt_weights, gt_keys_ordered, None, self.alpha + gt_effects, gt_weights, gt_keys_ordered, None, self.alpha, df=df_inf ) return WooldridgeDiDResults( @@ -1084,7 +1277,9 @@ def _fit_poisson( n_control_units=self._count_control_units(sample, unit, cohort, time), alpha=self.alpha, anticipation=self.anticipation, + survey_metadata=survey_metadata, _gt_weights=gt_weights, _gt_vcov=gt_vcov, _gt_keys=gt_keys_ordered, + _df_survey=df_inf, ) diff --git a/diff_diff/wooldridge_results.py b/diff_diff/wooldridge_results.py index 601f7d5b..57f9eed1 100644 --- a/diff_diff/wooldridge_results.py +++ b/diff_diff/wooldridge_results.py @@ -54,6 +54,7 @@ class WooldridgeDiDResults: n_control_units: int = 0 alpha: float = 0.05 anticipation: int = 0 + survey_metadata: Optional[Any] = field(default=None, repr=False) # ------------------------------------------------------------------ # # Internal — used by aggregate() for delta-method SEs # @@ -63,6 +64,8 @@ class WooldridgeDiDResults: """Full vcov of all β_{g,t} coefficients (ordered same as sorted group_time_effects keys).""" _gt_keys: List[Tuple[Any, Any]] = field(default_factory=list, repr=False) """Ordered list of (g,t) keys corresponding to _gt_vcov columns.""" + _df_survey: Optional[int] = field(default=None, repr=False) + """Survey degrees of freedom for t-distribution inference.""" # ------------------------------------------------------------------ # # Public methods # @@ -93,7 +96,7 @@ def _agg_se(w_vec: np.ndarray) -> float: return float(np.sqrt(max(w_vec @ vcov @ w_vec, 0.0))) def _build_effect(att: float, se: float) -> Dict[str, Any]: - t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha) + t_stat, p_value, conf_int = safe_inference(att, se, alpha=self.alpha, df=self._df_survey) return { "att": att, "se": se, @@ -181,6 +184,11 @@ def summary(self, aggregation: str = "simple") -> str: "-" * 70, ] + if self.survey_metadata is not None: + from diff_diff.results import _format_survey_block + lines.extend(_format_survey_block(self.survey_metadata, 70)) + lines.append("-" * 70) + def _fmt_row(label: str, att: float, se: float, t: float, p: float, ci: Tuple) -> str: from diff_diff.results import _get_significance_stars # type: ignore diff --git a/docs/choosing_estimator.rst b/docs/choosing_estimator.rst index 77a8fd26..8af57097 100644 --- a/docs/choosing_estimator.rst +++ b/docs/choosing_estimator.rst @@ -599,10 +599,9 @@ If you're unsure which estimator to use: Survey Design Support --------------------- -All estimators except :class:`~diff_diff.WooldridgeDiD` accept an optional -``survey_design`` parameter in ``fit()``. Pass a :class:`~diff_diff.SurveyDesign` -object to get design-based variance estimation. The depth of support varies by -estimator (WooldridgeDiD survey support is planned for Phase 10f): +All estimators accept an optional ``survey_design`` parameter in ``fit()``. +Pass a :class:`~diff_diff.SurveyDesign` object to get design-based variance +estimation. The depth of support varies by estimator: .. list-table:: :header-rows: 1 @@ -684,8 +683,8 @@ estimator (WooldridgeDiD survey support is planned for Phase 10f): - -- - Rao-Wu rescaled * - ``WooldridgeDiD`` - - -- - - -- + - Full (pweight only) + - Full (analytical) - -- - -- * - ``BaconDecomposition`` @@ -694,11 +693,6 @@ estimator (WooldridgeDiD survey support is planned for Phase 10f): - -- - -- -.. note:: - - ``WooldridgeDiD`` does not yet accept ``survey_design``. Survey support - is planned for Phase 10f. See :doc:`/survey-roadmap` for details. - **Legend:** - **Full**: All weight types (pweight/fweight/aweight) + strata/PSU/FPC + Taylor Series Linearization variance diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 26c311a1..42f0fa5f 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -1174,6 +1174,14 @@ where `g(·)` is the link inverse (logistic or exp), `η_i` is the individual li - [x] Both control groups: not_yet_treated, never_treated - [x] Anticipation parameter support - [x] Multiplier bootstrap (Rademacher/Webb/Mammen) for OLS overall SE +- [x] Survey design support (strata/PSU/FPC with TSL variance) + +**Survey design notes:** +- **OLS path:** Survey-weighted within-transformation + WLS via `solve_ols(weights=...)` + TSL vcov via `compute_survey_vcov()`. +- **Logit/Poisson paths:** Survey-weighted IRLS via `solve_logit(weights=...)`/`solve_poisson(weights=...)` + X_tilde linearization trick for TSL vcov: `X_tilde = X * sqrt(V)`, `r_tilde = (y - mu) / sqrt(V)`, then `compute_survey_vcov(X_tilde, r_tilde, resolved)` gives correct QMLE sandwich. ASF means and gradients use survey-weighted averaging. +- **Note:** Only `pweight` (probability weights) are supported; `fweight`/`aweight` raise `ValueError` because the composed survey/QMLE weighting changes their semantics. +- **Note:** Replicate-weight variance is not yet supported (`NotImplementedError`). Use TSL (strata/PSU/FPC) instead. +- **Note:** Bootstrap inference (`n_bootstrap > 0`) cannot be combined with `survey_design` — no survey-aware bootstrap variant is implemented. --- diff --git a/docs/survey-roadmap.md b/docs/survey-roadmap.md index 685dd2e9..10a3de48 100644 --- a/docs/survey-roadmap.md +++ b/docs/survey-roadmap.md @@ -206,12 +206,14 @@ under survey weighting. The survey statistics side (Binder 1983, Rao & Wu 1988) is established and doesn't need a survey methodologist to co-sign. -### 10f. WooldridgeDiD Survey Support (MEDIUM priority) +### 10f. WooldridgeDiD Survey Support — SHIPPED -WooldridgeDiD (ETWFE) is the only estimator without `survey_design` -support. The OLS path is straightforward (same as TWFE); the logit/Poisson -paths require survey-weighted IRLS. Should be added to the compatibility -matrix regardless of whether support is implemented before announcement. +WooldridgeDiD (ETWFE) now supports `survey_design` for all three methods +(OLS, logit, Poisson) with `pweight` only (`fweight`/`aweight` rejected). +OLS uses survey-weighted within-transformation + WLS + TSL vcov. +Logit/Poisson use survey-weighted IRLS + X_tilde linearization for TSL +vcov. Replicate-weight designs raise `NotImplementedError`; bootstrap + +survey is rejected. ### 10g. Practitioner Guidance (LOW priority) @@ -229,7 +231,8 @@ the limitation and suggested alternative. | Estimator | Limitation | Alternative | |-----------|-----------|-------------| -| WooldridgeDiD | No `survey_design` support | Not yet implemented (see 10f) | +| WooldridgeDiD | Replicate weights | Use strata/PSU/FPC design with TSL variance | +| WooldridgeDiD | Bootstrap + survey | Use analytical survey SEs (set `n_bootstrap=0`) | | SyntheticDiD | Replicate weights | Use strata/PSU/FPC design with Rao-Wu rescaled bootstrap | | TROP | Replicate weights | Use strata/PSU/FPC design with Rao-Wu rescaled bootstrap | | BaconDecomposition | Replicate weights | Diagnostic only, no inference | diff --git a/tests/test_wooldridge.py b/tests/test_wooldridge.py index 250d1301..19027fce 100644 --- a/tests/test_wooldridge.py +++ b/tests/test_wooldridge.py @@ -1307,3 +1307,289 @@ def test_covariates_affect_ols_att(self, cov_data): assert r_cov.overall_att != r_nocov.overall_att, ( "Covariate-adjusted ATT should differ from unadjusted" ) + + +class TestWooldridgeSurvey: + """Survey design support for WooldridgeDiD.""" + + @pytest.fixture + def survey_panel(self): + """Panel data with survey design columns.""" + rng = np.random.default_rng(99) + n_units, n_periods = 80, 5 + rows = [] + for u in range(n_units): + cohort = 3 if u < 30 else (4 if u < 50 else 0) + stratum = u % 4 + psu = u # globally unique PSU per unit + weight = 1.0 + rng.exponential(0.5) + for t in range(1, n_periods + 1): + treated = int(cohort > 0 and t >= cohort) + y_cont = 1.0 + 2.0 * treated + 0.3 * rng.standard_normal() + eta = -0.5 + 1.0 * treated + 0.1 * rng.standard_normal() + y_bin = int(rng.random() < 1 / (1 + np.exp(-eta))) + mu = np.exp(0.5 + 0.5 * treated + 0.1 * rng.standard_normal()) + y_count = float(rng.poisson(mu)) + rows.append({ + "unit": u, "time": t, "cohort": cohort, + "y": y_cont, "y_bin": y_bin, "y_count": y_count, + "stratum": stratum, "psu": psu, "weight": weight, + }) + return pd.DataFrame(rows) + + def test_ols_survey_runs(self, survey_panel): + """OLS with full survey design completes.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + r = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + assert r.overall_se > 0 + + def test_ols_survey_se_differs_from_naive(self, survey_panel): + """Survey SE should differ from naive (unweighted) SE.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + r_survey = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + r_naive = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", + ) + assert r_survey.overall_se != r_naive.overall_se + + def test_logit_survey_runs(self, survey_panel): + """Logit with survey design completes.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + r = WooldridgeDiD(method="logit").fit( + survey_panel, outcome="y_bin", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + assert r.overall_se > 0 + + def test_poisson_survey_runs(self, survey_panel): + """Poisson with survey design completes.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + r = WooldridgeDiD(method="poisson").fit( + survey_panel, outcome="y_count", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + assert r.overall_se > 0 + + def test_bootstrap_survey_rejected(self, survey_panel): + """n_bootstrap > 0 with survey_design raises ValueError.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight") + with pytest.raises(ValueError, match="Bootstrap inference is not supported with survey_design"): + WooldridgeDiD(n_bootstrap=100).fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + + def test_weights_only_survey(self, survey_panel): + """Weights-only survey (no strata/PSU) works.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight") + r = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + assert r.survey_metadata is not None + + def test_survey_metadata_present(self, survey_panel): + """survey_metadata is populated with correct fields.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="psu") + r = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + sm = r.survey_metadata + assert sm is not None + assert sm.weight_type == "pweight" + assert sm.effective_n > 0 + assert sm.design_effect > 0 + assert sm.n_strata is not None + assert sm.n_psu is not None + assert sm.df_survey is not None + + def test_replicate_weights_rejected(self, survey_panel): + """Replicate-weight designs raise NotImplementedError.""" + from diff_diff.survey import SurveyDesign + # Add replicate weight columns + survey_panel["rep_w1"] = 1.0 + survey_panel["rep_w2"] = 1.0 + sd = SurveyDesign( + weights="weight", + replicate_weights=["rep_w1", "rep_w2"], + replicate_method="BRR", + ) + with pytest.raises(NotImplementedError, match="replicate-weight variance"): + WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + + def test_weights_only_plus_cluster(self, survey_panel): + """Weights-only survey + cluster= injects cluster as PSU.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight") + r = WooldridgeDiD(cluster="stratum").fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + # Cluster should have been injected as PSU + n_strata = survey_panel["stratum"].nunique() + assert r.survey_metadata is not None + assert r.survey_metadata.n_psu == n_strata + + # SE should differ from same run without cluster + r_no_cluster = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert r.overall_se != r_no_cluster.overall_se + + def test_survey_gt_weights_are_counts(self, survey_panel): + """Survey aggregation uses cell counts, not survey-weight sums.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + r = WooldridgeDiD(method="logit").fit( + survey_panel, outcome="y_bin", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + for k, w in r._gt_weights.items(): + assert isinstance(w, int), ( + f"gt_weights[{k}] = {w} (type {type(w).__name__}); " + f"expected int (cell count)" + ) + + def test_weights_only_no_cluster_implicit_psu(self, survey_panel): + """Weights-only survey without cluster= keeps implicit per-obs PSUs.""" + from diff_diff.survey import SurveyDesign + from diff_diff.wooldridge import _filter_sample + sd = SurveyDesign(weights="weight") + r = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + # n_psu should equal n_obs in the filtered sample (not n_units) + sample = _filter_sample( + survey_panel.copy().assign(cohort=lambda d: d["cohort"].fillna(0)), + "unit", "time", "cohort", "not_yet_treated", 0, + ) + assert r.survey_metadata is not None + assert r.survey_metadata.n_psu == len(sample) + + def test_fweight_rejected(self, survey_panel): + """fweight raises ValueError (pweight only).""" + from diff_diff.survey import SurveyDesign + # Use integer weights so fweight validation passes in resolve(), + # and the pweight guard in _resolve_survey_for_wooldridge fires. + df = survey_panel.copy() + df["int_weight"] = 1 + sd = SurveyDesign(weights="int_weight", weight_type="fweight") + with pytest.raises(ValueError, match="weight_type='pweight'"): + WooldridgeDiD().fit( + df, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + + def test_poisson_zero_weight_cell(self, survey_panel): + """Poisson survey fit handles zero-weight treated cells cleanly.""" + from diff_diff.survey import SurveyDesign + df = survey_panel.copy() + # Zero out weights for one treated cohort so some cells have zero weight + df.loc[df["cohort"] == 3, "weight"] = 0.0 + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + r = WooldridgeDiD(method="poisson").fit( + df, outcome="y_count", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + + def test_ols_survey_rank_deficient(self, survey_panel): + """Survey OLS handles rank-deficient all-eventually-treated designs.""" + from diff_diff.survey import SurveyDesign + # Remove never-treated (cohort=0) to create rank-deficient design + df = survey_panel[survey_panel["cohort"] > 0].copy() + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + r = WooldridgeDiD(control_group="not_yet_treated").fit( + df, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + + def test_ols_survey_zero_weight_unit_rejected(self, survey_panel): + """Zero-weight unit raises ValueError before within_transform.""" + from diff_diff.survey import SurveyDesign + df = survey_panel.copy() + # Zero out all weights for unit 0 + df.loc[df["unit"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + with pytest.raises(ValueError, match="Survey weights sum to zero for unit"): + WooldridgeDiD().fit( + df, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + + def test_logit_survey_zero_weight_cell(self, survey_panel): + """Logit survey fit skips zero-weight treated cells cleanly.""" + from diff_diff.survey import SurveyDesign + df = survey_panel.copy() + df.loc[df["cohort"] == 3, "weight"] = 0.0 + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + r = WooldridgeDiD(method="logit").fit( + df, outcome="y_bin", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + assert np.isfinite(r.overall_att) + assert np.isfinite(r.overall_se) + + def test_ols_survey_non_range_index(self, survey_panel): + """OLS survey zero-weight guard works with non-RangeIndex DataFrames.""" + from diff_diff.survey import SurveyDesign + df = survey_panel.copy() + df.index = df.index + 1000 # shift to non-zero-based index + df.loc[df["unit"] == 0, "weight"] = 0.0 + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + with pytest.raises(ValueError, match="Survey weights sum to zero for unit"): + WooldridgeDiD().fit( + df, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + + def test_survey_aggregate_and_summary(self, survey_panel): + """Survey aggregate() uses df_survey and summary() shows survey block.""" + from diff_diff.survey import SurveyDesign + sd = SurveyDesign(weights="weight", strata="stratum", psu="unit") + r = WooldridgeDiD().fit( + survey_panel, outcome="y", unit="unit", time="time", + cohort="cohort", survey_design=sd, + ) + # aggregate() should use t-distribution with survey df + r.aggregate("group") + assert r.group_effects is not None + assert r._df_survey is not None + for eff in r.group_effects.values(): + assert np.isfinite(eff["p_value"]) + + # summary() should include survey design block + s = r.summary() + assert "Survey Design" in s + assert "pweight" in s