From a6e0fdc80e1e35ed58def82179913c86ac3b9240 Mon Sep 17 00:00:00 2001
From: Samantha Frank
Date: Wed, 24 Jul 2024 22:07:33 -0400
Subject: [PATCH] ratelimits: Fix latency calculations (#7627)

---
 ratelimits/limiter.go      | 39 +++++++++++++------
 ratelimits/source_redis.go | 79 ++++++++++++++++++++++++++++----------
 2 files changed, 85 insertions(+), 33 deletions(-)

diff --git a/ratelimits/limiter.go b/ratelimits/limiter.go
index 557a8330430..50b7d7ca60e 100644
--- a/ratelimits/limiter.go
+++ b/ratelimits/limiter.go
@@ -40,22 +40,26 @@ type Limiter struct {
 // NewLimiter returns a new *Limiter. The provided source must be safe for
 // concurrent use.
 func NewLimiter(clk clock.Clock, source source, stats prometheus.Registerer) (*Limiter, error) {
-	limiter := &Limiter{source: source, clk: clk}
-	limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+	spendLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
 		Name: "ratelimits_spend_latency",
 		Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
 		// Exponential buckets ranging from 0.0005s to 3s.
 		Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
 	}, []string{"limit", "decision"})
-	stats.MustRegister(limiter.spendLatency)
+	stats.MustRegister(spendLatency)
 
-	limiter.overrideUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+	overrideUsageGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
 		Name: "ratelimits_override_usage",
 		Help: "Proportion of override limit used, by limit name and bucket key.",
 	}, []string{"limit", "bucket_key"})
-	stats.MustRegister(limiter.overrideUsageGauge)
-
-	return limiter, nil
+	stats.MustRegister(overrideUsageGauge)
+
+	return &Limiter{
+		source:             source,
+		clk:                clk,
+		spendLatency:       spendLatency,
+		overrideUsageGauge: overrideUsageGauge,
+	}, nil
 }
 
 type Decision struct {
@@ -166,6 +170,8 @@ func (d *batchDecision) merge(in *Decision) {
 //   - Remaining is the smallest value of each across all Decisions, and
 //   - Decisions resulting from spend-only Transactions are never merged.
 func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
+	start := l.clk.Now()
+
 	batch, bucketKeys, err := prepareBatch(txns)
 	if err != nil {
 		return nil, err
@@ -183,9 +189,9 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
 		return nil, err
 	}
 
-	start := l.clk.Now()
 	batchDecision := newBatchDecision()
 	newTATs := make(map[string]time.Time)
+	txnOutcomes := make(map[Transaction]string)
 
 	for _, txn := range batch {
 		tat, exists := tats[txn.bucketKey]
@@ -209,16 +215,25 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
 		if !txn.spendOnly() {
 			batchDecision.merge(d)
 		}
+
+		txnOutcomes[txn] = Denied
+		if d.Allowed {
+			txnOutcomes[txn] = Allowed
+		}
 	}
 
-	if batchDecision.Allowed {
+	if batchDecision.Allowed && len(newTATs) > 0 {
 		err = l.source.BatchSet(ctx, newTATs)
 		if err != nil {
 			return nil, err
 		}
-		l.spendLatency.WithLabelValues("batch", Allowed).Observe(l.clk.Since(start).Seconds())
-	} else {
-		l.spendLatency.WithLabelValues("batch", Denied).Observe(l.clk.Since(start).Seconds())
+	}
+
+	// Observe latency equally across all transactions in the batch.
+	totalLatency := l.clk.Since(start)
+	perTxnLatency := totalLatency / time.Duration(len(txnOutcomes))
+	for txn, outcome := range txnOutcomes {
+		l.spendLatency.WithLabelValues(txn.limit.name.String(), outcome).Observe(perTxnLatency.Seconds())
 	}
 	return batchDecision.Decision, nil
 }
diff --git a/ratelimits/source_redis.go b/ratelimits/source_redis.go
index 2c807c9d4e8..83f7046280d 100644
--- a/ratelimits/source_redis.go
+++ b/ratelimits/source_redis.go
@@ -42,10 +42,15 @@ func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Regist
 	}
 }
 
+var errMixedSuccess = errors.New("some keys not found")
+
 // resultForError returns a string representing the result of the operation
 // based on the provided error.
 func resultForError(err error) string {
-	if errors.Is(redis.Nil, err) {
+	if errors.Is(errMixedSuccess, err) {
+		// Indicates that some of the keys in a batchset operation were not found.
+		return "mixedSuccess"
+	} else if errors.Is(redis.Nil, err) {
 		// Bucket key does not exist.
 		return "notFound"
 	} else if errors.Is(err, context.DeadlineExceeded) {
@@ -68,6 +73,14 @@ func resultForError(err error) string {
 	return "failed"
 }
 
+func (r *RedisSource) observeLatency(call string, latency time.Duration, err error) {
+	result := "success"
+	if err != nil {
+		result = resultForError(err)
+	}
+	r.latency.With(prometheus.Labels{"call": call, "result": result}).Observe(latency.Seconds())
+}
+
 // BatchSet stores TATs at the specified bucketKeys using a pipelined Redis
 // Transaction in order to reduce the number of round-trips to each Redis shard.
 // An error is returned if the operation failed and nil otherwise.
@@ -80,11 +93,17 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
 	}
 	_, err := pipeline.Exec(ctx)
 	if err != nil {
-		r.latency.With(prometheus.Labels{"call": "batchset", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		r.observeLatency("batchset", r.clk.Since(start), err)
 		return err
 	}
 
-	r.latency.With(prometheus.Labels{"call": "batchset", "result": "success"}).Observe(time.Since(start).Seconds())
+	totalLatency := r.clk.Since(start)
+	perSetLatency := totalLatency / time.Duration(len(buckets))
+	for range buckets {
+		r.observeLatency("batchset_entry", perSetLatency, nil)
+	}
+
+	r.observeLatency("batchset", totalLatency, nil)
 	return nil
 }
 
@@ -98,14 +117,15 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
 	if err != nil {
 		if errors.Is(err, redis.Nil) {
 			// Bucket key does not exist.
-			r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
+			r.observeLatency("get", r.clk.Since(start), err)
 			return time.Time{}, ErrBucketNotFound
 		}
-		r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+		// An error occurred while retrieving the TAT.
+ r.observeLatency("get", r.clk.Since(start), err) return time.Time{}, err } - r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds()) + r.observeLatency("get", r.clk.Since(start), nil) return time.Unix(0, tatNano).UTC(), nil } @@ -121,28 +141,44 @@ func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[st pipeline.Get(ctx, bucketKey) } results, err := pipeline.Exec(ctx) - if err != nil { - r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds()) - if !errors.Is(err, redis.Nil) { - return nil, err - } + if err != nil && !errors.Is(err, redis.Nil) { + r.observeLatency("batchget", r.clk.Since(start), err) + return nil, err } + totalLatency := r.clk.Since(start) + perEntryLatency := totalLatency / time.Duration(len(bucketKeys)) + tats := make(map[string]time.Time, len(bucketKeys)) + notFoundCount := 0 for i, result := range results { tatNano, err := result.(*redis.StringCmd).Int64() if err != nil { - if errors.Is(err, redis.Nil) { - // Bucket key does not exist. - continue + if !errors.Is(err, redis.Nil) { + // This should never happen as any errors should have been + // caught after the pipeline.Exec() call. + r.observeLatency("batchget", r.clk.Since(start), err) + return nil, err } - r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds()) - return nil, err + // Bucket key does not exist. + r.observeLatency("batchget_entry", perEntryLatency, err) + notFoundCount++ + continue } tats[bucketKeys[i]] = time.Unix(0, tatNano).UTC() + r.observeLatency("batchget_entry", perEntryLatency, nil) } - r.latency.With(prometheus.Labels{"call": "batchget", "result": "success"}).Observe(time.Since(start).Seconds()) + var batchErr error + if notFoundCount < len(results) { + // Some keys were not found. + batchErr = errMixedSuccess + } else if notFoundCount == len(results) { + // All keys were not found. + batchErr = redis.Nil + } + + r.observeLatency("batchget", totalLatency, batchErr) return tats, nil } @@ -154,11 +190,11 @@ func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error { err := r.client.Del(ctx, bucketKey).Err() if err != nil { - r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds()) + r.observeLatency("delete", r.clk.Since(start), err) return err } - r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds()) + r.observeLatency("delete", r.clk.Since(start), nil) return nil } @@ -171,9 +207,10 @@ func (r *RedisSource) Ping(ctx context.Context) error { return shard.Ping(ctx).Err() }) if err != nil { - r.latency.With(prometheus.Labels{"call": "ping", "result": resultForError(err)}).Observe(time.Since(start).Seconds()) + r.observeLatency("ping", r.clk.Since(start), err) return err } - r.latency.With(prometheus.Labels{"call": "ping", "result": "success"}).Observe(time.Since(start).Seconds()) + + r.observeLatency("ping", r.clk.Since(start), nil) return nil }