Commit

ratelimits: Fix latency calculations (#7627)
beautifulentropy authored Jul 25, 2024
1 parent ff851f7 commit a6e0fdc
Showing 2 changed files with 85 additions and 33 deletions.
ratelimits/limiter.go (39 changes: 27 additions & 12 deletions)
@@ -40,22 +40,26 @@ type Limiter struct {
 // NewLimiter returns a new *Limiter. The provided source must be safe for
 // concurrent use.
 func NewLimiter(clk clock.Clock, source source, stats prometheus.Registerer) (*Limiter, error) {
-	limiter := &Limiter{source: source, clk: clk}
-	limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+	spendLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "ratelimits_spend_latency",
 		Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
 		// Exponential buckets ranging from 0.0005s to 3s.
 		Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
 	}, []string{"limit", "decision"})
-	stats.MustRegister(limiter.spendLatency)
+	stats.MustRegister(spendLatency)
 
-	limiter.overrideUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	overrideUsageGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "ratelimits_override_usage",
 		Help: "Proportion of override limit used, by limit name and bucket key.",
 	}, []string{"limit", "bucket_key"})
-	stats.MustRegister(limiter.overrideUsageGauge)
-
-	return limiter, nil
+	stats.MustRegister(overrideUsageGauge)
+
+	return &Limiter{
+		source:             source,
+		clk:                clk,
+		spendLatency:       spendLatency,
+		overrideUsageGauge: overrideUsageGauge,
+	}, nil
 }
 
 type Decision struct {
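For reference, `prometheus.ExponentialBuckets(0.0005, 3, 8)` in the constructor above produces eight boundaries from 0.5ms up to roughly 1.09s; anything slower lands in the implicit +Inf bucket. A standalone sketch (not part of this commit) that prints them:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Eight buckets, starting at 0.0005s, each 3x the previous:
	// 0.0005 0.0015 0.0045 0.0135 0.0405 0.1215 0.3645 1.0935
	fmt.Println(prometheus.ExponentialBuckets(0.0005, 3, 8))
}
```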
@@ -166,6 +170,8 @@ func (d *batchDecision) merge(in *Decision) {
 // - Remaining is the smallest value of each across all Decisions, and
 // - Decisions resulting from spend-only Transactions are never merged.
 func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
+	start := l.clk.Now()
+
 	batch, bucketKeys, err := prepareBatch(txns)
 	if err != nil {
 		return nil, err
@@ -183,9 +189,9 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
 		return nil, err
 	}
 
-	start := l.clk.Now()
 	batchDecision := newBatchDecision()
 	newTATs := make(map[string]time.Time)
+	txnOutcomes := make(map[Transaction]string)
 
 	for _, txn := range batch {
 		tat, exists := tats[txn.bucketKey]
@@ -209,16 +215,25 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
 		if !txn.spendOnly() {
 			batchDecision.merge(d)
 		}
+
+		txnOutcomes[txn] = Denied
+		if d.Allowed {
+			txnOutcomes[txn] = Allowed
+		}
 	}
 
-	if batchDecision.Allowed {
+	if batchDecision.Allowed && len(newTATs) > 0 {
 		err = l.source.BatchSet(ctx, newTATs)
 		if err != nil {
 			return nil, err
 		}
-		l.spendLatency.WithLabelValues("batch", Allowed).Observe(l.clk.Since(start).Seconds())
-	} else {
-		l.spendLatency.WithLabelValues("batch", Denied).Observe(l.clk.Since(start).Seconds())
 	}
+
+	// Observe latency equally across all transactions in the batch.
+	totalLatency := l.clk.Since(start)
+	perTxnLatency := totalLatency / time.Duration(len(txnOutcomes))
+	for txn, outcome := range txnOutcomes {
+		l.spendLatency.WithLabelValues(txn.limit.name.String(), outcome).Observe(perTxnLatency.Seconds())
+	}
 	return batchDecision.Decision, nil
 }
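This hunk is the heart of the fix: rather than a single observation labeled `limit="batch"`, the total batch latency is split evenly and recorded once per transaction, under that transaction's limit name and its own allowed/denied outcome. A minimal sketch of the arithmetic (the limit names, durations, and label strings here are hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical batch: total wall time plus per-transaction outcomes,
	// standing in for txnOutcomes in BatchSpend above.
	totalLatency := 9 * time.Millisecond
	txnOutcomes := map[string]string{
		"limitA": "allowed",
		"limitB": "allowed",
		"limitC": "denied",
	}

	// Each transaction is credited an equal share of the batch latency,
	// as in totalLatency / time.Duration(len(txnOutcomes)).
	perTxnLatency := totalLatency / time.Duration(len(txnOutcomes))
	for name, outcome := range txnOutcomes {
		fmt.Printf("observe spend_latency{limit=%q, decision=%q} = %v\n", name, outcome, perTxnLatency)
	}
}
```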
ratelimits/source_redis.go (79 changes: 58 additions & 21 deletions)
@@ -42,10 +42,15 @@ func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Regist
 	}
 }
 
+var errMixedSuccess = errors.New("some keys not found")
+
 // resultForError returns a string representing the result of the operation
 // based on the provided error.
 func resultForError(err error) string {
-	if errors.Is(redis.Nil, err) {
+	if errors.Is(errMixedSuccess, err) {
+		// Indicates that some of the keys in a batchset operation were not found.
+		return "mixedSuccess"
+	} else if errors.Is(redis.Nil, err) {
 		// Bucket key does not exist.
 		return "notFound"
 	} else if errors.Is(err, context.DeadlineExceeded) {
@@ -68,6 +73,14 @@ func resultForError(err error) string {
return "failed"
}

func (r *RedisSource) observeLatency(call string, latency time.Duration, err error) {
result := "success"
if err != nil {
result = resultForError(err)
}
r.latency.With(prometheus.Labels{"call": call, "result": result}).Observe(latency.Seconds())
}

// BatchSet stores TATs at the specified bucketKeys using a pipelined Redis
// Transaction in order to reduce the number of round-trips to each Redis shard.
// An error is returned if the operation failed and nil otherwise.
@@ -80,11 +93,17 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
 	}
 	_, err := pipeline.Exec(ctx)
 	if err != nil {
-		r.latency.With(prometheus.Labels{"call": "batchset", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		r.observeLatency("batchset", r.clk.Since(start), err)
 		return err
 	}
 
-	r.latency.With(prometheus.Labels{"call": "batchset", "result": "success"}).Observe(time.Since(start).Seconds())
+	totalLatency := r.clk.Since(start)
+	perSetLatency := totalLatency / time.Duration(len(buckets))
+	for range buckets {
+		r.observeLatency("batchset_entry", perSetLatency, nil)
+	}
+
+	r.observeLatency("batchset", totalLatency, nil)
 	return nil
 }
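The new `observeLatency` helper above removes the `prometheus.Labels` boilerplate that every call site previously repeated, and `BatchSet` now additionally credits an equal share of the total latency to each entry under `call="batchset_entry"`. A trimmed, runnable analog of the helper (the error classes and label strings here are illustrative, not copied from the full `resultForError`):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Stand-in for resultForError, reduced to one error class.
func resultForError(err error) string {
	if errors.Is(err, context.DeadlineExceeded) {
		return "deadlineExceeded"
	}
	return "failed"
}

// Analog of RedisSource.observeLatency: a single place maps
// (call, latency, err) to the "result" label.
func observeLatency(call string, latency time.Duration, err error) {
	result := "success"
	if err != nil {
		result = resultForError(err)
	}
	fmt.Printf("observe latency{call=%q, result=%q} = %v\n", call, result, latency)
}

func main() {
	observeLatency("batchset", 3*time.Millisecond, nil)
	observeLatency("batchset_entry", time.Millisecond, nil)
	observeLatency("get", 90*time.Millisecond, context.DeadlineExceeded)
}
```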

@@ -98,14 +117,15 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
 	if err != nil {
 		if errors.Is(err, redis.Nil) {
 			// Bucket key does not exist.
-			r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
+			r.observeLatency("get", r.clk.Since(start), err)
 			return time.Time{}, ErrBucketNotFound
 		}
-		r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		// An error occurred while retrieving the TAT.
+		r.observeLatency("get", r.clk.Since(start), err)
 		return time.Time{}, err
 	}
 
-	r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds())
+	r.observeLatency("get", r.clk.Since(start), nil)
 	return time.Unix(0, tatNano).UTC(), nil
 }

@@ -121,28 +141,44 @@ func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[st
 		pipeline.Get(ctx, bucketKey)
 	}
 	results, err := pipeline.Exec(ctx)
-	if err != nil {
-		r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
-		if !errors.Is(err, redis.Nil) {
-			return nil, err
-		}
+	if err != nil && !errors.Is(err, redis.Nil) {
+		r.observeLatency("batchget", r.clk.Since(start), err)
+		return nil, err
 	}
 
+	totalLatency := r.clk.Since(start)
+	perEntryLatency := totalLatency / time.Duration(len(bucketKeys))
+
 	tats := make(map[string]time.Time, len(bucketKeys))
+	notFoundCount := 0
 	for i, result := range results {
 		tatNano, err := result.(*redis.StringCmd).Int64()
 		if err != nil {
-			if errors.Is(err, redis.Nil) {
-				// Bucket key does not exist.
-				continue
+			if !errors.Is(err, redis.Nil) {
+				// This should never happen as any errors should have been
+				// caught after the pipeline.Exec() call.
+				r.observeLatency("batchget", r.clk.Since(start), err)
+				return nil, err
 			}
-			r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
-			return nil, err
+			// Bucket key does not exist.
+			r.observeLatency("batchget_entry", perEntryLatency, err)
+			notFoundCount++
+			continue
 		}
 		tats[bucketKeys[i]] = time.Unix(0, tatNano).UTC()
+		r.observeLatency("batchget_entry", perEntryLatency, nil)
 	}
 
-	r.latency.With(prometheus.Labels{"call": "batchget", "result": "success"}).Observe(time.Since(start).Seconds())
+	var batchErr error
+	if notFoundCount < len(results) {
+		// Some keys were not found.
+		batchErr = errMixedSuccess
+	} else if notFoundCount == len(results) {
+		// All keys were not found.
+		batchErr = redis.Nil
+	}
+
+	r.observeLatency("batchget", totalLatency, batchErr)
 	return tats, nil
 }
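`BatchGet` now records a per-entry observation (`batchget_entry`, either success or not-found) alongside one batch-level observation whose result mirrors the conditionals committed above: a batch in which every key was missing reports `redis.Nil` (labeled `notFound`), and any other non-erroring batch reports `errMixedSuccess` (labeled `mixedSuccess`). A small sketch mirroring the committed branches exactly (`redisNil` stands in for `redis.Nil`):

```go
package main

import (
	"errors"
	"fmt"
)

var (
	errMixedSuccess = errors.New("some keys not found")
	redisNil        = errors.New("redis: nil") // stand-in for redis.Nil
)

// batchResult mirrors the committed classification of a BatchGet outcome.
func batchResult(notFoundCount, total int) error {
	if notFoundCount < total {
		// Same branch as "Some keys were not found."
		return errMixedSuccess
	}
	// Same branch as "All keys were not found."
	return redisNil
}

func main() {
	for _, notFound := range []int{0, 2, 5} {
		fmt.Printf("notFound=%d of 5 -> %v\n", notFound, batchResult(notFound, 5))
	}
}
```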

@@ -154,11 +190,11 @@ func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {

 	err := r.client.Del(ctx, bucketKey).Err()
 	if err != nil {
-		r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		r.observeLatency("delete", r.clk.Since(start), err)
 		return err
 	}
 
-	r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds())
+	r.observeLatency("delete", r.clk.Since(start), nil)
 	return nil
 }

@@ -171,9 +207,10 @@ func (r *RedisSource) Ping(ctx context.Context) error {
 		return shard.Ping(ctx).Err()
 	})
 	if err != nil {
-		r.latency.With(prometheus.Labels{"call": "ping", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		r.observeLatency("ping", r.clk.Since(start), err)
 		return err
 	}
-	r.latency.With(prometheus.Labels{"call": "ping", "result": "success"}).Observe(time.Since(start).Seconds())
+
+	r.observeLatency("ping", r.clk.Since(start), nil)
 	return nil
 }
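Throughout this file, `time.Since(start)` becomes `r.clk.Since(start)`: latency is now measured against the injected clock rather than the wall clock, so tests driving a fake clock see exact, deterministic observations. A sketch of why that matters, assuming `clk` here is the `github.com/jmhodges/clock` package (an assumption; the diff only shows the `clock.Clock` type):

```go
package main

import (
	"fmt"
	"time"

	"github.com/jmhodges/clock"
)

func main() {
	// With a fake clock, elapsed time is whatever the test says it is.
	fc := clock.NewFake()
	start := fc.Now()
	fc.Add(42 * time.Millisecond) // simulate a Redis round-trip
	fmt.Println(fc.Since(start))  // always exactly 42ms
}
```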
