diff --git a/pkg/sql/ttl.go b/pkg/sql/ttl.go index 673a8aa088b4..fac3a9d6e349 100644 --- a/pkg/sql/ttl.go +++ b/pkg/sql/ttl.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/sql/ttlpb" "github.com/cockroachdb/cockroach/pkg/util/metric" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" @@ -54,6 +55,18 @@ var ttlDeleteBatchSize = settings.RegisterIntSetting( 100, ) +var ttlActive = settings.RegisterBoolSetting( + "job.ttl.enabled", + "whether the TTL job is enabled", + true, +) + +var ttlRateLimit = settings.RegisterIntSetting( + "job.ttl.rate_limit", + "maximum deletions per second", + 5000, +) + func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { p := execCtx.(JobExecContext) ie := p.ExecCfg().InternalExecutor @@ -64,8 +77,15 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { var pkStr string var pkTypes []string + if !ttlActive.Get(p.ExecCfg().SV()) { + return nil + } + metrics := p.ExecCfg().JobRegistry.MetricsStruct().TTL + limit := ttlRateLimit.Get(p.ExecCfg().SV()) + rl := quotapool.NewRateLimiter("ttl", quotapool.Limit(limit), limit) + // TODO(XXX): get dynamic table names. type rangeTarget struct { startKey []string @@ -244,8 +264,21 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { until = len(rows) } deleteBatch := rows[i:until] + a, err := rl.Acquire(ctx, int64(len(deleteBatch))) + if err != nil { + return err + } if err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + defer a.Consume() + if _, err := ie.Exec( + ctx, + "ttl_delete_low_pri", + txn, + "SET TRANSACTION PRIORITY LOW", + ); err != nil { + return err + } placeholderVals := make([]interface{}, len(pks)*len(deleteBatch)) placeholderStr := "" for i, row := range deleteBatch { @@ -276,7 +309,7 @@ func (t ttlResumer) Resume(ctx context.Context, execCtx interface{}) error { return err } metrics.DeletionDeleteNanos.RecordValue(timeutil.Now().Sub(deletionStartTime).Nanoseconds()) - metrics.RowDeletions.Inc(int64(len(rows))) + metrics.RowDeletions.Inc(int64(len(deleteBatch))) return nil }); err != nil { return err