Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions go/dpagg/select_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,15 @@ type PreAggSelectPartitionOptions struct {
// Optional.
PreThreshold int64
// MaxPartitionsContributed is the number of distinct partitions a single
// privacy unit can contribute to. Required.
// privacy unit can contribute to.
//
// Mutually exclusive with MaxContributions. One of the two options is required.
MaxPartitionsContributed int64
// MaxContributions is the number of distinct contributions a single
// privacy unit can make.
//
// Mutually exclusive with MaxPartitionsContributed. One of the two options is required.
MaxContributions int64
}

// NewPreAggSelectPartition constructs a new PreAggSelectPartition from opt.
Expand All @@ -148,11 +155,17 @@ func NewPreAggSelectPartition(opt *PreAggSelectPartitionOptions) (*PreAggSelectP
opt.PreThreshold = 1
}

var l0Sensitivity int64
if opt.MaxContributions > 0 {
l0Sensitivity = opt.MaxContributions
} else {
l0Sensitivity = opt.MaxPartitionsContributed
}
s := PreAggSelectPartition{
epsilon: opt.Epsilon,
delta: opt.Delta,
preThreshold: opt.PreThreshold,
l0Sensitivity: opt.MaxPartitionsContributed,
l0Sensitivity: l0Sensitivity,
}

if err := checks.CheckDeltaStrict(s.delta); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go/dpagg/select_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func TestNewPreAggSelectPartition(t *testing.T) {
want *PreAggSelectPartition
wantErr bool
}{
{"MaxPartitionsContributed is not set",
{"MaxPartitionsContributed and MaxContributions are not set",
&PreAggSelectPartitionOptions{
Epsilon: ln3,
Delta: tenten,
Expand Down
33 changes: 27 additions & 6 deletions go/dpagg/sum.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,28 @@ type BoundedSumInt64Options struct {
// Defaults to 1. This is only needed for other aggregation functions using BoundedSum;
// which is why the option is not exported.
maxContributionsPerPartition int64
// How many times may a single privacy unit contribute in total to all partitions?
//
// Mutually exclusive with MaxPartitionsContributed. One of the two options is required.
MaxContributions int64
}

// NewBoundedSumInt64 returns a new BoundedSumInt64, whose sum is initialized at 0.
func NewBoundedSumInt64(opt *BoundedSumInt64Options) (*BoundedSumInt64, error) {
if opt == nil {
opt = &BoundedSumInt64Options{} // Prevents panicking due to a nil pointer dereference.
}
// TODO: Add validation for MaxContributions as well. Currently only MaxPartitionsContributed, Lower and Upper are validated.
if opt.MaxContributions == 0 && opt.MaxPartitionsContributed == 0 {
return nil, fmt.Errorf("NewBoundedSumInt64: Either MaxPartitionsContributed or MaxContributions must be set")
}

l0 := opt.MaxPartitionsContributed
if l0 == 0 {
return nil, fmt.Errorf("NewBoundedSumInt64: MaxPartitionsContributed must be set")
var l0 int64
if opt.MaxContributions > 0 {
// When using MaxContributions, l0Sensitivity is used to pass the L1 sensitivity to the noise layer.
l0 = opt.MaxContributions
} else {
l0 = opt.MaxPartitionsContributed
}

maxContributionsPerPartition := opt.maxContributionsPerPartition
Expand All @@ -108,8 +119,8 @@ func NewBoundedSumInt64(opt *BoundedSumInt64Options) (*BoundedSumInt64, error) {
}
// Check bounds & use them to compute L_∞ sensitivity
lower, upper := opt.Lower, opt.Upper
if lower == 0 && upper == 0 {
return nil, fmt.Errorf("NewBoundedSumInt64: Lower and Upper must be set (automatic bounds determination is not implemented yet). Lower and Upper cannot be both 0")
if opt.MaxPartitionsContributed > 0 && lower == 0 && upper == 0 {
return nil, fmt.Errorf("NewBoundedSumInt64: When using MaxPartitionsContributed, Lower and Upper must be set (automatic bounds determination is not implemented yet). Lower and Upper cannot be both 0")
}
var err error
switch noise.ToKind(opt.Noise) {
Expand All @@ -121,7 +132,17 @@ func NewBoundedSumInt64(opt *BoundedSumInt64Options) (*BoundedSumInt64, error) {
if err != nil {
return nil, fmt.Errorf("NewBoundedSumInt64: %w", err)
}
lInf, err := getLInfInt(lower, upper, maxContributionsPerPartition)

var lInf int64
if opt.MaxContributions > 0 {
// When using MaxContributions, lInfSensitivity is set to 1 because l0Sensitivity is used to pass
// L1 sensitivity to the noise layer.
lInf = 1
// The per-partition contribution is clamped so that it does not exceed the total contribution bound.
upper = opt.MaxContributions
} else {
lInf, err = getLInfInt(lower, upper, maxContributionsPerPartition)
}
if err != nil {
if noise.ToKind(opt.Noise) == noise.Unrecognised {
// Ignore sensitivity overflows if noise is not recognised.
Expand Down
62 changes: 60 additions & 2 deletions go/dpagg/sum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestNewBoundedSumInt64(t *testing.T) {
want *BoundedSumInt64
wantErr bool
}{
{"MaxPartitionsContributed is not set",
{"MaxPartitionsContributed is not set when using maxContributionsPerPartition",
&BoundedSumInt64Options{
Epsilon: ln3,
Delta: tenten,
Expand All @@ -298,7 +298,7 @@ func TestNewBoundedSumInt64(t *testing.T) {
},
nil,
true},
{"maxContributionsPerPartition is not set",
{"maxContributionsPerPartition is not set when using MaxPartitionsContributed",
&BoundedSumInt64Options{
Epsilon: ln3,
Delta: 0,
Expand All @@ -320,6 +320,16 @@ func TestNewBoundedSumInt64(t *testing.T) {
state: defaultState,
},
false},
{"MaxContributions is not set when not using maxContributionsPerPartition and MaxPartitionsContributed",
&BoundedSumInt64Options{
Epsilon: ln3,
Delta: 0,
Lower: -1,
Upper: 5,
Noise: noNoise{},
},
nil,
true},
{"Noise is not set",
&BoundedSumInt64Options{
Epsilon: ln3,
Expand Down Expand Up @@ -707,6 +717,24 @@ func TestCheckMergeBoundedSumInt64Compatibility(t *testing.T) {
maxContributionsPerPartition: 2,
},
false},
{"same options, all fields filled while using MaxContributions",
&BoundedSumInt64Options{
Epsilon: ln3,
Delta: tenten,
Lower: -1,
Upper: 5,
Noise: noise.Gaussian(),
MaxContributions: 2,
},
&BoundedSumInt64Options{
Epsilon: ln3,
Delta: tenten,
Lower: -1,
Upper: 5,
Noise: noise.Gaussian(),
MaxContributions: 2,
},
false},
{"same options, only required fields filled",
&BoundedSumInt64Options{
Epsilon: ln3,
Expand All @@ -721,6 +749,20 @@ func TestCheckMergeBoundedSumInt64Compatibility(t *testing.T) {
MaxPartitionsContributed: 1,
},
false},
{"same options, only required fields filled while using MaxContributions",
&BoundedSumInt64Options{
Epsilon: ln3,
Lower: -1,
Upper: 5,
MaxContributions: 2,
},
&BoundedSumInt64Options{
Epsilon: ln3,
Lower: -1,
Upper: 5,
MaxContributions: 2,
},
false},
{"different epsilon",
&BoundedSumInt64Options{
Epsilon: ln3,
Expand Down Expand Up @@ -783,6 +825,22 @@ func TestCheckMergeBoundedSumInt64Compatibility(t *testing.T) {
MaxPartitionsContributed: 1,
},
true},
{"different MaxContributions",
&BoundedSumInt64Options{
Epsilon: ln3,
Lower: -1,
Upper: 5,
MaxPartitionsContributed: 1,
MaxContributions: 2,
},
&BoundedSumInt64Options{
Epsilon: ln3,
Lower: -1,
Upper: 5,
MaxPartitionsContributed: 1,
MaxContributions: 5,
},
true},
{"different lower bound",
&BoundedSumInt64Options{
Epsilon: ln3,
Expand Down
53 changes: 52 additions & 1 deletion privacy-on-beam/pbeam/aggregations.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func randBool(_, _ beam.V) bool {
// boundContributions takes a PCollection<K,V> as input, and for each key, selects and returns
// at most contributionLimit records with this key. The selection is "mostly random":
// the records returned are selected randomly, but the randomness isn't secure.
// This is fine to use in the cross-partition bounding stage or in the per-partition bounding stage,
// This is fine to use in the cross-partition bounding stage, the per-partition bounding stage,
// or per-privacy identifier contribution bounding stage,
// since the privacy guarantee doesn't depend on the privacy unit contributions being selected randomly.
//
// In order to do the cross-partition contribution bounding we need:
Expand All @@ -111,6 +112,13 @@ func randBool(_, _ beam.V) bool {
// 1. the key to be the pair = {privacy ID, partition ID}.
// 2. the value to be just the value which is associated with that {privacy ID, partition ID} pair
// (there could be multiple entries with the same key).
//
// In order to do per-privacy-ID contribution bounding (L1 norm) we need:
// 1. each record to represent a contribution of 1, such as Count. It cannot be used for aggregations
// such as Sum since the function can only bound the number of contributions, not the value of the
// contributions.
// 2. the key should be the privacy ID.
// 3. the value should be the partition ID.
func boundContributions(s beam.Scope, kvCol beam.PCollection, contributionLimit int64) beam.PCollection {
s = s.Scope("boundContributions")
// Transform the PCollection<K,V> into a PCollection<K,[]V>, where
Expand Down Expand Up @@ -299,6 +307,7 @@ type boundedSumInt64Fn struct {
MaxPartitionsContributed int64
Lower int64
Upper int64
MaxContributions int64
NoiseKind noise.Kind
noise noise.Noise // Set during Setup phase according to NoiseKind.
PublicPartitions bool
Expand All @@ -319,6 +328,7 @@ func newBoundedSumInt64Fn(spec PrivacySpec, params SumParams, noiseKind noise.Ki
MaxPartitionsContributed: params.MaxPartitionsContributed,
Lower: int64(params.MinValue),
Upper: int64(params.MaxValue),
MaxContributions: params.maxContributions,
NoiseKind: noiseKind,
PublicPartitions: publicPartitions,
TestMode: spec.testMode,
Expand All @@ -345,6 +355,7 @@ func (fn *boundedSumInt64Fn) CreateAccumulator() (boundedSumAccumInt64, error) {
MaxPartitionsContributed: fn.MaxPartitionsContributed,
Lower: fn.Lower,
Upper: fn.Upper,
MaxContributions: fn.MaxContributions,
Noise: fn.noise,
})
if err != nil {
Expand All @@ -357,6 +368,7 @@ func (fn *boundedSumInt64Fn) CreateAccumulator() (boundedSumAccumInt64, error) {
Delta: fn.PartitionSelectionDelta,
PreThreshold: fn.PreThreshold,
MaxPartitionsContributed: fn.MaxPartitionsContributed,
MaxContributions: fn.MaxContributions,
})
}
return accum, err
Expand Down Expand Up @@ -900,6 +912,45 @@ func checkMaxPartitionsContributedPartitionSelection(maxPartitionsContributed in
return nil
}

// checkContributionBounding validates the contribution bounding configuration.
//
// Exactly one of two configurations must be in use:
//   - MaxContributions is positive, and both MaxValue and MaxPartitionsContributed are 0; or
//   - MaxContributions is 0, and both MaxValue and MaxPartitionsContributed are positive.
//
// Negative values are rejected for all three parameters before the
// configuration itself is examined. It returns nil when the configuration is
// valid and a descriptive error otherwise.
func checkContributionBounding(maxContributions int64, maxValue int64, maxPartitionsContributed int64) error {
	// Reject negative inputs first; the cases are evaluated in order, so the
	// first offending parameter determines the reported error.
	switch {
	case maxContributions < 0:
		return fmt.Errorf("MaxContributions must be non-negative, was %d instead", maxContributions)
	case maxValue < 0:
		return fmt.Errorf("MaxValue must be non-negative, was %d instead", maxValue)
	case maxPartitionsContributed < 0:
		return fmt.Errorf("MaxPartitionsContributed must be non-negative, was %d instead", maxPartitionsContributed)
	}

	// MaxContributions configuration: the other two bounds must be left unset.
	if maxContributions > 0 {
		if maxValue == 0 && maxPartitionsContributed == 0 {
			return nil
		}
		return fmt.Errorf("when MaxContributions is set, MaxValue and MaxPartitionsContributed must be 0")
	}

	// MaxValue/MaxPartitionsContributed configuration: both bounds are required.
	hasValue, hasPartitions := maxValue > 0, maxPartitionsContributed > 0
	switch {
	case !hasValue && !hasPartitions:
		return fmt.Errorf("when MaxContributions is not set, both MaxValue and MaxPartitionsContributed must be set to a positive value")
	case !hasValue:
		return fmt.Errorf("MaxValue must be set to a positive value, was %d instead", maxValue)
	case !hasPartitions:
		return fmt.Errorf("MaxPartitionsContributed must be set to a positive value, was %d instead", maxPartitionsContributed)
	}
	return nil
}

// checkNumericType returns an error if t is not a numeric type.
func checkNumericType(t typex.FullType) error {
switch t.Type().Kind() {
Expand Down
39 changes: 28 additions & 11 deletions privacy-on-beam/pbeam/count.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ type CountParams struct {
// aggregations is scaled according to maxPartitionsContributed, it also
// means that more noise is added to each count.
//
// Required.
// MaxPartitionsContributed and MaxValue are mutually exclusive with MaxContributions.
// One of the two options is required.
MaxPartitionsContributed int64
// The maximum number of times that a privacy identifier can contribute to
// a single count (or, equivalently, the maximum value that a privacy
Expand All @@ -92,7 +93,8 @@ type CountParams struct {
// There is an inherent trade-off when choosing MaxValue: a larger
// parameter means that fewer records are lost, but a larger noise is added.
//
// Required.
// MaxValue and MaxPartitionsContributed are mutually exclusive with MaxContributions.
// One of the two options is required.
MaxValue int64
// Allow negative counts in the output. Most users would expect a count
// aggregation to return non-negative values. However, to get better
Expand All @@ -104,6 +106,17 @@ type CountParams struct {
//
// Optional.
AllowNegativeOutputs bool
// The maximum number of times that a privacy identifier can contribute to all
// counts in total. If a privacy identifier is associated with more values, random
// values will be dropped. There is an inherent trade-off when
// choosing this parameter: a larger MaxContributions leads to less
// data loss due to contribution bounding, but since the noise added in
// aggregations is scaled according to MaxContributions, it also
// means that more noise is added to each count.
//
// MaxContributions is mutually exclusive with MaxPartitionsContributed and MaxValue.
// One of the two options is required.
MaxContributions int64
}

// Count counts the number of times a value appears in a PrivatePCollection,
Expand Down Expand Up @@ -157,18 +170,24 @@ func Count(s beam.Scope, pcol PrivatePCollection, params CountParams) beam.PColl
log.Fatalf("Couldn't drop non-public partitions for Count: %v", err)
}

// First, encode KV pairs, count how many times each one appears,
// First, if MaxContributions is set and not in test mode without contribution bounding,
// do per-privacy identifier contribution bounding.
if params.MaxContributions > 0 && spec.testMode != TestModeWithoutContributionBounding {
pcol.col = boundContributions(s, pcol.col, params.MaxContributions)
}
// Second, encode KV pairs, count how many times each one appears,
// and re-key by the original privacy key.
coded := beam.ParDo(s, kv.NewEncodeFn(idT, partitionT), pcol.col)
kvCounts := stats.Count(s, coded)
counts64 := beam.ParDo(s, convertToInt64Fn, kvCounts)
rekeyed := beam.ParDo(s, rekeyInt64, counts64)
// Second, do cross-partition contribution bounding if not in test mode without contribution bounding.
if spec.testMode != TestModeWithoutContributionBounding {
// Third, do cross-partition contribution bounding if MaxContributions is not set, and
// not in test mode without contribution bounding.
if params.MaxContributions == 0 && spec.testMode != TestModeWithoutContributionBounding {
rekeyed = boundContributions(s, rekeyed, params.MaxPartitionsContributed)
}
// Third, now that contribution bounding is done, remove the privacy keys,
// decode the value, and sum all the counts bounded by MaxValue.
// Fourth, now that contribution bounding is done, remove the privacy keys,
// decode the value, and sum all the counts bounded by MaxValue or MaxContributions.
countPairs := beam.DropKey(s, rekeyed)
countsKV := beam.ParDo(s,
newDecodePairInt64Fn(partitionT.Type()),
Expand Down Expand Up @@ -224,10 +243,7 @@ func checkCountParams(params CountParams, noiseKind noise.Kind, partitionType re
if err != nil {
return err
}
if params.MaxValue <= 0 {
return fmt.Errorf("MaxValue should be strictly positive, got %d", params.MaxValue)
}
return checkMaxPartitionsContributed(params.MaxPartitionsContributed)
return checkContributionBounding(params.MaxContributions, params.MaxValue, params.MaxPartitionsContributed)
}

func addPublicPartitionsForCount(s beam.Scope, spec PrivacySpec, params CountParams, noiseKind noise.Kind, countsKV beam.PCollection) beam.PCollection {
Expand Down Expand Up @@ -259,5 +275,6 @@ func countToSumParams(params CountParams) SumParams {
MaxValue: float64(params.MaxValue),
NoiseKind: params.NoiseKind,
PublicPartitions: params.PublicPartitions,
maxContributions: params.MaxContributions,
}
}
Loading