Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 77 additions & 9 deletions dpsynth/data_generation_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ class TabularSynthesizer(primitives.DPMechanism):
default_factory=discrete_mechanisms.MSTMechanism
)
initializers: dict[str, primitives.DPMechanism] | None = None
total_count_mechanism: primitives.DPGaussianCount | None = None
cross_attribute_constraints: Sequence[constraints.Constraint] = ()

def calibrate(
Expand Down Expand Up @@ -182,26 +183,56 @@ def calibrate(
init_budget_fraction,
)

def _resolve_init_budget(self, numerical_bins, init_delta):
"""Returns (initializers, needs_total_count).

Resolves initializers from self.initializers or creates them from the
domain. needs_total_count is True when no CategoricalInitializer is
present to provide a total count estimate.
"""
inits = (
dict(self.initializers)
if self.initializers is not None
else _create_initializers(self.domains, numerical_bins, init_delta)
)
has_total_source = any(
isinstance(i, initialization.CategoricalInitializer)
for i in inits.values()
)
return inits, not has_total_source

def _calibrate_zcdp(
self, zcdp_rho, numerical_bins, init_delta, init_budget_fraction
):
"""Simple additive zCDP budget split."""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
all_inits, needs_total_count = self._resolve_init_budget(
numerical_bins, init_delta
)

init_rho = init_budget_fraction * zcdp_rho
per_col_rho = init_rho / len(inits)
# If no CategoricalInitializer can provide a total count estimate,
# allocate one extra share of init budget for a DPGaussianCount.
num_shares = len(all_inits) + (1 if needs_total_count else 0)
per_col_rho = init_rho / num_shares
discrete_rho = zcdp_rho - init_rho

calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
col: init.calibrate(zcdp_rho=per_col_rho)
for col, init in all_inits.items()
}
calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
calibrated_discrete = self.discrete_mechanism.calibrate(
zcdp_rho=discrete_rho
)
return dataclasses.replace(
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

def _calibrate_approx_dp(
Expand Down Expand Up @@ -234,10 +265,10 @@ def _calibrate_approx_dp(
Returns:
A new TabularSynthesizer instance with calibrated sub-mechanisms.
"""
inits = self.initializers or _create_initializers(
self.domains, numerical_bins, init_delta
inits, needs_total_count = self._resolve_init_budget(
numerical_bins, init_delta
)
num_columns = len(inits)
num_columns = len(inits) + (1 if needs_total_count else 0)

# Stage 1: Convert (epsilon, remaining_delta) to zCDP and calibrate
# initializers with init_budget_fraction of that budget.
Expand All @@ -252,11 +283,17 @@ def _calibrate_approx_dp(
calibrated_inits = {
col: init.calibrate(zcdp_rho=per_col_rho) for col, init in inits.items()
}

calibrated_total = (
primitives.DPGaussianCount().calibrate(zcdp_rho=per_col_rho)
if needs_total_count
else None
)
# Stage 2: With init dp_events fixed, find the tightest discrete budget.
# The accountant handles ApproximateDpEvent deltas from open-set
# initializers automatically.
init_events = [init.dp_event for init in calibrated_inits.values()]
if calibrated_total is not None:
init_events.append(calibrated_total.dp_event)

# Determine accountant type based on discrete mechanism's dp_event.
probe_event = self.discrete_mechanism.calibrate(zcdp_rho=1.0).dp_event
Expand Down Expand Up @@ -285,6 +322,7 @@ def make_event_from_param(discrete_rho):
self,
initializers=calibrated_inits,
discrete_mechanism=calibrated_discrete,
total_count_mechanism=calibrated_total,
)

@property
Expand All @@ -300,6 +338,8 @@ def dp_event(self) -> dp_accounting.DpEvent:
if self.initializers is None:
raise ValueError('Must call calibrate() before accessing dp_event.')
events = [init.dp_event for init in self.initializers.values()]
if self.total_count_mechanism is not None:
events.append(self.total_count_mechanism.dp_event)
events.append(self.discrete_mechanism.dp_event)
return dp_accounting.ComposedDpEvent(events)

Expand Down Expand Up @@ -329,9 +369,37 @@ def __call__(
)

# Phase 1: Per-column initialization.
# Non-numerical columns first; CategoricalInitializer measurements give
# a total count estimate for the heuristic numerical measurement.
col_results: dict[str, initialization.ColumnMeasurement] = {}
categorical_measurements = []
for col, init in self.initializers.items():
if not isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(rng, data[col].values)
if (
isinstance(init, initialization.CategoricalInitializer)
and col_results[col].measurement is not None
):
categorical_measurements.append(col_results[col].measurement)

# Estimate total from categorical measurements or DPGaussianCount.
estimated_total = None
if categorical_measurements:
estimated_total = mbi.estimation.minimum_variance_unbiased_total(
categorical_measurements
)
elif self.total_count_mechanism is not None:
# Pick an arbitrary column to count records.
any_col = next(iter(self.domains))
estimated_total = max(
1.0, self.total_count_mechanism(rng, data[any_col].values)
)

for col, init in self.initializers.items():
col_results[col] = init(rng, data[col].values)
if isinstance(init, initialization.NumericalInitializer):
col_results[col] = init(
rng, data[col].values, estimated_total=estimated_total
)

# Phase 2: Encode data to discrete domain.
discrete_domains = {}
Expand Down
5 changes: 4 additions & 1 deletion dpsynth/local_mode/initialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ def __call__(
measurement = None
if estimated_total is not None:
rho = self._zcdp_rho
uniform_counts = bin_weights * (estimated_total / self.num_partitions)
if not self.attribute.clip_to_range:
# Prepend zero weight for the OUT_OF_DOMAIN slot at index 0.
bin_weights = np.r_[0, bin_weights]
uniform_counts = bin_weights * (estimated_total / bin_weights.sum())
stddev = 1.0 / np.sqrt(rho)
measurement = mbi.LinearMeasurement(
uniform_counts, (self.name,), stddev=stddev
Expand Down
141 changes: 53 additions & 88 deletions dpsynth/local_mode/primitives.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,62 +306,6 @@ def _get_threshold(delta, sigma, max_part):
return thresholds.max()


def select_partitions_gaussian_thresholding(
rng: np.random.Generator,
data: np.ndarray,
gdp_budget: float,
delta: float,
) -> tuple[np.ndarray, np.ndarray, float]:
"""Selects partitions using Gaussian Thresholding (Weighted Gaussian).

This implements Algorithm 2 from the DP-SIPS paper (Swanberg et al., 2023)
under item-level DP. It is the simplest partition selection mechanism:

1. Compute the histogram of partition counts.
2. Add Gaussian noise calibrated to the privacy budget.
3. Return partitions whose noisy count exceeds a threshold chosen to
bound the false-positive probability per empty partition at delta.

Under item-level DP each record is treated as a distinct user contributing
to exactly one partition, so the histogram has L2 sensitivity 1. The
threshold is T = 1 + sigma * Phi^{-1}(1 - delta), following the paper's
formula with max_part = 1.

Args:
rng: A numpy random number generator.
data: 1D array of integers, where each element is a partition ID.
gdp_budget: Privacy budget in terms of squared Gaussian DP mu parameter
(gdp_budget = mu^2 = 1 / sigma^2).
delta: Failure probability (false positive bound per empty partition).

Returns:
A tuple containing:
- selected_partitions: 1D array of partition IDs that passed the
threshold.
- estimated_counts: 1D array of noisy counts for each selected
partition.
- sigma: The standard deviation of the Gaussian noise added.
"""
if gdp_budget <= 0 or delta <= 0:
raise ValueError(f'{gdp_budget=} and {delta=} must be positive.')

sigma = 1.0 / np.sqrt(gdp_budget)

if data.size == 0:
return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), sigma

unique_parts, counts = np.unique(data, return_counts=True)
noisy_counts = counts + rng.normal(scale=sigma, size=counts.size)

# Threshold: ensures that an empty partition (true count 0) passes with
# probability at most delta. For max_part=1 this simplifies to:
# T = 1/sqrt(1) + sigma * Phi^{-1}(1 - delta) = 1 + sigma * ppf(1-delta)
threshold = 1.0 + sigma * scipy.stats.norm.ppf(1.0 - delta)
passed = noisy_counts >= threshold

return unique_parts[passed], noisy_counts[passed], sigma


def _select_partitions_sips(
rng: np.random.Generator,
data: np.ndarray,
Expand Down Expand Up @@ -468,32 +412,6 @@ def _select_partitions_sips(
return selected_partitions, selected_counts, max_sigma


def _gaussian_histogram(
rng: np.random.Generator,
data: np.ndarray,
domain_size: int,
sigma: float,
) -> np.ndarray:
"""Computes a noisy histogram over a closed domain using the Gaussian mechanism.

The histogram query has L2 sensitivity 1 under item-level DP (each record
contributes +1 to exactly one bin). Gaussian noise with the given standard
deviation is added independently to each bin count.

Args:
rng: A numpy random number generator.
data: 1D array of integer-encoded categorical values in [0, domain_size).
domain_size: Number of categories in the closed domain.
sigma: Standard deviation of the Gaussian noise added to each bin.

Returns:
A length-`domain_size` array of noisy counts.
"""
return np.bincount(data, minlength=domain_size) + rng.normal(
scale=sigma, size=domain_size
)


# ---------------------------------------------------------------------------
# DPMechanism subclasses
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -616,10 +534,50 @@ def __call__(
if self.sigma is None:
raise ValueError(_UNCALIBRATED_MSG.format(param='sigma'))
return HistogramResult(
counts=_gaussian_histogram(rng, data, self.domain_size, self.sigma)
counts=np.bincount(data, minlength=self.domain_size)
+ rng.normal(scale=self.sigma, size=self.domain_size)
)


@dataclasses.dataclass
class DPGaussianCount(DPMechanism):
"""Differentially private count via the Gaussian mechanism.

Returns a noisy count of the number of records in the input data. The
conversion from zCDP is ``sigma = sqrt(0.5 / zcdp_rho)``.

Attributes:
sigma: Gaussian noise standard deviation. Set directly or via ``calibrate``.
"""

sigma: float | None = None

def calibrate(self, *, zcdp_rho: float) -> DPGaussianCount:
"""Returns a copy with sigma derived from the zCDP budget."""
return dataclasses.replace(self, sigma=math.sqrt(0.5 / zcdp_rho))

@property
def dp_event(self) -> dp_accounting.DpEvent:
"""Returns the Gaussian privacy event for this mechanism."""
if self.sigma is None:
raise ValueError(_UNCALIBRATED_MSG.format(param='sigma'))
return dp_accounting.GaussianDpEvent(noise_multiplier=self.sigma)

def __call__(self, rng: np.random.Generator, data: np.ndarray) -> float:
"""Returns a differentially private count of records.

Args:
rng: A numpy random number generator.
data: 1D array of data records.

Returns:
The noisy count (true count + Gaussian noise).
"""
if self.sigma is None:
raise ValueError(_UNCALIBRATED_MSG.format(param='sigma'))
return float(len(data) + rng.normal(scale=self.sigma))


@dataclasses.dataclass
class DPPartitionSelection(DPMechanism):
"""Differentially private partition selection via Gaussian Thresholding.
Expand Down Expand Up @@ -663,10 +621,17 @@ def __call__(
"""
if self.sigma is None:
raise ValueError(_UNCALIBRATED_MSG.format(param='sigma'))
gdp_budget = np.inf if self.sigma == 0.0 else 1.0 / (self.sigma**2)
parts, counts, _ = select_partitions_gaussian_thresholding(
rng, data, gdp_budget, self.delta
)
if data.size == 0:
return PartitionSelectionResult(
selected_partitions=np.empty(0, dtype=data.dtype),
estimated_counts=np.empty(0, dtype=float),
)
unique_parts, counts = np.unique(data, return_counts=True)
noisy_counts = counts + rng.normal(scale=self.sigma, size=counts.size)
# Threshold ensures an empty partition passes with probability <= delta.
threshold = 1.0 + self.sigma * scipy.stats.norm.ppf(1.0 - self.delta)
passed = noisy_counts >= threshold
return PartitionSelectionResult(
selected_partitions=parts, estimated_counts=counts
selected_partitions=unique_parts[passed],
estimated_counts=noisy_counts[passed],
)
32 changes: 32 additions & 0 deletions tests/data_generation_v3_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,38 @@ def test_calibrate_small_epsilon(self):
self.assertIsInstance(synthetic_df, pd.DataFrame)
self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B'])

def test_numerical_only_uses_dp_count(self):
"""Numerical-only domains should allocate a DPGaussianCount for total."""
domains = {
'A': domain.NumericalAttribute(min_value=0, max_value=10),
'B': domain.NumericalAttribute(min_value=-10, max_value=10),
}
df = pd.DataFrame({'A': [5, 5, 0], 'B': [5, -10, -5]}, dtype=float)
rng = np.random.default_rng(0)
calibrated = TabularSynthesizer(domains=domains).calibrate(zcdp_rho=100.0)

# total_count_mechanism should be set for numerical-only domains.
self.assertIsNotNone(calibrated.total_count_mechanism)
synthetic_df = calibrated(rng, df).synthetic_data
self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B'])

def test_mixed_domain_no_dp_count(self):
"""Mixed domains estimate total from categorical measurements."""
domains = {
'A': domain.CategoricalAttribute(
possible_values=['a', 'b', 'c'], out_of_domain_index=0
),
'B': domain.NumericalAttribute(min_value=0, max_value=10),
}
df = pd.DataFrame({'A': ['a', 'b', 'c'], 'B': [1.0, 5.0, 10.0]})
rng = np.random.default_rng(0)
calibrated = TabularSynthesizer(domains=domains).calibrate(zcdp_rho=100.0)

# No DPGaussianCount needed — total comes from categorical measurement.
self.assertIsNone(calibrated.total_count_mechanism)
synthetic_df = calibrated(rng, df).synthetic_data
self.assertListEqual(synthetic_df.columns.tolist(), ['A', 'B'])


if __name__ == '__main__':
absltest.main()
Loading
Loading