Skip to content

Commit e562a67

Browse files
authored
CDC initial scan settings (#10536)
1 parent 997b273 commit e562a67

File tree

6 files changed

+42
-2
lines changed

6 files changed

+42
-2
lines changed

ydb/core/protos/config.proto

+12-1
Original file line numberDiff line numberDiff line change
@@ -1194,6 +1194,17 @@ message TImmediateControlsConfig {
11941194
MinValue: 0,
11951195
MaxValue: 134217728,
11961196
DefaultValue: 0 }];
1197+
1198+
optional uint64 CdcInitialScanReadAheadLo = 19 [(ControlOptions) = {
1199+
Description: "Override for CDC initial scan readahead (low watermark)",
1200+
MinValue: 0,
1201+
MaxValue: 67108864,
1202+
DefaultValue: 0 }];
1203+
optional uint64 CdcInitialScanReadAheadHi = 20 [(ControlOptions) = {
1204+
Description: "Override for CDC initial scan readahead (high watermark)",
1205+
MinValue: 0,
1206+
MaxValue: 134217728,
1207+
DefaultValue: 0 }];
11971208
}
11981209

11991210
message TTxLimitControls {
@@ -1658,7 +1669,7 @@ message TSchemeShardConfig {
16581669
repeated TInFlightCounterConfig InFlightCounterConfig = 4;
16591670

16601671
// number of shards per table
1661-
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10];
1672+
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 32];
16621673
}
16631674

16641675
message TCompactionConfig {

ydb/core/protos/datashard_config.proto

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ message TDataShardConfig {
2020
optional uint64 RestoreReadBufferSizeLimit = 16 [default = 268435456]; // 256 MB
2121
optional string CdcInitialScanTaskName = 17 [default = "cdc_initial_scan"];
2222
optional uint32 CdcInitialScanTaskPriority = 18 [default = 10];
23+
optional uint64 CdcInitialScanReadAheadLo = 22 [default = 524288];
24+
optional uint64 CdcInitialScanReadAheadHi = 23 [default = 1048576];
2325
optional bool DisabledOnSchemeShard = 19 [default = false];
2426
optional uint64 IncrementalRestoreReadAheadLo = 20 [default = 524288];
2527
optional uint64 IncrementalRestoreReadAheadHi = 21 [default = 1048576];

ydb/core/tablet/resource_broker.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -1423,7 +1423,7 @@ NKikimrResourceBroker::TResourceBrokerConfig MakeDefaultConfig()
14231423
queue = config.AddQueues();
14241424
queue->SetName("queue_cdc_initial_scan");
14251425
queue->SetWeight(100);
1426-
queue->MutableLimit()->SetCpu(4);
1426+
queue->MutableLimit()->SetCpu(2);
14271427

14281428
queue = config.AddQueues();
14291429
queue->SetName("queue_statistics_scan");

ydb/core/tx/datashard/cdc_stream_scan.cpp

+11
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,16 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
661661
const auto& taskName = appData->DataShardConfig.GetCdcInitialScanTaskName();
662662
const auto taskPrio = appData->DataShardConfig.GetCdcInitialScanTaskPriority();
663663

664+
ui64 readAheadLo = appData->DataShardConfig.GetCdcInitialScanReadAheadLo();
665+
if (ui64 readAheadLoOverride = Self->GetCdcInitialScanReadAheadLoOverride(); readAheadLoOverride > 0) {
666+
readAheadLo = readAheadLoOverride;
667+
}
668+
669+
ui64 readAheadHi = appData->DataShardConfig.GetCdcInitialScanReadAheadHi();
670+
if (ui64 readAheadHiOverride = Self->GetCdcInitialScanReadAheadHiOverride(); readAheadHiOverride > 0) {
671+
readAheadHi = readAheadHiOverride;
672+
}
673+
664674
const auto snapshotVersion = TRowVersion(snapshotKey.Step, snapshotKey.TxId);
665675
Y_ABORT_UNLESS(info->SnapshotVersion == snapshotVersion);
666676

@@ -673,6 +683,7 @@ class TDataShard::TTxCdcStreamScanRun: public TTransactionBase<TDataShard> {
673683
const ui64 scanId = Self->QueueScan(table->LocalTid, scan.Release(), localTxId,
674684
TScanOptions()
675685
.SetResourceBroker(taskName, taskPrio)
686+
.SetReadAhead(readAheadLo, readAheadHi)
676687
.SetSnapshotRowVersion(snapshotVersion)
677688
);
678689
Self->CdcStreamScanManager.Enqueue(streamPathId, localTxId, scanId);

ydb/core/tx/datashard/datashard.cpp

+5
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,8 @@ TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
151151
, TtlReadAheadHi(0, 0, 128*1024*1024)
152152
, IncrementalRestoreReadAheadLo(0, 0, 64*1024*1024)
153153
, IncrementalRestoreReadAheadHi(0, 0, 128*1024*1024)
154+
, CdcInitialScanReadAheadLo(0, 0, 64*1024*1024)
155+
, CdcInitialScanReadAheadHi(0, 0, 128*1024*1024)
154156
, EnableLockedWrites(1, 0, 1)
155157
, MaxLockedWritesPerKey(1000, 0, 1000000)
156158
, EnableLeaderLeases(1, 0, 1)
@@ -328,6 +330,9 @@ void TDataShard::IcbRegister() {
328330
appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadLo, "DataShardControls.IncrementalRestoreReadAheadLo");
329331
appData->Icb->RegisterSharedControl(IncrementalRestoreReadAheadHi, "DataShardControls.IncrementalRestoreReadAheadHi");
330332

333+
appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadLo, "DataShardControls.CdcInitialScanReadAheadLo");
334+
appData->Icb->RegisterSharedControl(CdcInitialScanReadAheadHi, "DataShardControls.CdcInitialScanReadAheadHi");
335+
331336
IcbRegistered = true;
332337
}
333338
}

ydb/core/tx/datashard/datashard_impl.h

+11
Original file line numberDiff line numberDiff line change
@@ -1715,6 +1715,14 @@ class TDataShard
17151715
return IncrementalRestoreReadAheadHi;
17161716
}
17171717

1718+
ui64 GetCdcInitialScanReadAheadLoOverride() const {
1719+
return CdcInitialScanReadAheadLo;
1720+
}
1721+
1722+
ui64 GetCdcInitialScanReadAheadHiOverride() const {
1723+
return CdcInitialScanReadAheadHi;
1724+
}
1725+
17181726
bool GetEnableLockedWrites() const {
17191727
ui64 value = EnableLockedWrites;
17201728
return value != 0;
@@ -2744,6 +2752,9 @@ class TDataShard
27442752
TControlWrapper IncrementalRestoreReadAheadLo;
27452753
TControlWrapper IncrementalRestoreReadAheadHi;
27462754

2755+
TControlWrapper CdcInitialScanReadAheadLo;
2756+
TControlWrapper CdcInitialScanReadAheadHi;
2757+
27472758
TControlWrapper EnableLockedWrites;
27482759
TControlWrapper MaxLockedWritesPerKey;
27492760

0 commit comments

Comments
 (0)