diff --git a/diff_diff/prep_dgp.py b/diff_diff/prep_dgp.py index 914a68d9..546a5a25 100644 --- a/diff_diff/prep_dgp.py +++ b/diff_diff/prep_dgp.py @@ -1129,6 +1129,37 @@ def generate_staggered_ddd_data( return pd.DataFrame(records) +def _rank_pair_weights( + unit_weight: np.ndarray, + unit_stratum: np.ndarray, + y0: np.ndarray, + n_strata: int, +) -> None: + """Rank-pair weights with Y(0) within each stratum (in-place). + + High-outcome units receive higher weights, modeling informative sampling + where hard-to-reach (high-outcome) subpopulations are under-covered + and therefore carry larger inverse-selection-probability weights. + """ + for s in range(n_strata): + mask = unit_stratum == s + n_s = mask.sum() + if n_s <= 1: + continue + idx_s = np.where(mask)[0] + w_vals = unit_weight[idx_s].copy() + if w_vals.std() < 1e-10: + # No within-stratum variation: create rank-based weights + # scaled to preserve stratum baseline weight level + ranks = np.argsort(np.argsort(y0[idx_s])).astype(float) + 1.0 + unit_weight[idx_s] = ranks / ranks.mean() * w_vals.mean() + else: + # Rank-pair: highest Y(0) gets heaviest weight + y0_order = np.argsort(-y0[idx_s]) + w_sorted = np.sort(w_vals)[::-1] # heaviest first + unit_weight[idx_s[y0_order]] = w_sorted + + def generate_survey_did_data( n_units: int = 200, n_periods: int = 8, @@ -1149,6 +1180,15 @@ def generate_survey_did_data( add_covariates: bool = False, panel: bool = True, seed: Optional[int] = None, + # --- Research-grade DGP parameters --- + icc: Optional[float] = None, + weight_cv: Optional[float] = None, + informative_sampling: bool = False, + heterogeneous_te_by_strata: bool = False, + strata_sizes: Optional[List[int]] = None, + return_true_population_att: bool = False, + covariate_effects: Optional[tuple] = None, + te_covariate_interaction: float = 0.0, ) -> pd.DataFrame: """ Generate synthetic staggered DiD data with survey structure. @@ -1215,6 +1255,52 @@ def generate_survey_did_data( CallawaySantAnna(panel=False)). seed : int, optional Random seed for reproducibility. + icc : float, optional + Target intra-class correlation coefficient (0 < icc < 1). Overrides + ``psu_re_sd`` via the variance decomposition: + ``psu_re_sd = sqrt(icc * (sigma2_unit + sigma2_noise + sigma2_cov) / + ((1 - icc) * (1 + psu_period_factor^2)))`` where ``sigma2_cov`` + includes covariate variance when ``add_covariates=True``. + Cannot be combined with a non-default ``psu_re_sd``. + weight_cv : float, optional + Target coefficient of variation for sampling weights. Generates + LogNormal weights normalized to mean 1, bypassing ``weight_variation``. + Cannot be combined with a non-default ``weight_variation``. + informative_sampling : bool, default=False + If True, sampling weights correlate with Y(0) — high-outcome units + receive higher weights (under-coverage → larger inverse-selection- + probability weights). Uses rank-pairing within each stratum. For + panel data, ranking is done once from period-1 outcomes. For + repeated cross-sections, ranking is refreshed each period. Within + each stratum, rank-based weights are scaled to preserve the + stratum's baseline weight level from ``weight_variation``. + When ``add_covariates=True``, covariate contributions are + included in the Y(0) ranking. + heterogeneous_te_by_strata : bool, default=False + If True, treatment effect varies by stratum: + ``TE_h = TE * (1 + 0.5 * (h - mean) / std)``. Creates a gap + between unweighted and population ATT. With ``n_strata=1``, + all units receive the base ``treatment_effect``. + strata_sizes : list of int, optional + Custom per-stratum unit counts. Must have length ``n_strata`` and + sum to ``n_units``. Replaces equal allocation across strata. + return_true_population_att : bool, default=False + If True, attaches a diagnostic dict to ``df.attrs["dgp_truth"]`` + with keys: ``population_att`` (weight-weighted average of treated + true effects), ``deff_kish`` (1 + CV(w)^2), ``base_stratum_effects`` + (base stratum TEs before dynamic/covariate modifiers), + ``icc_realized`` (ANOVA-based + ICC computed on period-1 data). + covariate_effects : tuple of (float, float), optional + Coefficients ``(beta1, beta2)`` for covariates x1 and x2 in the + outcome equation ``y += beta1 * x1 + beta2 * x2``. Default uses + ``(0.5, 0.3)``. Only used when ``add_covariates=True``. The ICC + calibration automatically adjusts for the implied covariate variance. + te_covariate_interaction : float, default=0.0 + Coefficient for treatment-by-covariate interaction: + ``TE_i = base_TE + te_covariate_interaction * x1_i``. Creates + unit-level treatment effect heterogeneity driven by the continuous + covariate. Requires ``add_covariates=True``. Returns ------- @@ -1222,6 +1308,8 @@ def generate_survey_did_data( Columns: unit, period, outcome, first_treat, treated, true_effect, stratum, psu, fpc, weight. Also rep_0..rep_K if include_replicate_weights=True, and x1, x2 if add_covariates=True. + If ``return_true_population_att=True``, ``df.attrs["dgp_truth"]`` + contains DGP diagnostics. """ rng = np.random.default_rng(seed) @@ -1284,30 +1372,120 @@ def generate_survey_did_data( f"weight_variation must be one of {valid_wv}, got {weight_variation!r}" ) + # --- Validate research-grade DGP parameters --- + if icc is not None: + if not (0 < icc < 1): + raise ValueError(f"icc must be between 0 and 1 (exclusive), got {icc}") + if psu_re_sd != 2.0: + raise ValueError( + "Cannot specify both icc and a non-default psu_re_sd. " + "icc overrides psu_re_sd via the ICC formula." + ) + + if weight_cv is not None: + if not np.isfinite(weight_cv) or weight_cv <= 0: + raise ValueError( + f"weight_cv must be finite and positive, got {weight_cv}" + ) + if weight_variation != "moderate": + raise ValueError( + "Cannot specify both weight_cv and a non-default " + "weight_variation. weight_cv overrides weight_variation." + ) + + if strata_sizes is not None: + strata_sizes = list(strata_sizes) + for ss in strata_sizes: + if isinstance(ss, bool) or not isinstance(ss, (int, np.integer)): + raise ValueError( + f"strata_sizes must contain integers, got {ss!r}" + ) + if len(strata_sizes) != n_strata: + raise ValueError( + f"strata_sizes must have length n_strata={n_strata}, " + f"got {len(strata_sizes)}" + ) + if any(s < 1 for s in strata_sizes): + raise ValueError("All strata_sizes must be >= 1") + if sum(strata_sizes) != n_units: + raise ValueError( + f"strata_sizes must sum to n_units={n_units}, " + f"got {sum(strata_sizes)}" + ) + + # --- Validate and resolve covariate coefficients --- + if covariate_effects is not None: + covariate_effects = tuple(covariate_effects) + if len(covariate_effects) != 2: + raise ValueError( + f"covariate_effects must have length 2, got {len(covariate_effects)}" + ) + if not all(np.isfinite(c) for c in covariate_effects): + raise ValueError( + f"covariate_effects must be finite, got {covariate_effects}" + ) + _beta1, _beta2 = covariate_effects if covariate_effects is not None else (0.5, 0.3) + + if not np.isfinite(te_covariate_interaction): + raise ValueError( + f"te_covariate_interaction must be finite, got {te_covariate_interaction}" + ) + if te_covariate_interaction != 0.0 and not add_covariates: + raise ValueError( + "te_covariate_interaction requires add_covariates=True" + ) + + # --- ICC -> psu_re_sd resolution --- + if icc is not None: + # Covariate variance: Var(beta1*x1) + Var(beta2*x2) + # where x1 ~ N(0,1), x2 ~ Bernoulli(0.5) + cov_var = (_beta1**2 * 1.0 + _beta2**2 * 0.25) if add_covariates else 0.0 + non_psu_var = unit_fe_sd**2 + noise_sd**2 + cov_var + if non_psu_var < 1e-12: + raise ValueError( + "icc requires non-zero non-PSU variance " + "(unit_fe_sd, noise_sd, or add_covariates must contribute variance)" + ) + psu_re_sd = np.sqrt( + icc * non_psu_var + / ((1 - icc) * (1 + psu_period_factor**2)) + ) + # --- Survey structure: assign units to strata and PSUs --- n_psu_total = n_strata * psu_per_stratum - units_per_stratum = n_units // n_strata - remainder = n_units % n_strata + + if strata_sizes is not None: + stratum_n = strata_sizes + else: + units_per_stratum = n_units // n_strata + remainder = n_units % n_strata + stratum_n = [ + units_per_stratum + (1 if s < remainder else 0) + for s in range(n_strata) + ] unit_stratum = np.empty(n_units, dtype=int) unit_psu = np.empty(n_units, dtype=int) idx = 0 for s in range(n_strata): - # Distribute remainder units across first strata - n_s = units_per_stratum + (1 if s < remainder else 0) + n_s = stratum_n[s] unit_stratum[idx : idx + n_s] = s - - # Assign PSUs within this stratum psu_start = s * psu_per_stratum for j in range(n_s): unit_psu[idx + j] = psu_start + (j % psu_per_stratum) idx += n_s - # Sampling weights: vary by stratum (inverse selection probability) - scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0} - scale = scale_map.get(weight_variation, 1.0) - denom = max(n_strata - 1, 1) - unit_weight = 1.0 + scale * (unit_stratum / denom) + # Sampling weights + if weight_cv is not None: + sigma_ln = np.sqrt(np.log(1 + weight_cv**2)) + raw_w = rng.lognormal(-sigma_ln**2 / 2, sigma_ln, size=n_units) + unit_weight = raw_w / raw_w.mean() + else: + # Stratum-based weights (inverse selection probability) + scale_map = {"none": 0.0, "moderate": 1.0, "high": 3.0} + scale = scale_map.get(weight_variation, 1.0) + denom = max(n_strata - 1, 1) + unit_weight = 1.0 + scale * (unit_stratum / denom) # --- Treatment assignment (cohort structure) --- n_never = int(n_units * never_treated_frac) @@ -1344,6 +1522,37 @@ def generate_survey_did_data( 0, psu_re_sd * psu_period_factor, size=(n_psu_total, n_periods) ) + # --- Informative sampling (panel path): pre-draw FEs, rank-pair weights --- + if informative_sampling and panel: + _panel_unit_fe = rng.normal(0, unit_fe_sd, size=n_units) + y0_period1 = ( + _panel_unit_fe + + psu_re[unit_psu] + + psu_period_re[unit_psu, 0] + + 0.5 + ) + if add_covariates: + _panel_x1 = rng.normal(0, 1, size=n_units) + _panel_x2 = rng.choice([0, 1], size=n_units) + y0_period1 = y0_period1 + _beta1 * _panel_x1 + _beta2 * _panel_x2 + _rank_pair_weights(unit_weight, unit_stratum, y0_period1, n_strata) + + # Save base weights for cross-section informative sampling (reset each period) + if informative_sampling and not panel: + _base_weight = unit_weight.copy() + + # --- Heterogeneous treatment effects by stratum --- + if heterogeneous_te_by_strata: + if n_strata == 1: + te_by_stratum = np.array([treatment_effect]) + else: + strata_idx = np.arange(n_strata, dtype=float) + te_by_stratum = treatment_effect * ( + 1 + 0.5 * (strata_idx - strata_idx.mean()) / strata_idx.std() + ) + else: + te_by_stratum = None + # --- Generate panel or repeated cross-sections --- records = [] for t in range(1, n_periods + 1): @@ -1351,21 +1560,47 @@ def generate_survey_did_data( unit_fe = rng.normal(0, unit_fe_sd, size=n_units) if panel and t > 1: pass # reuse unit_fe from first period (set below) - if panel and t == 1: + if informative_sampling and panel: + unit_fe = _panel_unit_fe # use pre-drawn FEs + elif panel and t == 1: _panel_unit_fe = unit_fe # save for reuse - if panel and t > 1: + elif panel and t > 1: unit_fe = _panel_unit_fe # type: ignore[possibly-undefined] - x1 = rng.normal(0, 1, size=n_units) if add_covariates else None - if panel and t > 1 and add_covariates: + # Cross-section informative sampling: re-rank weights each period + if informative_sampling and not panel: + # Draw covariates early so they can be included in Y(0) ranking + if add_covariates: + x1 = rng.normal(0, 1, size=n_units) + x2 = rng.choice([0, 1], size=n_units) + unit_weight = _base_weight.copy() # type: ignore[possibly-undefined] + y0_t = ( + unit_fe + + psu_re[unit_psu] + + psu_period_re[unit_psu, t - 1] + + 0.5 * t + ) + if add_covariates: + y0_t = y0_t + _beta1 * x1 + _beta2 * x2 + _rank_pair_weights(unit_weight, unit_stratum, y0_t, n_strata) + + # Covariates — may already be drawn by informative sampling above + if informative_sampling and panel and add_covariates: + x1 = _panel_x1 # pre-drawn before loop for ranking + x2 = _panel_x2 + elif informative_sampling and not panel and add_covariates: + pass # x1, x2 already drawn in cross-section ranking block + elif add_covariates: + x1 = rng.normal(0, 1, size=n_units) + x2 = rng.choice([0, 1], size=n_units) + else: + x1 = None + x2 = None + if not informative_sampling and panel and t > 1 and add_covariates: x1 = _panel_x1 # type: ignore[possibly-undefined] - elif panel and t == 1 and add_covariates: - _panel_x1 = x1 - - x2 = rng.choice([0, 1], size=n_units) if add_covariates else None - if panel and t > 1 and add_covariates: x2 = _panel_x2 # type: ignore[possibly-undefined] - elif panel and t == 1 and add_covariates: + elif not informative_sampling and panel and t == 1 and add_covariates: + _panel_x1 = x1 _panel_x2 = x2 for i in range(n_units): @@ -1374,12 +1609,17 @@ def generate_survey_did_data( y = unit_fe[i] + psu_re[unit_psu[i]] + psu_period_re[unit_psu[i], t - 1] + 0.5 * t if add_covariates: - y += 0.5 * x1[i] + 0.3 * x2[i] + y += _beta1 * x1[i] + _beta2 * x2[i] treated = int(g_i > 0 and t >= g_i) true_eff = 0.0 if treated: - true_eff = treatment_effect + if te_by_stratum is not None: + true_eff = float(te_by_stratum[unit_stratum[i]]) + else: + true_eff = treatment_effect + if te_covariate_interaction != 0.0: + true_eff += te_covariate_interaction * x1[i] if dynamic_effects: true_eff *= 1 + effect_growth * (t - g_i) y += true_eff @@ -1426,4 +1666,53 @@ def generate_survey_did_data( w_r[w_r > 0] *= n_rep / (n_rep - 1) df[f"rep_{r}"] = w_r + # --- DGP truth diagnostics --- + if return_true_population_att: + treated_mask = df["treated"] == 1 + if treated_mask.any(): + w_treated = df.loc[treated_mask, "weight"].values + te_treated = df.loc[treated_mask, "true_effect"].values + population_att = float(np.average(te_treated, weights=w_treated)) + else: + population_att = float("nan") + + if te_by_stratum is not None: + stratum_effects = { + int(s): float(te_by_stratum[s]) for s in range(n_strata) + } + else: + stratum_effects = { + int(s): float(treatment_effect) for s in range(n_strata) + } + + # Kish DEFF from weight variation + w_all = df.groupby("unit")["weight"].first().values + cv_w = float(w_all.std() / w_all.mean()) if w_all.mean() > 0 else 0.0 + deff_kish = 1 + cv_w**2 + + # Realized ICC (ANOVA-based, period-1 only to avoid TE contamination) + _p1 = df[df["period"] == 1] + _groups = _p1.groupby("psu")["outcome"] + _n_total = len(_p1) + _n_groups = _groups.ngroups + # ICC undefined with < 2 groups or no within-group replication + if _n_groups < 2 or _n_total <= _n_groups: + icc_realized = float("nan") + else: + _n_bar = _n_total / _n_groups + _grand_mean = _p1["outcome"].mean() + _ssb = (_groups.size() * (_groups.mean() - _grand_mean) ** 2).sum() + _msb = _ssb / (_n_groups - 1) + _ssw = _groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + _msw = _ssw / (_n_total - _n_groups) + _denom = _msb + (_n_bar - 1) * _msw + icc_realized = float((_msb - _msw) / _denom) if _denom > 0 else float("nan") + + df.attrs["dgp_truth"] = { + "population_att": population_att, + "deff_kish": float(deff_kish), + "base_stratum_effects": stratum_effects, + "icc_realized": icc_realized, + } + return df diff --git a/docs/methodology/REGISTRY.md b/docs/methodology/REGISTRY.md index 14ad53f1..26c311a1 100644 --- a/docs/methodology/REGISTRY.md +++ b/docs/methodology/REGISTRY.md @@ -2515,6 +2515,19 @@ The 8-step workflow in `docs/llms-practitioner.txt` is adapted from Baker et al. covariates). Paper's Step 8 is "Keep learning." The mandatory with/without covariate comparison is a diff-diff convention. +### Survey DGP (`generate_survey_did_data`) + +- **Note:** The `icc` parameter calibrates `psu_re_sd` using the full variance + decomposition `Var(Y) = sigma²_psu * (1 + psu_period_factor²) + sigma²_unit + + sigma²_noise + sigma²_cov`. When `add_covariates=True`, covariate variance + `sigma²_cov = beta1² * Var(x1) + beta2² * Var(x2)` is included, where + `(beta1, beta2)` defaults to `(0.5, 0.3)` but is configurable via + `covariate_effects`. +- **Note:** When `informative_sampling=True` and `add_covariates=True`, covariate + contributions are included in the Y(0) ranking used for weight assignment. + Covariates are pre-drawn before the ranking step (panel: once before the loop; + cross-section: each period) and reused in the outcome generation. + --- # Version History diff --git a/tests/test_prep.py b/tests/test_prep.py index 72491a25..0674186a 100644 --- a/tests/test_prep.py +++ b/tests/test_prep.py @@ -1504,3 +1504,448 @@ def test_psu_period_factor_validation(self): generate_survey_did_data(psu_period_factor=math.nan, seed=42) with pytest.raises(ValueError, match="psu_period_factor"): generate_survey_did_data(psu_period_factor=math.inf, seed=42) + + +class TestSurveyDGPResearchGrade: + """Tests for research-grade DGP parameters added to generate_survey_did_data.""" + + def test_icc_parameter(self): + """Realized ICC should be within 50% relative tolerance of target.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_icc = 0.3 + df = generate_survey_did_data( + n_units=1000, icc=target_icc, seed=42 + ) + # ANOVA-based ICC on period 1 (pre-treatment, no TE contamination) + p1 = df[df["period"] == 1] + groups = p1.groupby("psu")["outcome"] + grand_mean = p1["outcome"].mean() + n_total = len(p1) + n_groups = groups.ngroups + n_bar = n_total / n_groups + ssb = (groups.size() * (groups.mean() - grand_mean) ** 2).sum() + msb = ssb / (n_groups - 1) + ssw = groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + msw = ssw / (n_total - n_groups) + realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) + assert abs(realized_icc - target_icc) / target_icc < 0.50 + + def test_icc_zero_variance_rejected(self): + """icc with zero non-PSU variance should raise ValueError.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="non-zero non-PSU variance"): + generate_survey_did_data( + icc=0.3, unit_fe_sd=0, noise_sd=0, add_covariates=False, seed=42 + ) + + def test_icc_and_psu_re_sd_conflict(self): + """Cannot specify both icc and a non-default psu_re_sd.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="Cannot specify both icc"): + generate_survey_did_data(icc=0.3, psu_re_sd=3.0, seed=42) + + def test_icc_out_of_range(self): + """icc must be in (0, 1).""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="icc must be between"): + generate_survey_did_data(icc=0.0, seed=42) + with pytest.raises(ValueError, match="icc must be between"): + generate_survey_did_data(icc=1.0, seed=42) + + def test_weight_cv_parameter(self): + """Realized weight CV should be within 0.15 of target.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_cv = 0.5 + df = generate_survey_did_data( + n_units=1000, weight_cv=target_cv, seed=42 + ) + weights = df.groupby("unit")["weight"].first().values + realized_cv = weights.std() / weights.mean() + assert abs(realized_cv - target_cv) < 0.15 + + def test_weight_cv_and_weight_variation_conflict(self): + """Cannot specify both weight_cv and a non-default weight_variation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="Cannot specify both weight_cv"): + generate_survey_did_data( + weight_cv=0.5, weight_variation="high", seed=42 + ) + + def test_weight_cv_nan_inf(self): + """weight_cv must reject NaN and Inf.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="weight_cv must be finite"): + generate_survey_did_data(weight_cv=np.nan, seed=42) + with pytest.raises(ValueError, match="weight_cv must be finite"): + generate_survey_did_data(weight_cv=np.inf, seed=42) + + def test_informative_sampling_panel(self): + """Informative sampling should create weight-outcome correlation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + weight_cv=0.5, + seed=42, + ) + # Period-1 outcomes: weighted mean should differ from unweighted + p1 = df[df["period"] == 1] + unwt_mean = p1["outcome"].mean() + wt_mean = np.average(p1["outcome"], weights=p1["weight"]) + assert abs(wt_mean - unwt_mean) > 0.1 + # Positive correlation: higher outcome → heavier weight + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + + def test_informative_sampling_default_weights(self): + """Informative sampling preserves stratum-level weight structure.""" + from diff_diff.prep_dgp import generate_survey_did_data + + # Generate with informative_sampling but default weight_variation + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + seed=42, + ) + # Reference: expected stratum mean weights from weight_variation="moderate" + # Formula: 1.0 + 1.0 * (s / max(n_strata-1, 1)) for s=0..4 + p1 = df[df["period"] == 1] + for s in range(5): + expected_mean = 1.0 + 1.0 * (s / 4) + stratum_weights = p1.loc[p1["stratum"] == s, "weight"] + assert abs(stratum_weights.mean() - expected_mean) < 0.15, ( + f"Stratum {s}: expected mean ~{expected_mean}, " + f"got {stratum_weights.mean():.3f}" + ) + # Within-stratum variation should exist (informative sampling) + assert stratum_weights.std() > 0.01 + + def test_informative_sampling_cross_section(self): + """Cross-section informative sampling: per-period positive correlation. + + Under w_i = 1/pi_i, under-covered (high-outcome) units get heavier + weights, so weight and outcome should be positively correlated. + """ + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + weight_cv=0.5, + panel=False, + seed=42, + ) + # Check correlation for period 1 + p1 = df[df["period"] == 1] + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + + def test_informative_sampling_cross_section_default_weights(self): + """Cross-section informative sampling with default weight_variation.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + panel=False, + seed=42, + ) + p1 = df[df["period"] == 1] + for s in range(5): + expected_mean = 1.0 + 1.0 * (s / 4) + stratum_weights = p1.loc[p1["stratum"] == s, "weight"] + assert abs(stratum_weights.mean() - expected_mean) < 0.15 + assert stratum_weights.std() > 0.01 + + def test_icc_with_covariates(self): + """ICC calibration should account for covariate variance.""" + from diff_diff.prep_dgp import generate_survey_did_data + + target_icc = 0.3 + df = generate_survey_did_data( + n_units=1000, icc=target_icc, add_covariates=True, seed=42 + ) + # ANOVA-based ICC on period 1 + p1 = df[df["period"] == 1] + groups = p1.groupby("psu")["outcome"] + grand_mean = p1["outcome"].mean() + n_total = len(p1) + n_groups = groups.ngroups + n_bar = n_total / n_groups + ssb = (groups.size() * (groups.mean() - grand_mean) ** 2).sum() + msb = ssb / (n_groups - 1) + ssw = groups.apply(lambda x: ((x - x.mean()) ** 2).sum()).sum() + msw = ssw / (n_total - n_groups) + realized_icc = (msb - msw) / (msb + (n_bar - 1) * msw) + assert abs(realized_icc - target_icc) / target_icc < 0.50 + + def test_informative_sampling_with_covariates_panel(self): + """Informative sampling includes covariates in Y(0) ranking (panel).""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + add_covariates=True, + seed=42, + ) + p1 = df[df["period"] == 1] + # Positive weight-outcome correlation preserved with covariates + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + # Covariates should be present + assert "x1" in df.columns + assert "x2" in df.columns + + def test_informative_sampling_with_covariates_cross_section(self): + """Informative sampling includes covariates in Y(0) ranking (cross-section).""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + informative_sampling=True, + add_covariates=True, + panel=False, + seed=42, + ) + p1 = df[df["period"] == 1] + corr = np.corrcoef(p1["weight"], p1["outcome"])[0, 1] + assert corr > 0.1 + assert "x1" in df.columns + + def test_informative_sampling_covariate_ranking_direct(self): + """Verify covariates actually affect weight assignment in ranking. + + Use large covariate effects with tiny unit_fe_sd/psu_re_sd so + covariates dominate Y(0). Weights with nonzero vs zero covariate + effects should differ. + """ + from diff_diff.prep_dgp import generate_survey_did_data + + # Covariates dominate: large beta, tiny structural variance + df_with = generate_survey_did_data( + n_units=200, + informative_sampling=True, + add_covariates=True, + covariate_effects=(5.0, 0.0), + unit_fe_sd=0.01, + psu_re_sd=0.01, + noise_sd=0.01, + seed=42, + ) + df_without = generate_survey_did_data( + n_units=200, + informative_sampling=True, + add_covariates=True, + covariate_effects=(0.0, 0.0), + unit_fe_sd=0.01, + psu_re_sd=0.01, + noise_sd=0.01, + seed=42, + ) + # Weight assignments should differ when covariates dominate ranking + w_with = df_with[df_with["period"] == 1]["weight"].values + w_without = df_without[df_without["period"] == 1]["weight"].values + assert not np.allclose(w_with, w_without, atol=0.01) + + def test_heterogeneous_te_by_strata(self): + """Unweighted mean TE should differ from population ATT.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + heterogeneous_te_by_strata=True, + strata_sizes=[400, 200, 200, 100, 100], + return_true_population_att=True, + seed=42, + ) + treated = df[df["treated"] == 1] + unwt_mean_te = treated["true_effect"].mean() + pop_att = df.attrs["dgp_truth"]["population_att"] + # With unequal strata sizes + heterogeneous TE, these should differ + assert abs(unwt_mean_te - pop_att) > 0.01 + + def test_heterogeneous_te_single_stratum(self): + """n_strata=1 with heterogeneous TE should not crash.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=50, + n_strata=1, + psu_per_stratum=8, + fpc_per_stratum=200.0, + heterogeneous_te_by_strata=True, + seed=42, + ) + treated = df[df["treated"] == 1] + # All treated units should have the base treatment_effect + assert np.allclose(treated["true_effect"].unique(), [2.0], atol=0.01) + + def test_return_true_population_att(self): + """dgp_truth dict should have expected keys and reasonable values.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=1000, + icc=0.3, + return_true_population_att=True, + seed=42, + ) + truth = df.attrs["dgp_truth"] + assert "population_att" in truth + assert "deff_kish" in truth + assert "base_stratum_effects" in truth + assert "icc_realized" in truth + assert truth["deff_kish"] >= 1.0 + assert truth["icc_realized"] >= 0.0 + # icc_realized should track the target ICC (ANOVA-based, same formula) + assert abs(truth["icc_realized"] - 0.3) / 0.3 < 0.50 + + def test_population_att_nan_no_treated(self): + """population_att should be NaN when there are no treated units.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=50, + never_treated_frac=1.0, + return_true_population_att=True, + seed=42, + ) + assert np.isnan(df.attrs["dgp_truth"]["population_att"]) + + def test_icc_realized_nan_no_replication(self): + """icc_realized should be NaN when period-1 has no within-PSU replication.""" + from diff_diff.prep_dgp import generate_survey_did_data + + # 5 units across 5 strata with 8 PSUs each = 1 unit per PSU (no replication) + df = generate_survey_did_data( + n_units=5, + n_strata=5, + psu_per_stratum=8, + return_true_population_att=True, + seed=42, + ) + assert np.isnan(df.attrs["dgp_truth"]["icc_realized"]) + + def test_strata_sizes(self): + """Custom strata_sizes should produce correct per-stratum counts.""" + from diff_diff.prep_dgp import generate_survey_did_data + + sizes = [60, 50, 40, 30, 20] + df = generate_survey_did_data( + n_units=200, strata_sizes=sizes, seed=42 + ) + for s, expected in enumerate(sizes): + actual = df[df["period"] == 1]["stratum"].value_counts().get(s, 0) + assert actual == expected + + def test_strata_sizes_sum_mismatch(self): + """strata_sizes must sum to n_units.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="strata_sizes must sum"): + generate_survey_did_data( + n_units=200, strata_sizes=[50, 50, 50, 50, 49], seed=42 + ) + + def test_strata_sizes_float_rejected(self): + """strata_sizes must contain integers, not floats.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="strata_sizes must contain integers"): + generate_survey_did_data( + n_units=200, strata_sizes=[40.0, 40.0, 40.0, 40.0, 40.0], seed=42 + ) + + def test_backward_compatibility(self): + """Default params with same seed produce identical DataFrames.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df1 = generate_survey_did_data(seed=123) + df2 = generate_survey_did_data(seed=123) + pd.testing.assert_frame_equal(df1, df2) + + def test_covariate_effects_custom(self): + """Custom covariate coefficients should change outcome variance.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df_default = generate_survey_did_data( + n_units=500, add_covariates=True, seed=42 + ) + df_large = generate_survey_did_data( + n_units=500, add_covariates=True, + covariate_effects=(2.0, 1.0), seed=42, + ) + # Larger coefficients → larger outcome variance + assert df_large["outcome"].var() > df_default["outcome"].var() + + def test_covariate_effects_zero(self): + """Zero covariate effects should produce same variance as no covariates.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df_no_cov = generate_survey_did_data( + n_units=500, add_covariates=False, seed=42 + ) + df_zero = generate_survey_did_data( + n_units=500, add_covariates=True, + covariate_effects=(0.0, 0.0), seed=42, + ) + # Outcome variance should be similar (covariates contribute nothing) + assert abs(df_zero["outcome"].var() - df_no_cov["outcome"].var()) < 0.5 + + def test_te_covariate_interaction(self): + """Covariate interaction should create unit-level TE heterogeneity.""" + from diff_diff.prep_dgp import generate_survey_did_data + + df = generate_survey_did_data( + n_units=500, + add_covariates=True, + te_covariate_interaction=1.0, + seed=42, + ) + treated = df[df["treated"] == 1] + # true_effect should vary across treated units (not constant) + assert treated["true_effect"].std() > 0.1 + + def test_te_covariate_interaction_requires_covariates(self): + """te_covariate_interaction without add_covariates should raise.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="te_covariate_interaction requires"): + generate_survey_did_data( + te_covariate_interaction=0.5, add_covariates=False, seed=42 + ) + + def test_covariate_effects_validation(self): + """covariate_effects must be length 2 and finite.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="covariate_effects must have length 2"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(1.0,), seed=42 + ) + with pytest.raises(ValueError, match="covariate_effects must be finite"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(np.nan, 0.3), seed=42 + ) + with pytest.raises(ValueError, match="covariate_effects must be finite"): + generate_survey_did_data( + add_covariates=True, covariate_effects=(0.5, np.inf), seed=42 + ) + + def test_te_covariate_interaction_validation(self): + """te_covariate_interaction must be finite.""" + from diff_diff.prep_dgp import generate_survey_did_data + + with pytest.raises(ValueError, match="te_covariate_interaction must be finite"): + generate_survey_did_data( + add_covariates=True, te_covariate_interaction=np.nan, seed=42 + )