@@ -2037,7 +2037,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
2037
2037
} else if (t->ProposeConfig ) {
2038
2038
Y_ABORT_UNLESS (ChangingConfig);
2039
2039
ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2040
- t->ProposeConfig ->Config );
2040
+ t->ProposeConfig ->Config ,
2041
+ t->ProposeConfig ->BootstrapConfig );
2041
2042
PendingPartitionConfig = GetPartitionConfig (ChangeConfig->Config );
2042
2043
SendChangeConfigReply = false ;
2043
2044
}
@@ -2123,7 +2124,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
2123
2124
{
2124
2125
ChangeConfig =
2125
2126
MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2126
- event.Config );
2127
+ event.Config ,
2128
+ event.BootstrapConfig );
2127
2129
PendingPartitionConfig = GetPartitionConfig (ChangeConfig->Config );
2128
2130
2129
2131
SendChangeConfigReply = false ;
@@ -2360,6 +2362,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
2360
2362
2361
2363
if (ChangeConfig) {
2362
2364
EndChangePartitionConfig (std::move (ChangeConfig->Config ),
2365
+ std::move (ChangeConfig->BootstrapConfig ),
2363
2366
ChangeConfig->TopicConverter ,
2364
2367
ctx);
2365
2368
}
@@ -2426,12 +2429,24 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
2426
2429
}
2427
2430
2428
2431
void TPartition::EndChangePartitionConfig (NKikimrPQ::TPQTabletConfig&& config,
2432
+ NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
2429
2433
NPersQueue::TTopicConverterPtr topicConverter,
2430
2434
const TActorContext& ctx)
2431
2435
{
2432
2436
Config = std::move (config);
2433
2437
PartitionConfig = GetPartitionConfig (Config);
2434
2438
PartitionGraph = MakePartitionGraph (Config);
2439
+
2440
+ for (const auto & mg : bootstrapConfig.GetExplicitMessageGroups ()) {
2441
+ TMaybe<TPartitionKeyRange> keyRange;
2442
+ if (mg.HasKeyRange ()) {
2443
+ keyRange = TPartitionKeyRange::Parse (mg.GetKeyRange ());
2444
+ }
2445
+
2446
+ TSourceIdInfo sourceId (0 , 0 , ctx.Now (), std::move (keyRange), false );
2447
+ SourceIdStorage.RegisterSourceIdInfo (mg.GetId (), std::move (sourceId), true );
2448
+ }
2449
+
2435
2450
TopicConverter = topicConverter;
2436
2451
NewPartition = false ;
2437
2452
@@ -2441,14 +2456,15 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
2441
2456
InitSplitMergeSlidingWindow ();
2442
2457
}
2443
2458
2444
- Send (ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config));
2445
- Send (WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config));
2459
+ Send (ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config, bootstrapConfig ));
2460
+ Send (WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config, bootstrapConfig ));
2446
2461
TotalPartitionWriteSpeed = config.GetPartitionConfig ().GetWriteSpeedInBytesPerSecond ();
2447
2462
2448
2463
if (Config.GetPartitionConfig ().HasMirrorFrom ()) {
2449
2464
if (Mirrorer) {
2450
2465
ctx.Send (Mirrorer->Actor , new TEvPQ::TEvChangePartitionConfig (TopicConverter,
2451
- Config));
2466
+ Config,
2467
+ bootstrapConfig));
2452
2468
} else {
2453
2469
CreateMirrorerActor ();
2454
2470
}
0 commit comments