6
6
7
7
#include < ydb/core/change_exchange/change_sender.h>
8
8
#include < ydb/core/change_exchange/change_sender_monitoring.h>
9
+ #include < ydb/core/change_exchange/util.h>
9
10
#include < ydb/core/persqueue/writer/source_id_encoding.h>
10
11
#include < ydb/core/persqueue/writer/writer.h>
11
12
#include < ydb/core/tx/scheme_cache/helpers.h>
@@ -440,16 +441,6 @@ class TCdcChangeSenderMain
440
441
return false ;
441
442
}
442
443
443
- static TVector<ui64> MakePartitionIds (const TVector<NKikimr::TKeyDesc::TPartitionInfo>& partitions) {
444
- TVector<ui64> result (Reserve (partitions.size ()));
445
-
446
- for (const auto & partition : partitions) {
447
- result.push_back (partition.ShardId );
448
- }
449
-
450
- return result;
451
- }
452
-
453
444
// / ResolveCdcStream
454
445
455
446
void ResolveCdcStream () {
@@ -571,6 +562,14 @@ class TCdcChangeSenderMain
571
562
return ;
572
563
}
573
564
565
+ const auto topicVersion = entry.Self ->Info .GetVersion ().GetGeneralVersion ();
566
+ if (TopicVersion && TopicVersion == topicVersion) {
567
+ CreateSenders ();
568
+ return Become (&TThis::StateMain);
569
+ }
570
+
571
+ TopicVersion = topicVersion;
572
+
574
573
const auto & pqDesc = entry.PQGroupInfo ->Description ;
575
574
const auto & pqConfig = pqDesc.GetPQTabletConfig ();
576
575
@@ -579,12 +578,7 @@ class TCdcChangeSenderMain
579
578
PartitionToShard.emplace (partition.GetPartitionId (), partition.GetTabletId ());
580
579
}
581
580
582
- const auto topicVersion = entry.Self ->Info .GetVersion ().GetGeneralVersion ();
583
- const bool versionChanged = !TopicVersion || TopicVersion != topicVersion;
584
- TopicVersion = topicVersion;
585
-
586
- auto topicAutoPartitioning = ::NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED != pqConfig.GetPartitionStrategy ().GetPartitionStrategyType ();
587
-
581
+ const bool topicAutoPartitioning = IsTopicAutoPartitioningEnabled (pqConfig.GetPartitionStrategy ().GetPartitionStrategyType ());
588
582
Y_ABORT_UNLESS (topicAutoPartitioning || entry.PQGroupInfo ->Schema );
589
583
KeyDesc = NKikimr::TKeyDesc::CreateMiniKeyDesc (entry.PQGroupInfo ->Schema );
590
584
Y_ABORT_UNLESS (entry.PQGroupInfo ->Partitioning );
@@ -598,10 +592,14 @@ class TCdcChangeSenderMain
598
592
SetPartitionResolver (new TMd5PartitionResolver (KeyDesc->GetPartitions ().size ()));
599
593
}
600
594
601
- CreateSenders (MakePartitionIds (*KeyDesc->Partitioning ), versionChanged );
595
+ CreateSenders (NChangeExchange:: MakePartitionIds (*KeyDesc->Partitioning ));
602
596
Become (&TThis::StateMain);
603
597
}
604
598
599
+ static bool IsTopicAutoPartitioningEnabled (NKikimrPQ::TPQTabletConfig::TPartitionStrategyType strategy) {
600
+ return strategy != NKikimrPQ::TPQTabletConfig::TPartitionStrategyType::TPQTabletConfig_TPartitionStrategyType_DISABLED;
601
+ }
602
+
605
603
// / Main
606
604
607
605
STATEFN (StateMain) {
0 commit comments