diff --git a/docs/RFCS/20220120_row_level_ttl.md b/docs/RFCS/20220120_row_level_ttl.md
index e41f8b1cdf7b..293db662088e 100644
--- a/docs/RFCS/20220120_row_level_ttl.md
+++ b/docs/RFCS/20220120_row_level_ttl.md
@@ -137,6 +137,8 @@ TTL metadata is stored on the TableDescriptor:
```protobuf
message TableDescriptor {
message RowLevelTTL {
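+ // Generate an Equal method so TTL configurations can be compared for changes.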
+ option (gogoproto.equal) = true;
+
// DurationExpr is the automatically assigned interval for when the TTL should apply to a row.
optional string duration_expr = 1 [(gogoproto.nullable)=false, (gogoproto.casttype)="Expression"];
// SelectBatchSize is the amount of rows that should be fetched at a time
@@ -147,8 +149,8 @@ message TableDescriptor {
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
- // RangeConcurrency is the number of ranges to process at a time.
- optional int64 range_concurrency = 6 [(gogoproto.nullable)=false];
+ // RangeConcurrency is based on the number of spans and is no longer configurable.
+ reserved 6;
// DeleteRateLimit is the maximum amount of rows to delete per second.
optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
// Pause is set if the TTL job should not run.
@@ -180,7 +182,6 @@ the following options to control the TTL job:
| `ttl_expiration_expression` | If set, uses the expression specified as the TTL expiration. Defaults to just using the `crdb_internal_expiration` column. |
| `ttl_select_batch_size` | How many rows to fetch from the range that have expired at a given time. Defaults to 500. Must be at least `1`. |
| `ttl_delete_batch_size` | How many rows to delete at a time. Defaults to 100. Must be at least `1`. |
-| `ttl_range_concurrency` | How many concurrent ranges are being worked on at a time. Defaults to `cpu_core_count`. Must be at least `1`. |
| `ttl_delete_rate_limit` | Maximum number of rows to be deleted per second (acts as the rate limit). Defaults to 0 (signifying none). |
| `ttl_row_stats_poll_interval` | Whilst the TTL job is running, counts rows and expired rows on the table to report as prometheus metrics. By default unset, meaning no stats are fetched. |
| `ttl_pause` | Stops the TTL job from executing. |
@@ -291,8 +292,7 @@ are additional knobs a user can use to control how effective the deletion
performs:
* how often the deletion job runs (controls amount of "junk" data left)
* table GC time (when tombstones are removed and space is therefore reclaimed)
-* the size of the ranges on the table, which has knock on effects for
- `ttl_range_concurrency`.
+* the distribution of the ranges on the table
### Admission Control
To ensure the deletion job does not affect foreground traffic, we plan on using
diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt
index 543b2b86514a..57904d2e927f 100644
--- a/docs/generated/settings/settings-for-tenants.txt
+++ b/docs/generated/settings/settings-for-tenants.txt
@@ -282,7 +282,6 @@ sql.trace.stmt.enable_threshold duration 0s enables tracing on all statements; s
sql.trace.txn.enable_threshold duration 0s enables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries)
sql.ttl.default_delete_batch_size integer 100 default amount of rows to delete in a single query during a TTL job
sql.ttl.default_delete_rate_limit integer 0 default delete rate limit for all TTL jobs. Use 0 to signify no rate limit.
-sql.ttl.default_range_concurrency integer 1 default amount of ranges to process at once during a TTL delete
sql.ttl.default_select_batch_size integer 500 default amount of rows to select in a single query during a TTL job
sql.ttl.job.enabled boolean true whether the TTL job is enabled
sql.txn_fingerprint_id_cache.capacity integer 100 the maximum number of txn fingerprint IDs stored
diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html
index 2b322f690678..a7c2a1839498 100644
--- a/docs/generated/settings/settings.html
+++ b/docs/generated/settings/settings.html
@@ -218,7 +218,6 @@
sql.trace.txn.enable_threshold | duration | 0s | enables tracing on all transactions; transactions open for longer than this duration will have their trace logged (set to 0 to disable); note that enabling this may have a negative performance impact; this setting is coarser-grained than sql.trace.stmt.enable_threshold because it applies to all statements within a transaction as well as client communication (e.g. retries) |
sql.ttl.default_delete_batch_size | integer | 100 | default amount of rows to delete in a single query during a TTL job |
sql.ttl.default_delete_rate_limit | integer | 0 | default delete rate limit for all TTL jobs. Use 0 to signify no rate limit. |
-sql.ttl.default_range_concurrency | integer | 1 | default amount of ranges to process at once during a TTL delete |
sql.ttl.default_select_batch_size | integer | 500 | default amount of rows to select in a single query during a TTL job |
sql.ttl.job.enabled | boolean | true | whether the TTL job is enabled |
sql.txn_fingerprint_id_cache.capacity | integer | 100 | the maximum number of txn fingerprint IDs stored |
diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto
index 5f35cfdf5178..e0d412af9aba 100644
--- a/pkg/jobs/jobspb/jobs.proto
+++ b/pkg/jobs/jobspb/jobs.proto
@@ -1015,12 +1015,18 @@ message RowLevelTTLDetails {
}
message RowLevelTTLProgress {
+
// JobRowCount is the number of deleted rows for the entire TTL job.
int64 job_row_count = 1;
+
// ProcessorProgresses is the progress per DistSQL processor.
repeated RowLevelTTLProcessorProgress processor_progresses = 2 [(gogoproto.nullable)=false];
+ // UseDistSQL is true if the TTL job distributed the work to DistSQL processors (requires cluster v22.2).
bool use_dist_sql = 3 [(gogoproto.customname) = "UseDistSQL"];
+
+ // JobSpanCount is the number of spans for the entire TTL job.
+ int64 job_span_count = 4;
}
message RowLevelTTLProcessorProgress {
@@ -1037,6 +1043,12 @@ message RowLevelTTLProcessorProgress {
// ProcessorRowCount is the row count of the DistSQL processor.
int64 processor_row_count = 3;
+
+ // ProcessorSpanCount is the number of spans processed by the DistSQL processor.
+ int64 processor_span_count = 4;
+
+ // ProcessorConcurrency is the number of parallel tasks the processor will run at once.
+ int64 processor_concurrency = 5;
}
message SchemaTelemetryDetails {
diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go
index 994c5e6ac9b8..db483924d32b 100644
--- a/pkg/settings/registry.go
+++ b/pkg/settings/registry.go
@@ -145,6 +145,9 @@ var retiredSettings = map[string]struct{}{
"kv.refresh_range.time_bound_iterators.enabled": {},
"sql.defaults.datestyle.enabled": {},
"sql.defaults.intervalstyle.enabled": {},
+
+ // removed as of 22.2.1
+ "sql.ttl.default_range_concurrency": {},
}
// sqlDefaultSettings is the list of "grandfathered" existing sql.defaults
diff --git a/pkg/sql/catalog/catpb/catalog.proto b/pkg/sql/catalog/catpb/catalog.proto
index 405ec6e79435..fd022f919d06 100644
--- a/pkg/sql/catalog/catpb/catalog.proto
+++ b/pkg/sql/catalog/catpb/catalog.proto
@@ -204,8 +204,8 @@ message RowLevelTTL {
optional string deletion_cron = 4 [(gogoproto.nullable)=false];
// ScheduleID is the ID of the row-level TTL job schedules.
optional int64 schedule_id = 5 [(gogoproto.customname)="ScheduleID",(gogoproto.nullable)=false];
- // RangeConcurrency is the number of ranges to process at a time.
- optional int64 range_concurrency = 6 [(gogoproto.nullable)=false];
+ // RangeConcurrency is based on the number of spans and is no longer configurable.
+ reserved 6;
// DeleteRateLimit is the maximum amount of rows to delete per second.
optional int64 delete_rate_limit = 7 [(gogoproto.nullable)=false];
// Pause is set if the TTL job should not run.
diff --git a/pkg/sql/catalog/tabledesc/structured.go b/pkg/sql/catalog/tabledesc/structured.go
index be3fe0775bd5..2c08b38d7052 100644
--- a/pkg/sql/catalog/tabledesc/structured.go
+++ b/pkg/sql/catalog/tabledesc/structured.go
@@ -2622,9 +2622,6 @@ func (desc *wrapper) GetStorageParams(spaceBetweenEqual bool) []string {
if bs := ttl.DeleteBatchSize; bs != 0 {
appendStorageParam(`ttl_delete_batch_size`, fmt.Sprintf(`%d`, bs))
}
- if rc := ttl.RangeConcurrency; rc != 0 {
- appendStorageParam(`ttl_range_concurrency`, fmt.Sprintf(`%d`, rc))
- }
if rl := ttl.DeleteRateLimit; rl != 0 {
appendStorageParam(`ttl_delete_rate_limit`, fmt.Sprintf(`%d`, rl))
}
diff --git a/pkg/sql/catalog/tabledesc/ttl.go b/pkg/sql/catalog/tabledesc/ttl.go
index 6d4c07914e9f..a9561d466c40 100644
--- a/pkg/sql/catalog/tabledesc/ttl.go
+++ b/pkg/sql/catalog/tabledesc/ttl.go
@@ -51,11 +51,6 @@ func ValidateRowLevelTTL(ttl *catpb.RowLevelTTL) error {
return err
}
}
- if ttl.RangeConcurrency != 0 {
- if err := ValidateTTLRangeConcurrency("ttl_range_concurrency", ttl.RangeConcurrency); err != nil {
- return err
- }
- }
if ttl.DeleteRateLimit != 0 {
if err := ValidateTTLRateLimit("ttl_delete_rate_limit", ttl.DeleteRateLimit); err != nil {
return err
@@ -155,18 +150,6 @@ func ValidateTTLBatchSize(key string, val int64) error {
return nil
}
-// ValidateTTLRangeConcurrency validates the batch size of a TTL.
-func ValidateTTLRangeConcurrency(key string, val int64) error {
- if val <= 0 {
- return pgerror.Newf(
- pgcode.InvalidParameterValue,
- `"%s" must be at least 1`,
- key,
- )
- }
- return nil
-}
-
// ValidateTTLCronExpr validates the cron expression of TTL.
func ValidateTTLCronExpr(key string, str string) error {
if _, err := cron.ParseStandard(str); err != nil {
diff --git a/pkg/sql/execinfrapb/processors_ttl.proto b/pkg/sql/execinfrapb/processors_ttl.proto
index 5a2c1d2e871d..db8d4b924e34 100644
--- a/pkg/sql/execinfrapb/processors_ttl.proto
+++ b/pkg/sql/execinfrapb/processors_ttl.proto
@@ -59,9 +59,8 @@ message TTLSpec {
// flow.
repeated roachpb.Span spans = 5 [(gogoproto.nullable) = false];
- // RangeConcurrency controls how many ranges a single ttlProcessor processes
- // in parallel.
- optional int64 range_concurrency = 6 [(gogoproto.nullable) = false];
+ // RangeConcurrency is based on the number of spans and is no longer configurable.
+ reserved 6;
// SelectBatchSize controls the batch size for SELECTs.
optional int64 select_batch_size = 7 [(gogoproto.nullable) = false];
diff --git a/pkg/sql/logictest/testdata/logic_test/row_level_ttl b/pkg/sql/logictest/testdata/logic_test/row_level_ttl
index 37e9dcede27a..f8c77bf18711 100644
--- a/pkg/sql/logictest/testdata/logic_test/row_level_ttl
+++ b/pkg/sql/logictest/testdata/logic_test/row_level_ttl
@@ -40,7 +40,7 @@ CREATE TABLE tbl (id INT PRIMARY KEY, text TEXT) WITH (ttl = 'on')
subtest end
-subtest todo_add_subtests
+subtest ttl_automatic_column_notice
query T noticetrace
CREATE TABLE tbl_ttl_automatic_column (id INT PRIMARY KEY, text TEXT) WITH (ttl_automatic_column = 'on')
@@ -52,6 +52,24 @@ ALTER TABLE tbl_ttl_automatic_column RESET (ttl_automatic_column)
----
NOTICE: ttl_automatic_column is no longer used. Setting ttl_expire_after automatically creates a TTL column. Resetting ttl_expire_after removes the automatically created column.
+subtest end
+
+subtest ttl_range_concurrency_notice
+
+query T noticetrace
+CREATE TABLE tbl_ttl_range_concurrency (id INT PRIMARY KEY, text TEXT) WITH (ttl_range_concurrency = 2)
+----
+NOTICE: ttl_range_concurrency is no longer configurable.
+
+query T noticetrace
+ALTER TABLE tbl_ttl_range_concurrency RESET (ttl_range_concurrency)
+----
+NOTICE: ttl_range_concurrency is no longer configurable.
+
+subtest end
+
+subtest todo_add_subtests
+
statement error expected DEFAULT expression of crdb_internal_expiration to be current_timestamp\(\):::TIMESTAMPTZ \+ '00:10:00':::INTERVAL
CREATE TABLE tbl (
id INT PRIMARY KEY,
@@ -432,12 +450,12 @@ CREATE TABLE tbl (
id INT PRIMARY KEY,
text TEXT,
FAMILY (id, text)
-) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
+) WITH (ttl_expire_after = '10 minutes', ttl_select_batch_size = 50, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1 minute', ttl_label_metrics = true)
query T
SELECT reloptions FROM pg_class WHERE relname = 'tbl'
----
-{ttl='on',ttl_expire_after='00:10:00':::INTERVAL,ttl_job_cron='@hourly',ttl_select_batch_size=50,ttl_range_concurrency=2,ttl_delete_rate_limit=100,ttl_pause=true,ttl_row_stats_poll_interval='1m0s',ttl_label_metrics=true}
+{ttl='on',ttl_expire_after='00:10:00':::INTERVAL,ttl_job_cron='@hourly',ttl_select_batch_size=50,ttl_delete_rate_limit=100,ttl_pause=true,ttl_row_stats_poll_interval='1m0s',ttl_label_metrics=true}
query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
@@ -448,7 +466,7 @@ CREATE TABLE public.tbl (
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
-) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_job_cron = '@hourly', ttl_select_batch_size = 50, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
+) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_job_cron = '@hourly', ttl_select_batch_size = 50, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
statement ok
ALTER TABLE tbl SET (ttl_delete_batch_size = 100)
@@ -462,7 +480,7 @@ CREATE TABLE public.tbl (
crdb_internal_expiration TIMESTAMPTZ NOT VISIBLE NOT NULL DEFAULT current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL ON UPDATE current_timestamp():::TIMESTAMPTZ + '00:10:00':::INTERVAL,
CONSTRAINT tbl_pkey PRIMARY KEY (id ASC),
FAMILY fam_0_id_text_crdb_internal_expiration (id, text, crdb_internal_expiration)
-) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_job_cron = '@hourly', ttl_select_batch_size = 50, ttl_delete_batch_size = 100, ttl_range_concurrency = 2, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
+) WITH (ttl = 'on', ttl_expire_after = '00:10:00':::INTERVAL, ttl_job_cron = '@hourly', ttl_select_batch_size = 50, ttl_delete_batch_size = 100, ttl_delete_rate_limit = 100, ttl_pause = true, ttl_row_stats_poll_interval = '1m0s', ttl_label_metrics = true)
statement error "ttl_select_batch_size" must be at least 1
ALTER TABLE tbl SET (ttl_select_batch_size = -1)
@@ -470,9 +488,6 @@ ALTER TABLE tbl SET (ttl_select_batch_size = -1)
statement error "ttl_delete_batch_size" must be at least 1
ALTER TABLE tbl SET (ttl_delete_batch_size = -1)
-statement error "ttl_range_concurrency" must be at least 1
-ALTER TABLE tbl SET (ttl_range_concurrency = -1)
-
statement error "ttl_delete_rate_limit" must be at least 1
ALTER TABLE tbl SET (ttl_delete_rate_limit = -1)
@@ -480,7 +495,7 @@ statement error "ttl_row_stats_poll_interval" must be at least 1
ALTER TABLE tbl SET (ttl_row_stats_poll_interval = '-1 second')
statement ok
-ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size, ttl_range_concurrency, ttl_delete_rate_limit, ttl_pause, ttl_row_stats_poll_interval)
+ALTER TABLE tbl RESET (ttl_delete_batch_size, ttl_select_batch_size, ttl_delete_rate_limit, ttl_pause, ttl_row_stats_poll_interval)
query T
SELECT create_statement FROM [SHOW CREATE TABLE tbl]
diff --git a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
index 51aa0397331e..b9b5e848864a 100644
--- a/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
+++ b/pkg/sql/storageparam/tablestorageparam/table_storage_param.go
@@ -125,6 +125,8 @@ var ttlAutomaticColumnNotice = pgnotice.Newf("ttl_automatic_column is no longer
"Setting ttl_expire_after automatically creates a TTL column. " +
"Resetting ttl_expire_after removes the automatically created column.")
+var ttlRangeConcurrencyNotice = pgnotice.Newf("ttl_range_concurrency is no longer configurable.")
+
var tableParams = map[string]tableParam{
`fillfactor`: {
onSet: func(po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
@@ -307,23 +309,14 @@ var tableParams = map[string]tableParam{
return nil
},
},
+ // TODO(ewall): remove in 23.1
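+ // Setting or resetting ttl_range_concurrency is now a no-op that buffers a client notice.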
`ttl_range_concurrency`: {
onSet: func(po *Setter, semaCtx *tree.SemaContext, evalCtx *eval.Context, key string, datum tree.Datum) error {
- val, err := paramparse.DatumAsInt(evalCtx, key, datum)
- if err != nil {
- return err
- }
- if err := tabledesc.ValidateTTLRangeConcurrency(key, val); err != nil {
- return err
- }
- rowLevelTTL := po.getOrCreateRowLevelTTL()
- rowLevelTTL.RangeConcurrency = val
+ evalCtx.ClientNoticeSender.BufferClientNotice(evalCtx.Context, ttlRangeConcurrencyNotice)
return nil
},
onReset: func(po *Setter, evalCtx *eval.Context, key string) error {
- if po.hasRowLevelTTL() {
- po.UpdatedRowLevelTTL.RangeConcurrency = 0
- }
+ evalCtx.ClientNoticeSender.BufferClientNotice(evalCtx.Context, ttlRangeConcurrencyNotice)
return nil
},
},
diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go
index c69d9998f8ae..dc814bbf67f9 100644
--- a/pkg/sql/ttl/ttljob/ttljob.go
+++ b/pkg/sql/ttl/ttljob/ttljob.go
@@ -54,13 +54,6 @@ var (
100,
settings.PositiveInt,
).WithPublic()
- defaultRangeConcurrency = settings.RegisterIntSetting(
- settings.TenantWritable,
- "sql.ttl.default_range_concurrency",
- "default amount of ranges to process at once during a TTL delete",
- 1,
- settings.PositiveInt,
- ).WithPublic()
defaultDeleteRateLimit = settings.RegisterIntSetting(
settings.TenantWritable,
"sql.ttl.default_delete_rate_limit",
@@ -229,7 +222,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
}
jobID := t.job.ID()
- rangeConcurrency := getRangeConcurrency(settingsValues, rowLevelTTL)
selectBatchSize := getSelectBatchSize(settingsValues, rowLevelTTL)
deleteBatchSize := getDeleteBatchSize(settingsValues, rowLevelTTL)
deleteRateLimit := getDeleteRateLimit(settingsValues, rowLevelTTL)
@@ -240,7 +232,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
AOST: aost,
TTLExpr: ttlExpr,
Spans: spans,
- RangeConcurrency: rangeConcurrency,
SelectBatchSize: selectBatchSize,
DeleteBatchSize: deleteBatchSize,
DeleteRateLimit: deleteRateLimit,
@@ -250,6 +241,11 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
}
}
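+ // Total the spans across all partitions so job progress can record a job-wide span count.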
+ jobSpanCount := 0
+ for _, spanPartition := range spanPartitions {
+ jobSpanCount += len(spanPartition.Spans)
+ }
+
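+ // Distribute the TTL work via DistSQL only once the 22.2 version gate is active on all nodes.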
useDistSQL := execCfg.Settings.Version.IsActive(ctx, clusterversion.TTLDistSQL)
jobRegistry := execCfg.JobRegistry
if err := jobRegistry.UpdateJobWithTxn(
@@ -259,7 +255,9 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
true, /* useReadLock */
func(_ *kv.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
progress := md.Progress
- progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL.UseDistSQL = useDistSQL
+ rowLevelTTL := progress.Details.(*jobspb.Progress_RowLevelTTL).RowLevelTTL
+ rowLevelTTL.UseDistSQL = useDistSQL
+ rowLevelTTL.JobSpanCount = int64(jobSpanCount)
ju.UpdateProgress(progress)
return nil
},
@@ -267,7 +265,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err
return err
}
- if !useDistSQL {
+ if !useDistSQL { // TODO(ewall): Remove !useDistSQL block and ttlProcessorOverride in 23.1
var spans []roachpb.Span
for _, spanPartition := range spanPartitions {
spans = append(spans, spanPartition.Spans...)
@@ -377,14 +375,6 @@ func getDeleteBatchSize(sv *settings.Values, ttl catpb.RowLevelTTL) int64 {
return bs
}
-func getRangeConcurrency(sv *settings.Values, ttl catpb.RowLevelTTL) int64 {
- rc := ttl.RangeConcurrency
- if rc == 0 {
- rc = defaultRangeConcurrency.Get(sv)
- }
- return rc
-}
-
func getDeleteRateLimit(sv *settings.Values, ttl catpb.RowLevelTTL) int64 {
rl := ttl.DeleteRateLimit
if rl == 0 {
diff --git a/pkg/sql/ttl/ttljob/ttljob_processor.go b/pkg/sql/ttl/ttljob/ttljob_processor.go
index 7bd806a9c8bf..cf66e1e3efb4 100644
--- a/pkg/sql/ttl/ttljob/ttljob_processor.go
+++ b/pkg/sql/ttl/ttljob/ttljob_processor.go
@@ -12,6 +12,7 @@ package ttljob
import (
"context"
+ "runtime"
"sync/atomic"
"time"
@@ -76,7 +77,7 @@ func (t *ttlProcessor) getWorkFields() (
return flowCtx.Descriptors, serverCfg.DB, serverCfg.Codec, serverCfg.JobRegistry, flowCtx.NodeID.SQLInstanceID()
}
-func (t *ttlProcessor) getRangeFields() (
+func (t *ttlProcessor) getSpanFields() (
*settings.Values,
sqlutil.InternalExecutor,
*kv.DB,
@@ -102,7 +103,6 @@ func (t *ttlProcessor) work(ctx context.Context) error {
ttlSpec := t.ttlSpec
descsCol, db, codec, jobRegistry, sqlInstanceID := t.getWorkFields()
details := ttlSpec.RowLevelTTLDetails
- rangeConcurrency := ttlSpec.RangeConcurrency
deleteRateLimit := ttlSpec.DeleteRateLimit
deleteRateLimiter := quotapool.NewRateLimiter(
@@ -162,28 +162,33 @@ func (t *ttlProcessor) work(ctx context.Context) error {
)
group := ctxgroup.WithContext(ctx)
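+ // Cap the worker goroutine count at GOMAXPROCS; with fewer spans than CPUs, one goroutine per span is enough.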
+ processorSpanCount := int64(len(ttlSpec.Spans))
+ processorConcurrency := int64(runtime.GOMAXPROCS(0))
+ if processorSpanCount < processorConcurrency {
+ processorConcurrency = processorSpanCount
+ }
err := func() error {
- rangeChan := make(chan rangeToProcess, rangeConcurrency)
- defer close(rangeChan)
- for i := int64(0); i < rangeConcurrency; i++ {
+ spanChan := make(chan spanToProcess, processorConcurrency)
+ defer close(spanChan)
+ for i := int64(0); i < processorConcurrency; i++ {
group.GoCtx(func(ctx context.Context) error {
- for rangeToProcess := range rangeChan {
+ for spanToProcess := range spanChan {
start := timeutil.Now()
- rangeRowCount, err := t.runTTLOnRange(
+ spanRowCount, err := t.runTTLOnSpan(
ctx,
metrics,
- rangeToProcess,
+ spanToProcess,
pkColumns,
relationName,
deleteRateLimiter,
)
// add before returning err in case of partial success
- atomic.AddInt64(&processorRowCount, rangeRowCount)
+ atomic.AddInt64(&processorRowCount, spanRowCount)
metrics.RangeTotalDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
// Continue until channel is fully read.
// Otherwise, the keys input will be blocked.
- for rangeToProcess = range rangeChan {
+ for spanToProcess = range spanChan {
}
return err
}
@@ -192,7 +197,7 @@ func (t *ttlProcessor) work(ctx context.Context) error {
})
}
- // Iterate over every range to feed work for the goroutine processors.
+ // Iterate over every span to feed work for the goroutine processors.
var alloc tree.DatumAlloc
for _, span := range ttlSpec.Spans {
startPK, err := keyToDatums(roachpb.RKey(span.Key), codec, pkTypes, &alloc)
@@ -203,7 +208,7 @@ func (t *ttlProcessor) work(ctx context.Context) error {
if err != nil {
return err
}
- rangeChan <- rangeToProcess{
+ spanChan <- spanToProcess{
startPK: startPK,
endPK: endPK,
}
@@ -230,9 +235,11 @@ func (t *ttlProcessor) work(ctx context.Context) error {
rowLevelTTL.JobRowCount += processorRowCount
processorID := t.ProcessorID
rowLevelTTL.ProcessorProgresses = append(rowLevelTTL.ProcessorProgresses, jobspb.RowLevelTTLProcessorProgress{
- ProcessorID: processorID,
- SQLInstanceID: sqlInstanceID,
- ProcessorRowCount: processorRowCount,
+ ProcessorID: processorID,
+ SQLInstanceID: sqlInstanceID,
+ ProcessorRowCount: processorRowCount,
+ ProcessorSpanCount: processorSpanCount,
+ ProcessorConcurrency: processorConcurrency,
})
ju.UpdateProgress(progress)
log.VInfof(
@@ -246,15 +253,15 @@ func (t *ttlProcessor) work(ctx context.Context) error {
)
}
-// rangeRowCount should be checked even if the function returns an error because it may have partially succeeded
-func (t *ttlProcessor) runTTLOnRange(
+// spanRowCount should be checked even if the function returns an error because it may have partially succeeded.
+func (t *ttlProcessor) runTTLOnSpan(
ctx context.Context,
metrics rowLevelTTLMetrics,
- rangeToProcess rangeToProcess,
+ spanToProcess spanToProcess,
pkColumns []string,
relationName string,
deleteRateLimiter *quotapool.RateLimiter,
-) (rangeRowCount int64, err error) {
+) (spanRowCount int64, err error) {
metrics.NumActiveRanges.Inc(1)
defer metrics.NumActiveRanges.Dec(1)
@@ -265,7 +272,7 @@ func (t *ttlProcessor) runTTLOnRange(
tableID := details.TableID
cutoff := details.Cutoff
ttlExpr := ttlSpec.TTLExpr
- settingsValues, ie, db, descsCol := t.getRangeFields()
+ settingsValues, ie, db, descsCol := t.getSpanFields()
selectBatchSize := ttlSpec.SelectBatchSize
selectBuilder := makeSelectQueryBuilder(
@@ -273,7 +280,7 @@ func (t *ttlProcessor) runTTLOnRange(
cutoff,
pkColumns,
relationName,
- rangeToProcess,
+ spanToProcess,
ttlSpec.AOST,
selectBatchSize,
ttlExpr,
@@ -299,14 +306,14 @@ func (t *ttlProcessor) runTTLOnRange(
},
preSelectStatement,
); err != nil {
- return rangeRowCount, err
+ return spanRowCount, err
}
}
for {
// Check the job is enabled on every iteration.
if err := checkEnabled(settingsValues); err != nil {
- return rangeRowCount, err
+ return spanRowCount, err
}
// Step 1. Fetch some rows we want to delete using a historical
@@ -315,7 +322,7 @@ func (t *ttlProcessor) runTTLOnRange(
expiredRowsPKs, err := selectBuilder.run(ctx, ie)
metrics.SelectDuration.RecordValue(int64(timeutil.Since(start)))
if err != nil {
- return rangeRowCount, errors.Wrapf(err, "error selecting rows to delete")
+ return spanRowCount, errors.Wrapf(err, "error selecting rows to delete")
}
numExpiredRows := int64(len(expiredRowsPKs))
metrics.RowSelections.Inc(numExpiredRows)
@@ -359,10 +366,10 @@ func (t *ttlProcessor) runTTLOnRange(
metrics.DeleteDuration.RecordValue(int64(timeutil.Since(start)))
metrics.RowDeletions.Inc(batchRowCount)
- rangeRowCount += batchRowCount
+ spanRowCount += batchRowCount
return nil
}); err != nil {
- return rangeRowCount, errors.Wrapf(err, "error during row deletion")
+ return spanRowCount, errors.Wrapf(err, "error during row deletion")
}
}
@@ -375,10 +382,10 @@ func (t *ttlProcessor) runTTLOnRange(
}
}
- return rangeRowCount, nil
+ return spanRowCount, nil
}
-// keyToDatums translates a RKey on a range for a table to the appropriate datums.
+// keyToDatums translates a RKey on a span for a table to the appropriate datums.
func keyToDatums(
key roachpb.RKey, codec keys.SQLCodec, pkTypes []*types.T, alloc *tree.DatumAlloc,
) (tree.Datums, error) {
@@ -387,7 +394,7 @@ func keyToDatums(
// Decode the datums ourselves, instead of using rowenc.DecodeKeyVals.
// We cannot use rowenc.DecodeKeyVals because we may not have the entire PK
- // as the key for the range (e.g. a PK (a, b) may only be split on (a)).
+ // as the key for the span (e.g. a PK (a, b) may only be split on (a)).
rKey, err := codec.StripTenantPrefix(rKey)
if err != nil {
return nil, errors.Wrapf(err, "error decoding tenant prefix of %x", key)
diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go
index aba6791ebd7d..c46d40dd20cc 100644
--- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go
+++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go
@@ -33,7 +33,7 @@ type selectQueryBuilder struct {
tableID descpb.ID
pkColumns []string
selectOpName string
- rangeToProcess rangeToProcess
+ spanToProcess spanToProcess
selectBatchSize int64
aost time.Time
ttlExpr catpb.Expression
@@ -52,7 +52,7 @@ type selectQueryBuilder struct {
endPKColumnNamesSQL string
}
-type rangeToProcess struct {
+type spanToProcess struct {
startPK, endPK tree.Datums
}
@@ -61,7 +61,7 @@ func makeSelectQueryBuilder(
cutoff time.Time,
pkColumns []string,
relationName string,
- rangeToProcess rangeToProcess,
+ spanToProcess spanToProcess,
aost time.Time,
selectBatchSize int64,
ttlExpr catpb.Expression,
@@ -70,11 +70,11 @@ func makeSelectQueryBuilder(
// is reserved for AOST, and len(pkColumns) for both start and end key.
cachedArgs := make([]interface{}, 0, 1+len(pkColumns)*2)
cachedArgs = append(cachedArgs, cutoff)
- endPK := rangeToProcess.endPK
+ endPK := spanToProcess.endPK
for _, d := range endPK {
cachedArgs = append(cachedArgs, d)
}
- startPK := rangeToProcess.startPK
+ startPK := spanToProcess.startPK
for _, d := range startPK {
cachedArgs = append(cachedArgs, d)
}
@@ -83,7 +83,7 @@ func makeSelectQueryBuilder(
tableID: tableID,
pkColumns: pkColumns,
selectOpName: fmt.Sprintf("ttl select %s", relationName),
- rangeToProcess: rangeToProcess,
+ spanToProcess: spanToProcess,
aost: aost,
selectBatchSize: selectBatchSize,
ttlExpr: ttlExpr,
@@ -98,9 +98,9 @@ func makeSelectQueryBuilder(
func (b *selectQueryBuilder) buildQuery() string {
// Generate the end key clause for SELECT, which always stays the same.
// Start from $2 as $1 is for the now clause.
- // The end key of a range is exclusive, so use <.
+ // The end key of a span is exclusive, so use <.
var endFilterClause string
- endPK := b.rangeToProcess.endPK
+ endPK := b.spanToProcess.endPK
if len(endPK) > 0 {
endFilterClause = fmt.Sprintf(" AND (%s) < (", b.endPKColumnNamesSQL)
for i := range endPK {
@@ -112,7 +112,7 @@ func (b *selectQueryBuilder) buildQuery() string {
endFilterClause += ")"
}
- startPK := b.rangeToProcess.startPK
+ startPK := b.spanToProcess.startPK
var filterClause string
if !b.isFirst {
// After the first query, we always want (col1, ...) > (cursor_col_1, ...)
@@ -199,7 +199,7 @@ func (b *selectQueryBuilder) moveCursor(rows []tree.Datums) error {
// Move the cursor forward.
if len(rows) > 0 {
lastRow := rows[len(rows)-1]
- b.cachedArgs = b.cachedArgs[:1+len(b.rangeToProcess.endPK)]
+ b.cachedArgs = b.cachedArgs[:1+len(b.spanToProcess.endPK)]
if len(lastRow) != len(b.pkColumns) {
return errors.AssertionFailedf("expected %d columns for last row, got %d", len(b.pkColumns), len(lastRow))
}
diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
index bbb3707d61b4..ab483097e349 100644
--- a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
+++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go
@@ -45,7 +45,7 @@ func TestSelectQueryBuilder(t *testing.T) {
mockTime,
[]string{"col1", "col2"},
"relation_name",
- rangeToProcess{
+ spanToProcess{
startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)},
endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)},
},
@@ -111,7 +111,7 @@ LIMIT 2`,
mockTime,
[]string{"col1", "col2"},
"table_name",
- rangeToProcess{},
+ spanToProcess{},
mockTime,
2,
colinfo.TTLDefaultExpirationColumnName,
@@ -170,7 +170,7 @@ LIMIT 2`,
mockTime,
[]string{"col1", "col2"},
"table_name",
- rangeToProcess{
+ spanToProcess{
startPK: tree.Datums{tree.NewDInt(100)},
endPK: tree.Datums{tree.NewDInt(181)},
},
@@ -236,7 +236,7 @@ LIMIT 2`,
mockTime,
[]string{"col1", "col2"},
"table_name",
- rangeToProcess{
+ spanToProcess{
endPK: tree.Datums{tree.NewDInt(200), tree.NewDInt(15)},
},
mockTime,
@@ -300,7 +300,7 @@ LIMIT 2`,
mockTime,
[]string{"col1", "col2"},
"table_name",
- rangeToProcess{
+ spanToProcess{
startPK: tree.Datums{tree.NewDInt(100), tree.NewDInt(5)},
},
mockTime,
diff --git a/pkg/sql/ttl/ttljob/ttljob_test.go b/pkg/sql/ttl/ttljob/ttljob_test.go
index 94c743586ff8..358c6db7826c 100644
--- a/pkg/sql/ttl/ttljob/ttljob_test.go
+++ b/pkg/sql/ttl/ttljob/ttljob_test.go
@@ -238,9 +238,14 @@ func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRowsJobOnly(
require.Equal(t, 1, jobCount)
}
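+
+// processor holds the expected per-processor span and row counts.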
+type processor struct {
+ spanCount int64
+ rowCount int64
+}
+
func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRows(
t *testing.T,
- expectedSQLInstanceIDToProcessorRowCountMap map[base.SQLInstanceID]int64,
+ expectedSQLInstanceIDToProcessorMap map[base.SQLInstanceID]*processor,
expectedUseDistSQL bool,
) {
rows := h.sqlDB.Query(t, `
@@ -264,6 +269,7 @@ func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRows(
processorProgresses := rowLevelTTLProgress.ProcessorProgresses
processorIDs := make(map[int32]struct{}, len(processorProgresses))
sqlInstanceIDs := make(map[base.SQLInstanceID]struct{}, len(processorProgresses))
+ expectedJobSpanCount := int64(0)
expectedJobRowCount := int64(0)
for i, processorProgress := range rowLevelTTLProgress.ProcessorProgresses {
processorID := processorProgress.ProcessorID
@@ -273,12 +279,18 @@ func (h *rowLevelTTLTestJobTestHelper) verifyExpiredRows(
require.NotContains(t, sqlInstanceIDs, sqlInstanceID, i)
sqlInstanceIDs[sqlInstanceID] = struct{}{}
- expectedProcessorRowCount, ok := expectedSQLInstanceIDToProcessorRowCountMap[sqlInstanceID]
+ expectedProcessor, ok := expectedSQLInstanceIDToProcessorMap[sqlInstanceID]
require.True(t, ok, i)
- require.Equal(t, expectedProcessorRowCount, processorProgress.ProcessorRowCount)
+ expectedProcessorSpanCount := expectedProcessor.spanCount
+ require.Equal(t, expectedProcessorSpanCount, processorProgress.ProcessorSpanCount)
+ expectedJobSpanCount += expectedProcessorSpanCount
+
+ expectedProcessorRowCount := expectedProcessor.rowCount
+ require.Equal(t, expectedProcessorRowCount, processorProgress.ProcessorRowCount)
expectedJobRowCount += expectedProcessorRowCount
}
+ require.Equal(t, expectedJobSpanCount, rowLevelTTLProgress.JobSpanCount)
require.Equal(t, expectedJobRowCount, rowLevelTTLProgress.JobRowCount)
require.Equal(t, expectedUseDistSQL, rowLevelTTLProgress.UseDistSQL)
jobCount++
@@ -314,7 +326,7 @@ func TestRowLevelTTLInterruptDuringExecution(t *testing.T) {
createTable := `CREATE TABLE t (
id INT PRIMARY KEY
-) WITH (ttl_expire_after = '10 minutes', ttl_range_concurrency = 2);
+) WITH (ttl_expire_after = '10 minutes');
ALTER TABLE t SPLIT AT VALUES (1), (2);
INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2, now() - '1 month');`
@@ -379,7 +391,7 @@ func TestRowLevelTTLJobDisabled(t *testing.T) {
}
return fmt.Sprintf(`CREATE TABLE t (
id INT PRIMARY KEY
-) WITH (ttl_expire_after = '10 minutes', ttl_range_concurrency = 2%s);
+) WITH (ttl_expire_after = '10 minutes'%s);
INSERT INTO t (id, crdb_internal_expiration) VALUES (1, now() - '1 month'), (2, now() - '1 month');`, pauseStr)
}
@@ -515,7 +527,9 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) {
}
require.NotEqual(t, -1, leaseHolderServerIdx)
- const rowsPerRange = 10
+ const expiredRowsPerRange = 5
+ const nonExpiredRowsPerRange = 5
+ const rowsPerRange = expiredRowsPerRange + nonExpiredRowsPerRange
type rangeSplit struct {
sqlInstanceID base.SQLInstanceID
offset int
@@ -559,7 +573,7 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) {
nonExpiredTs := ts.Add(time.Hour * 24 * 30)
expiredTs := ts.Add(-time.Hour)
const insertStatement = `INSERT INTO tbl VALUES ($1, $2)`
- expectedSQLInstanceIDToProcessorRowCountMap := make(map[base.SQLInstanceID]int64, numRanges)
+ expectedSQLInstanceIDToProcessorMap := make(map[base.SQLInstanceID]*processor, numRanges)
expectedUseDistSQL := version == 0
for _, rangeSplit := range rangeSplits {
offset := rangeSplit.offset
@@ -569,13 +583,19 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) {
expectedNumNonExpiredRows++
sqlDB.Exec(t, insertStatement, i, expiredTs)
i++
- expectedSQLInstanceID := rangeSplit.sqlInstanceID
- // one node deletes all rows in 22.1
- if !expectedUseDistSQL {
- expectedSQLInstanceID = leaseHolderSQLInstanceID
- }
- expectedSQLInstanceIDToProcessorRowCountMap[expectedSQLInstanceID]++
}
+ expectedSQLInstanceID := rangeSplit.sqlInstanceID
+ // one node deletes all rows in 22.1
+ if !expectedUseDistSQL {
+ expectedSQLInstanceID = leaseHolderSQLInstanceID
+ }
+ expectedProcessor, ok := expectedSQLInstanceIDToProcessorMap[expectedSQLInstanceID]
+ if !ok {
+ expectedProcessor = &processor{}
+ expectedSQLInstanceIDToProcessorMap[expectedSQLInstanceID] = expectedProcessor
+ }
+ expectedProcessor.spanCount++
+ expectedProcessor.rowCount += expiredRowsPerRange
}
// Force the schedule to execute.
@@ -583,7 +603,7 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) {
// Verify results
th.verifyNonExpiredRows(t, tableName, expirationExpr, expectedNumNonExpiredRows)
- th.verifyExpiredRows(t, expectedSQLInstanceIDToProcessorRowCountMap, expectedUseDistSQL)
+ th.verifyExpiredRows(t, expectedSQLInstanceIDToProcessorMap, expectedUseDistSQL)
})
}
}
@@ -668,7 +688,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
createTable: `CREATE TABLE tbl (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
text TEXT
-) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10, ttl_range_concurrency = 3)`,
+) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10)`,
numExpiredRows: 1001,
numNonExpiredRows: 5,
numSplits: 10,
@@ -705,7 +725,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
"quote-kw-col" TIMESTAMPTZ,
text TEXT,
PRIMARY KEY (id, other_col, "quote-kw-col")
-) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10, ttl_range_concurrency = 3)`,
+) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10)`,
numExpiredRows: 1001,
numNonExpiredRows: 5,
numSplits: 10,
@@ -719,7 +739,7 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
text TEXT,
INDEX text_idx (text),
PRIMARY KEY (id, other_col, "quote-kw-col")
-) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10, ttl_range_concurrency = 3)`,
+) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = 50, ttl_delete_batch_size = 10)`,
postSetup: []string{
`ALTER INDEX tbl@text_idx SPLIT AT VALUES ('bob')`,
},
@@ -758,12 +778,11 @@ func TestRowLevelTTLJobRandomEntries(t *testing.T) {
rand_col_2 %s,
text TEXT,
PRIMARY KEY (id, rand_col_1, rand_col_2)
-) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = %d, ttl_delete_batch_size = %d, ttl_range_concurrency = %d)`,
+) WITH (ttl_expire_after = '30 days', ttl_select_batch_size = %d, ttl_delete_batch_size = %d)`,
randgen.RandTypeFromSlice(rng, indexableTyps).SQLString(),
randgen.RandTypeFromSlice(rng, indexableTyps).SQLString(),
1+rng.Intn(100),
1+rng.Intn(100),
- 1+rng.Intn(3),
),
numSplits: 1 + rng.Intn(9),
numExpiredRows: rng.Intn(2000),
diff --git a/pkg/workload/ttlbench/ttlbench.go b/pkg/workload/ttlbench/ttlbench.go
index 320f281bae9a..cb5987b0bcfb 100644
--- a/pkg/workload/ttlbench/ttlbench.go
+++ b/pkg/workload/ttlbench/ttlbench.go
@@ -47,7 +47,8 @@ type ttlBench struct {
rowMessageLength int
expiredRowPercentage int
ttlBatchSize int
- ttlRangeConcurrency int
+ rangeMinBytes int
+ rangeMaxBytes int
}
var ttlBenchMeta = workload.Meta{
@@ -73,7 +74,8 @@ Note: Ops is a no-op and no histograms are used. Benchmarking is done inside Hoo
flags.IntVar(&g.rowMessageLength, `row-message-length`, 128, `length of row message`)
flags.IntVar(&g.expiredRowPercentage, `expired-row-percentage`, 50, `percentage of rows that are expired`)
flags.IntVar(&g.ttlBatchSize, `ttl-batch-size`, 500, `size of TTL SELECT and DELETE batches`)
- flags.IntVar(&g.ttlRangeConcurrency, `ttl-range-concurrency`, 1, `number of concurrent ranges to process per node`)
+ flags.IntVar(&g.rangeMinBytes, `range-min-bytes`, 134217728, `minimum number of bytes in range before merging`)
+ flags.IntVar(&g.rangeMaxBytes, `range-max-bytes`, 536870912, `maximum number of bytes in range before splitting`)
g.connFlags = workload.NewConnFlags(flags)
return g
},
@@ -100,7 +102,8 @@ func printJobState(ctx context.Context, db *gosql.DB) (retErr error) {
FROM crdb_internal.jobs AS crdb_j
JOIN system.jobs as sys_j ON crdb_j.job_id = sys_j.id
WHERE crdb_j.job_type = 'ROW LEVEL TTL'
- ORDER BY crdb_j.finished DESC
+ ORDER BY crdb_j.finished DESC
+ LIMIT 1;
`)
if err != nil {
return err
@@ -132,6 +135,7 @@ func printJobState(ctx context.Context, db *gosql.DB) (retErr error) {
func getLeaseholderToRangeIDsString(leaseholderToRangeIDs map[string][]string) (string, error) {
sortedLeaseholders := make([]int, len(leaseholderToRangeIDs))
i := 0
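+ // Track the widest range ID so the printed range ID columns line up.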
+ maxRangeIDLength := 0
for leaseholder := range leaseholderToRangeIDs {
leaseholderInt, err := strconv.Atoi(leaseholder)
if err != nil {
@@ -139,18 +143,30 @@ func getLeaseholderToRangeIDsString(leaseholderToRangeIDs map[string][]string) (
}
sortedLeaseholders[i] = leaseholderInt
i++
+ for _, rangeID := range leaseholderToRangeIDs[leaseholder] {
+ rangeIDLength := len(rangeID)
+ if rangeIDLength > maxRangeIDLength {
+ maxRangeIDLength = rangeIDLength
+ }
+ }
}
sort.Ints(sortedLeaseholders)
var sb strings.Builder
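+ // Derive printf padding widths from the leaseholder count and the widest range ID.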
+ numLeaseholders := len(sortedLeaseholders)
+ numLeaseholdersDigits := len(strconv.Itoa(numLeaseholders))
+ leaseholderPadding := strconv.Itoa(numLeaseholdersDigits)
+ rangeIDPadding := strconv.Itoa(maxRangeIDLength)
for _, leaseholder := range sortedLeaseholders {
leaseholderString := strconv.Itoa(leaseholder)
- sb.WriteString(fmt.Sprintf("leaseholder=%-3s", leaseholderString))
- sb.WriteString(" rangeIDs=[\n")
+ sb.WriteString(fmt.Sprintf("leaseholder=%-"+leaseholderPadding+"s rangeIDs=[\n", leaseholderString))
for i, rangeID := range leaseholderToRangeIDs[leaseholderString] {
- sb.WriteString(fmt.Sprintf("%6s", rangeID))
+ sb.WriteString(fmt.Sprintf("%"+rangeIDPadding+"s ", rangeID))
const rangesPerLine = 10
- if i%rangesPerLine == rangesPerLine-1 {
+ rangeLineIdx := i % rangesPerLine
+ // Add newline after rangesPerLine ranges have been printed
+ // unless it's the last range because the newline will be printed below.
+ if rangeLineIdx == rangesPerLine-1 && i != len(leaseholderToRangeIDs[leaseholderString])-1 {
sb.WriteString("\n")
}
}
@@ -228,9 +244,16 @@ func waitForDistribution(ctx context.Context, db *gosql.DB) error {
func (t *ttlBench) Hooks() workload.Hooks {
return workload.Hooks{
- // Clear the table in case a previous run left records.
PreCreate: func(db *gosql.DB) error {
- _, err := db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", ttlTableName))
+ // Clear the table in case a previous run left records.
+ if _, err := db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s", ttlTableName)); err != nil {
+ return err
+ }
+ // Configure min/max range bytes.
+ _, err := db.Exec(fmt.Sprintf(
+ "ALTER DATABASE %s CONFIGURE ZONE USING range_min_bytes = %d, range_max_bytes = %d",
+ ttlBenchMeta.Name, t.rangeMinBytes, t.rangeMaxBytes,
+ ))
return err
},
// The actual benchmarking happens here.
@@ -255,8 +278,8 @@ func (t *ttlBench) Hooks() workload.Hooks {
// Enable TTL job after records have been inserted.
ttlStatement := fmt.Sprintf(
- "ALTER TABLE %s SET (ttl_expiration_expression = 'expire_at', ttl_label_metrics = true, ttl_job_cron = '* * * * *', ttl_select_batch_size=%d, ttl_delete_batch_size=%d, ttl_range_concurrency=%d);",
- ttlTableName, t.ttlBatchSize, t.ttlBatchSize, t.ttlRangeConcurrency,
+ "ALTER TABLE %s SET (ttl_expiration_expression = 'expire_at', ttl_label_metrics = true, ttl_job_cron = '* * * * *', ttl_select_batch_size=%d, ttl_delete_batch_size=%d);",
+ ttlTableName, t.ttlBatchSize, t.ttlBatchSize,
)
_, err = db.ExecContext(ctx, ttlStatement)
if err != nil {
@@ -353,10 +376,6 @@ func (t *ttlBench) Tables() []workload.Table {
if ttlBatchSize < 0 {
panic(fmt.Sprintf("invalid ttl-batch-size %d", ttlBatchSize))
}
- ttlRangeConcurrency := t.ttlRangeConcurrency
- if ttlRangeConcurrency < 0 {
- panic(fmt.Sprintf("invalid ttl-range-concurrency %d", ttlRangeConcurrency))
- }
rowCount := int64(0)
return []workload.Table{
{