Skip to content

Commit

Permalink
ratelimits: Replace *Decision merging with always returning most rest…
Browse files Browse the repository at this point in the history
…rictive (#7667)

Fix a bug added in #7653 which sometimes attributed an "Allowed"
`Transaction` to the amalgamated "Denied" `*Decision`. Instead, always
return the most restrictive `*Decision` in the batch.

Remove a debug `fmt.Printf()` call added in #7653
  • Loading branch information
beautifulentropy authored Aug 15, 2024
1 parent 46859a2 commit 31d0ff0
Showing 1 changed file with 21 additions and 41 deletions.
62 changes: 21 additions & 41 deletions ratelimits/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ func (d *Decision) Result(now time.Time) error {
return nil
}

fmt.Printf("\n\n%#v\n\n", d.transaction)

// Add 0-3% jitter to the RetryIn duration to prevent thundering herd.
jitter := time.Duration(float64(d.retryIn) * 0.03 * rand.Float64())
retryAfter := d.retryIn + jitter
Expand Down Expand Up @@ -237,39 +235,24 @@ func prepareBatch(txns []Transaction) ([]Transaction, []string, error) {
return transactions, bucketKeys, nil
}

type batchDecision struct {
*Decision
}

func newBatchDecision() *batchDecision {
return &batchDecision{
Decision: &Decision{
allowed: true,
remaining: math.MaxInt64,
},
func stricter(existing *Decision, incoming *Decision) *Decision {
if existing.retryIn == incoming.retryIn {
if existing.remaining < incoming.remaining {
return existing
}
return incoming
}
}

func (d *batchDecision) merge(in *Decision) {
d.allowed = d.allowed && in.allowed
d.remaining = min(d.remaining, in.remaining)
d.resetIn = max(d.resetIn, in.resetIn)
if in.newTAT.After(d.newTAT) {
d.newTAT = in.newTAT
d.retryIn = in.retryIn
d.transaction = in.transaction
if existing.retryIn > incoming.retryIn {
return existing
}
return incoming
}

// BatchSpend attempts to deduct the costs from the provided buckets'
// capacities. If applicable, new bucket states are persisted to the underlying
// datastore before returning. Non-existent buckets will be initialized WITH the
// cost factored into the initial state. The following rules are applied to
// merge the Decisions for each Transaction into a single batch Decision:
// - Allowed is true if all Transactions where check is true were allowed,
// - RetryIn and ResetIn are the largest values of each across all Decisions,
// - Remaining is the smallest value of each across all Decisions, and
// - Decisions resulting from spend-only Transactions are never merged.
// cost factored into the initial state. The returned *Decision represents the
// strictest of all *Decisions reached in the batch.
func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
start := l.clk.Now()

Expand All @@ -289,8 +272,7 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
if err != nil {
return nil, err
}

batchDecision := newBatchDecision()
batchDecision := allowedDecision
newTATs := make(map[string]time.Time)
txnOutcomes := make(map[Transaction]string)

Expand All @@ -314,7 +296,9 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
}

if !txn.spendOnly() {
batchDecision.merge(d)
// Spend-only Transactions are best-effort and do not contribute to
// the batchDecision.
batchDecision = stricter(batchDecision, d)
}

txnOutcomes[txn] = Denied
Expand All @@ -336,7 +320,7 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
for txn, outcome := range txnOutcomes {
l.spendLatency.WithLabelValues(txn.limit.name.String(), outcome).Observe(perTxnLatency.Seconds())
}
return batchDecision.Decision, nil
return batchDecision, nil
}

// Refund attempts to refund all of the cost to the capacity of the specified
Expand All @@ -359,12 +343,8 @@ func (l *Limiter) Refund(ctx context.Context, txn Transaction) (*Decision, error
// buckets' capacities. Non-existent buckets will NOT be initialized. The new
// bucket state is persisted to the underlying datastore, if applicable, before
// returning. Spend-only Transactions are assumed to be refundable. Check-only
// Transactions are never refunded. The following rules are applied to merge the
// Decisions for each Transaction into a single batch Decision:
// - Allowed is true if all Transactions where check is true were allowed,
// - RetryIn and ResetIn are the largest values of each across all Decisions,
// - Remaining is the smallest value of each across all Decisions, and
// - Decisions resulting from spend-only Transactions are never merged.
// Transactions are never refunded. The returned *Decision represents the
// strictest of all *Decisions reached in the batch.
func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decision, error) {
batch, bucketKeys, err := prepareBatch(txns)
if err != nil {
Expand All @@ -383,7 +363,7 @@ func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decisio
return nil, err
}

batchDecision := newBatchDecision()
batchDecision := allowedDecision
newTATs := make(map[string]time.Time)

for _, txn := range batch {
Expand All @@ -398,7 +378,7 @@ func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decisio
txn.cost = 0
}
d := maybeRefund(l.clk, txn, tat)
batchDecision.merge(d)
batchDecision = stricter(batchDecision, d)
if d.allowed && tat != d.newTAT {
// New bucket state should be persisted.
newTATs[txn.bucketKey] = d.newTAT
Expand All @@ -411,7 +391,7 @@ func (l *Limiter) BatchRefund(ctx context.Context, txns []Transaction) (*Decisio
return nil, err
}
}
return batchDecision.Decision, nil
return batchDecision, nil
}

// Reset resets the specified bucket to its maximum capacity. The new bucket
Expand Down

0 comments on commit 31d0ff0

Please sign in to comment.