@@ -2037,7 +2037,8 @@ bool TPartition::ExecUserActionOrTransaction(TSimpleSharedPtr<TTransaction>& t,
20372037 } else if (t->ProposeConfig ) {
20382038 Y_ABORT_UNLESS (ChangingConfig);
20392039 ChangeConfig = MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2040- t->ProposeConfig ->Config );
2040+ t->ProposeConfig ->Config ,
2041+ t->ProposeConfig ->BootstrapConfig );
20412042 PendingPartitionConfig = GetPartitionConfig (ChangeConfig->Config );
20422043 SendChangeConfigReply = false ;
20432044 }
@@ -2123,7 +2124,8 @@ bool TPartition::BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event)
21232124{
21242125 ChangeConfig =
21252126 MakeSimpleShared<TEvPQ::TEvChangePartitionConfig>(TopicConverter,
2126- event.Config );
2127+ event.Config ,
2128+ event.BootstrapConfig );
21272129 PendingPartitionConfig = GetPartitionConfig (ChangeConfig->Config );
21282130
21292131 SendChangeConfigReply = false ;
@@ -2360,6 +2362,7 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
23602362
23612363 if (ChangeConfig) {
23622364 EndChangePartitionConfig (std::move (ChangeConfig->Config ),
2365+ std::move (ChangeConfig->BootstrapConfig ),
23632366 ChangeConfig->TopicConverter ,
23642367 ctx);
23652368 }
@@ -2426,12 +2429,24 @@ void TPartition::OnProcessTxsAndUserActsWriteComplete(const TActorContext& ctx)
24262429}
24272430
24282431void TPartition::EndChangePartitionConfig (NKikimrPQ::TPQTabletConfig&& config,
2432+ NKikimrPQ::TBootstrapConfig&& bootstrapConfig,
24292433 NPersQueue::TTopicConverterPtr topicConverter,
24302434 const TActorContext& ctx)
24312435{
24322436 Config = std::move (config);
24332437 PartitionConfig = GetPartitionConfig (Config);
24342438 PartitionGraph = MakePartitionGraph (Config);
2439+
2440+ for (const auto & mg : bootstrapConfig.GetExplicitMessageGroups ()) {
2441+ TMaybe<TPartitionKeyRange> keyRange;
2442+ if (mg.HasKeyRange ()) {
2443+ keyRange = TPartitionKeyRange::Parse (mg.GetKeyRange ());
2444+ }
2445+
2446+ TSourceIdInfo sourceId (0 , 0 , ctx.Now (), std::move (keyRange), false );
2447+ SourceIdStorage.RegisterSourceIdInfo (mg.GetId (), std::move (sourceId), true );
2448+ }
2449+
24352450 TopicConverter = topicConverter;
24362451 NewPartition = false ;
24372452
@@ -2441,14 +2456,15 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
24412456 InitSplitMergeSlidingWindow ();
24422457 }
24432458
2444- Send (ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config));
2445- Send (WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config));
2459+ Send (ReadQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config, bootstrapConfig ));
2460+ Send (WriteQuotaTrackerActor, new TEvPQ::TEvChangePartitionConfig (TopicConverter, Config, bootstrapConfig ));
24462461 TotalPartitionWriteSpeed = config.GetPartitionConfig ().GetWriteSpeedInBytesPerSecond ();
24472462
24482463 if (Config.GetPartitionConfig ().HasMirrorFrom ()) {
24492464 if (Mirrorer) {
24502465 ctx.Send (Mirrorer->Actor , new TEvPQ::TEvChangePartitionConfig (TopicConverter,
2451- Config));
2466+ Config,
2467+ bootstrapConfig));
24522468 } else {
24532469 CreateMirrorerActor ();
24542470 }
0 commit comments