ratelimits: Fix latency calculations #7627

Merged (4 commits) on Jul 25, 2024
Changes from all commits
39 changes: 27 additions & 12 deletions ratelimits/limiter.go
@@ -40,22 +40,26 @@ type Limiter struct {
 // NewLimiter returns a new *Limiter. The provided source must be safe for
 // concurrent use.
 func NewLimiter(clk clock.Clock, source source, stats prometheus.Registerer) (*Limiter, error) {
-    limiter := &Limiter{source: source, clk: clk}
-    limiter.spendLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+    spendLatency := prometheus.NewHistogramVec(prometheus.HistogramOpts{
         Name: "ratelimits_spend_latency",
         Help: fmt.Sprintf("Latency of ratelimit checks labeled by limit=[name] and decision=[%s|%s], in seconds", Allowed, Denied),
         // Exponential buckets ranging from 0.0005s to 3s.
         Buckets: prometheus.ExponentialBuckets(0.0005, 3, 8),
     }, []string{"limit", "decision"})
-    stats.MustRegister(limiter.spendLatency)
+    stats.MustRegister(spendLatency)

-    limiter.overrideUsageGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+    overrideUsageGauge := prometheus.NewGaugeVec(prometheus.GaugeOpts{
         Name: "ratelimits_override_usage",
         Help: "Proportion of override limit used, by limit name and bucket key.",
     }, []string{"limit", "bucket_key"})
-    stats.MustRegister(limiter.overrideUsageGauge)
-
-    return limiter, nil
+    stats.MustRegister(overrideUsageGauge)
+
+    return &Limiter{
+        source:             source,
+        clk:                clk,
+        spendLatency:       spendLatency,
+        overrideUsageGauge: overrideUsageGauge,
+    }, nil
 }

 type Decision struct {
@@ -166,6 +170,8 @@ func (d *batchDecision) merge(in *Decision) {
 // - Remaining is the smallest value of each across all Decisions, and
 // - Decisions resulting from spend-only Transactions are never merged.
 func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision, error) {
+    start := l.clk.Now()
+
     batch, bucketKeys, err := prepareBatch(txns)
     if err != nil {
         return nil, err
@@ -183,9 +189,9 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
         return nil, err
     }

-    start := l.clk.Now()
     batchDecision := newBatchDecision()
     newTATs := make(map[string]time.Time)
+    txnOutcomes := make(map[Transaction]string)

     for _, txn := range batch {
         tat, exists := tats[txn.bucketKey]
@@ -209,16 +215,25 @@ func (l *Limiter) BatchSpend(ctx context.Context, txns []Transaction) (*Decision
         if !txn.spendOnly() {
             batchDecision.merge(d)
         }
+
+        txnOutcomes[txn] = Denied
+        if d.Allowed {
+            txnOutcomes[txn] = Allowed
+        }
     }

-    if batchDecision.Allowed {
+    if batchDecision.Allowed && len(newTATs) > 0 {
         err = l.source.BatchSet(ctx, newTATs)
         if err != nil {
             return nil, err
         }
-        l.spendLatency.WithLabelValues("batch", Allowed).Observe(l.clk.Since(start).Seconds())
-    } else {
-        l.spendLatency.WithLabelValues("batch", Denied).Observe(l.clk.Since(start).Seconds())
     }
+
+    // Observe latency equally across all transactions in the batch.
+    totalLatency := l.clk.Since(start)
+    perTxnLatency := totalLatency / time.Duration(len(txnOutcomes))
+    for txn, outcome := range txnOutcomes {
+        l.spendLatency.WithLabelValues(txn.limit.name.String(), outcome).Observe(perTxnLatency.Seconds())
+    }
     return batchDecision.Decision, nil
 }
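For reference, here is a minimal, self-contained sketch (editorial illustration, not part of this diff) of the latency-splitting idea BatchSpend now uses: measure the elapsed time once, divide it evenly across the transactions in the batch, and observe one sample per transaction under its limit name and decision. The outcome type and recordBatchLatency helper are hypothetical; only the prometheus/client_golang HistogramVec API is assumed.

package example

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

// outcome pairs a limit name with its decision label; both the type and the
// helper below are illustrative stand-ins, not code from the PR.
type outcome struct {
    limitName string
    decision  string // e.g. "Allowed" or "Denied"
}

// recordBatchLatency splits the total batch latency evenly across all
// transactions and records one observation per transaction.
func recordBatchLatency(hist *prometheus.HistogramVec, start time.Time, outcomes []outcome) {
    if len(outcomes) == 0 {
        return
    }
    perTxn := time.Since(start) / time.Duration(len(outcomes))
    for _, o := range outcomes {
        hist.WithLabelValues(o.limitName, o.decision).Observe(perTxn.Seconds())
    }
}

Splitting the total evenly keeps the per-limit spend-latency histograms roughly comparable to what individual, non-batched spends would record.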
79 changes: 58 additions & 21 deletions ratelimits/source_redis.go
@@ -42,10 +42,15 @@ func NewRedisSource(client *redis.Ring, clk clock.Clock, stats prometheus.Regist
     }
 }

+var errMixedSuccess = errors.New("some keys not found")
+
 // resultForError returns a string representing the result of the operation
 // based on the provided error.
 func resultForError(err error) string {
-    if errors.Is(redis.Nil, err) {
+    if errors.Is(errMixedSuccess, err) {
+        // Indicates that some of the keys in a batchset operation were not found.
+        return "mixedSuccess"
+    } else if errors.Is(redis.Nil, err) {
         // Bucket key does not exist.
         return "notFound"
     } else if errors.Is(err, context.DeadlineExceeded) {
@@ -68,6 +73,14 @@ func resultForError(err error) string {
     return "failed"
 }

+func (r *RedisSource) observeLatency(call string, latency time.Duration, err error) {
+    result := "success"
+    if err != nil {
+        result = resultForError(err)
+    }
+    r.latency.With(prometheus.Labels{"call": call, "result": result}).Observe(latency.Seconds())
+}
+
 // BatchSet stores TATs at the specified bucketKeys using a pipelined Redis
 // Transaction in order to reduce the number of round-trips to each Redis shard.
 // An error is returned if the operation failed and nil otherwise.
@@ -80,11 +93,17 @@ func (r *RedisSource) BatchSet(ctx context.Context, buckets map[string]time.Time
     }
     _, err := pipeline.Exec(ctx)
     if err != nil {
-        r.latency.With(prometheus.Labels{"call": "batchset", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+        r.observeLatency("batchset", r.clk.Since(start), err)
         return err
     }

-    r.latency.With(prometheus.Labels{"call": "batchset", "result": "success"}).Observe(time.Since(start).Seconds())
+    totalLatency := r.clk.Since(start)
+    perSetLatency := totalLatency / time.Duration(len(buckets))
+    for range buckets {
+        r.observeLatency("batchset_entry", perSetLatency, nil)
+    }
+
+    r.observeLatency("batchset", totalLatency, nil)
     return nil
 }

@@ -98,14 +117,15 @@ func (r *RedisSource) Get(ctx context.Context, bucketKey string) (time.Time, err
     if err != nil {
         if errors.Is(err, redis.Nil) {
             // Bucket key does not exist.
-            r.latency.With(prometheus.Labels{"call": "get", "result": "notFound"}).Observe(time.Since(start).Seconds())
+            r.observeLatency("get", r.clk.Since(start), err)
             return time.Time{}, ErrBucketNotFound
         }
-        r.latency.With(prometheus.Labels{"call": "get", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+        // An error occurred while retrieving the TAT.
+        r.observeLatency("get", r.clk.Since(start), err)
         return time.Time{}, err
     }

-    r.latency.With(prometheus.Labels{"call": "get", "result": "success"}).Observe(time.Since(start).Seconds())
+    r.observeLatency("get", r.clk.Since(start), nil)
     return time.Unix(0, tatNano).UTC(), nil
 }

@@ -121,28 +141,44 @@ func (r *RedisSource) BatchGet(ctx context.Context, bucketKeys []string) (map[st
         pipeline.Get(ctx, bucketKey)
     }
     results, err := pipeline.Exec(ctx)
-    if err != nil {
-        r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
-        if !errors.Is(err, redis.Nil) {
-            return nil, err
-        }
+    if err != nil && !errors.Is(err, redis.Nil) {
+        r.observeLatency("batchget", r.clk.Since(start), err)
+        return nil, err
     }

+    totalLatency := r.clk.Since(start)
+    perEntryLatency := totalLatency / time.Duration(len(bucketKeys))
+
     tats := make(map[string]time.Time, len(bucketKeys))
+    notFoundCount := 0
     for i, result := range results {
         tatNano, err := result.(*redis.StringCmd).Int64()
         if err != nil {
-            if errors.Is(err, redis.Nil) {
-                // Bucket key does not exist.
-                continue
+            if !errors.Is(err, redis.Nil) {
+                // This should never happen as any errors should have been
+                // caught after the pipeline.Exec() call.
+                r.observeLatency("batchget", r.clk.Since(start), err)
+                return nil, err
             }
-            r.latency.With(prometheus.Labels{"call": "batchget", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
-            return nil, err
+            // Bucket key does not exist.
+            r.observeLatency("batchget_entry", perEntryLatency, err)
+            notFoundCount++
+            continue
         }
         tats[bucketKeys[i]] = time.Unix(0, tatNano).UTC()
+        r.observeLatency("batchget_entry", perEntryLatency, nil)
     }

-    r.latency.With(prometheus.Labels{"call": "batchget", "result": "success"}).Observe(time.Since(start).Seconds())
+    var batchErr error
+    if notFoundCount < len(results) {
+        // Some keys were not found.
+        batchErr = errMixedSuccess
+    } else if notFoundCount == len(results) {
+        // All keys were not found.
+        batchErr = redis.Nil
+    }
+
+    r.observeLatency("batchget", totalLatency, batchErr)
     return tats, nil
 }

@@ -154,11 +190,11 @@ func (r *RedisSource) Delete(ctx context.Context, bucketKey string) error {

     err := r.client.Del(ctx, bucketKey).Err()
     if err != nil {
-        r.latency.With(prometheus.Labels{"call": "delete", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+        r.observeLatency("delete", r.clk.Since(start), err)
         return err
     }

-    r.latency.With(prometheus.Labels{"call": "delete", "result": "success"}).Observe(time.Since(start).Seconds())
+    r.observeLatency("delete", r.clk.Since(start), nil)
     return nil
 }

@@ -171,9 +207,10 @@ func (r *RedisSource) Ping(ctx context.Context) error {
         return shard.Ping(ctx).Err()
     })
     if err != nil {
-        r.latency.With(prometheus.Labels{"call": "ping", "result": resultForError(err)}).Observe(time.Since(start).Seconds())
+        r.observeLatency("ping", r.clk.Since(start), err)
         return err
     }
-    r.latency.With(prometheus.Labels{"call": "ping", "result": "success"}).Observe(time.Since(start).Seconds())
+
+    r.observeLatency("ping", r.clk.Since(start), nil)
     return nil
 }
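
As a companion sketch (again editorial, not part of the diff), the observeLatency pattern above reduces to classifying an error into a result label and recording exactly one sample per Redis call; redis.Nil marks the all-not-found case and errMixedSuccess the partially-found case. This assumes go-redis v9 and prometheus/client_golang, and the standalone function names here are illustrative.

package example

import (
    "context"
    "errors"
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/redis/go-redis/v9"
)

// errMixedSuccess mirrors the sentinel introduced in the diff above.
var errMixedSuccess = errors.New("some keys not found")

// resultFor maps an error to a metric label, in the spirit of resultForError.
func resultFor(err error) string {
    switch {
    case err == nil:
        return "success"
    case errors.Is(err, errMixedSuccess):
        return "mixedSuccess"
    case errors.Is(err, redis.Nil):
        return "notFound"
    case errors.Is(err, context.DeadlineExceeded):
        return "deadlineExceeded"
    default:
        return "failed"
    }
}

// observeLatency records one latency sample per Redis call, labeled by call
// name and the classified result.
func observeLatency(latency *prometheus.HistogramVec, call string, took time.Duration, err error) {
    latency.With(prometheus.Labels{"call": call, "result": resultFor(err)}).Observe(took.Seconds())
}

A caller would time the operation and pass whatever error summarizes it, for example observeLatency(hist, "batchget", time.Since(start), batchErr), so the result label reflects whether all, some, or none of the keys were found.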