Skip to content

Commit 23be8b4

Browse files
committed
Configurable MaxCdcInitialScanShardsInFlight
1 parent 0bcb6b0 commit 23be8b4

File tree

5 files changed

+8
-3
lines changed

5 files changed

+8
-3
lines changed

ydb/core/protos/config.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,6 +1531,9 @@ message TSchemeShardConfig {
15311531
optional uint32 StatsMaxExecuteMs = 3 [default = 10];
15321532

15331533
repeated TInFlightCounterConfig InFlightCounterConfig = 4;
1534+
1535+
// number of shards per table
1536+
optional uint32 MaxCdcInitialScanShardsInFlight = 5 [default = 10];
15341537
}
15351538

15361539
message TCompactionConfig {

ydb/core/tx/schemeshard/schemeshard_cdc_stream_scan.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ struct TSchemeShard::TCdcStreamScan::TTxProgress: public TTransactionBase<TSchem
172172
}
173173

174174
while (!streamInfo->PendingShards.empty()) {
175-
if (streamInfo->InProgressShards.size() >= streamInfo->MaxInProgressShards) {
175+
if (streamInfo->InProgressShards.size() >= Self->MaxCdcInitialScanShardsInFlight) {
176176
break;
177177
}
178178

ydb/core/tx/schemeshard/schemeshard_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4450,6 +4450,7 @@ void TSchemeShard::OnActivateExecutor(const TActorContext &ctx) {
44504450
ConfigureCompactionQueues(appData->CompactionConfig, ctx);
44514451
ConfigureStatsBatching(appData->SchemeShardConfig, ctx);
44524452
ConfigureStatsOperations(appData->SchemeShardConfig, ctx);
4453+
MaxCdcInitialScanShardsInFlight = appData->SchemeShardConfig.GetMaxCdcInitialScanShardsInFlight();
44534454

44544455
ConfigureBackgroundCleaningQueue(appData->BackgroundCleaningConfig, ctx);
44554456

@@ -6951,6 +6952,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi
69516952
const auto& schemeShardConfig = appConfig.GetSchemeShardConfig();
69526953
ConfigureStatsBatching(schemeShardConfig, ctx);
69536954
ConfigureStatsOperations(schemeShardConfig, ctx);
6955+
MaxCdcInitialScanShardsInFlight = schemeShardConfig.GetMaxCdcInitialScanShardsInFlight();
69546956
}
69556957

69566958
if (appConfig.HasTableProfilesConfig()) {

ydb/core/tx/schemeshard/schemeshard_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,9 @@ class TSchemeShard
338338
TActorId SysPartitionStatsCollector;
339339

340340
TActorId TabletMigrator;
341+
341342
TActorId CdcStreamScanFinalizer;
343+
ui32 MaxCdcInitialScanShardsInFlight = 10;
342344

343345
TDuration StatsMaxExecuteTime;
344346
TDuration StatsBatchTimeout;

ydb/core/tx/schemeshard/schemeshard_info_types.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2395,8 +2395,6 @@ struct TCdcStreamInfo : public TSimpleRefCount<TCdcStreamInfo> {
23952395
{}
23962396
};
23972397

2398-
static constexpr ui32 MaxInProgressShards = 10;
2399-
24002398
TCdcStreamInfo(ui64 version, EMode mode, EFormat format, bool vt, const TDuration& rt, const TString& awsRegion, EState state)
24012399
: AlterVersion(version)
24022400
, Mode(mode)

0 commit comments

Comments
 (0)