Skip to content

Commit 048869b

Browse files
[*] учитываются только основные партиции
1 parent d2b1192 commit 048869b

File tree

2 files changed

+53
-24
lines changed

2 files changed

+53
-24
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -695,7 +695,7 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
695695
}
696696

697697
// in order to answer only after all parts are ready to work
698-
Y_ABORT_UNLESS(ConfigInited && AllPartitionsInited());
698+
Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited());
699699

700700
ApplyNewConfig(NewConfig, ctx);
701701
ClearNewConfig();
@@ -708,11 +708,12 @@ void TPersQueue::ApplyNewConfigAndReply(const TActorContext& ctx)
708708
for (const auto& partition : Config.GetPartitions()) {
709709
const TPartitionId partitionId(partition.GetPartitionId());
710710
if (Partitions.find(partitionId) == Partitions.end()) {
711-
Partitions.emplace(partitionId,
712-
TPartitionInfo(ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, true, ctx)),
713-
GetPartitionKeyRange(Config, partition),
714-
*Counters
715-
));
711+
CreateOriginalPartition(Config,
712+
partition,
713+
TopicConverter,
714+
partitionId,
715+
true,
716+
ctx);
716717
}
717718
}
718719

@@ -791,7 +792,7 @@ void TPersQueue::EndWriteConfig(const NKikimrClient::TResponse& resp, const TAct
791792

792793
Y_ABORT_UNLESS(resp.WriteResultSize() >= 1);
793794
Y_ABORT_UNLESS(resp.GetWriteResult(0).GetStatus() == NKikimrProto::OK);
794-
if (ConfigInited && AllPartitionsInited()) //all partitions are working well - can apply new config
795+
if (ConfigInited && AllOriginalPartitionsInited()) //all partitions are working well - can apply new config
795796
ApplyNewConfigAndReply(ctx);
796797
else
797798
NewConfigShouldBeApplied = true; //when config will be inited with old value new config will be applied
@@ -895,6 +896,26 @@ void TPersQueue::ReadTxWrites(const NKikimrClient::TKeyValueResponse::TReadResul
895896
}
896897
}
897898

899+
void TPersQueue::CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& config,
900+
const NKikimrPQ::TPQTabletConfig::TPartition& partition,
901+
NPersQueue::TTopicConverterPtr topicConverter,
902+
const TPartitionId& partitionId,
903+
bool newPartition,
904+
const TActorContext& ctx)
905+
{
906+
TActorId actorId = ctx.Register(CreatePartitionActor(partitionId,
907+
topicConverter,
908+
config,
909+
newPartition,
910+
ctx));
911+
Partitions.emplace(std::piecewise_construct,
912+
std::forward_as_tuple(partitionId),
913+
std::forward_as_tuple(actorId,
914+
GetPartitionKeyRange(config, partition),
915+
*Counters));
916+
++OriginalPartitionsCount;
917+
}
918+
898919
void TPersQueue::AddSupportivePartition(const TPartitionId& partitionId)
899920
{
900921
Partitions.emplace(partitionId,
@@ -1007,11 +1028,12 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
10071028

10081029
for (const auto& partition : Config.GetPartitions()) { // no partitions will be created with empty config
10091030
const TPartitionId partitionId(partition.GetPartitionId());
1010-
Partitions.emplace(partitionId, TPartitionInfo(
1011-
ctx.Register(CreatePartitionActor(partitionId, TopicConverter, Config, false, ctx)),
1012-
GetPartitionKeyRange(Config, partition),
1013-
*Counters
1014-
));
1031+
CreateOriginalPartition(Config,
1032+
partition,
1033+
TopicConverter,
1034+
partitionId,
1035+
false,
1036+
ctx);
10151037
}
10161038

10171039
ConfigInited = true;
@@ -1334,9 +1356,9 @@ void TPersQueue::Handle(TEvPQ::TEvTabletCacheCounters::TPtr& ev, const TActorCon
13341356
<< "Counters. CacheSize " << CacheCounters.CacheSizeBytes << " CachedBlobs " << CacheCounters.CacheSizeBlobs);
13351357
}
13361358

1337-
bool TPersQueue::AllPartitionsInited() const
1359+
bool TPersQueue::AllOriginalPartitionsInited() const
13381360
{
1339-
return PartitionsInited == Partitions.size();
1361+
return PartitionsInited == OriginalPartitionsCount;
13401362
}
13411363

13421364
void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& ctx)
@@ -1359,7 +1381,7 @@ void TPersQueue::Handle(TEvPQ::TEvInitComplete::TPtr& ev, const TActorContext& c
13591381
++PartitionsInited;
13601382
Y_ABORT_UNLESS(ConfigInited);//partitions are inited only after config
13611383

1362-
auto allInitialized = AllPartitionsInited();
1384+
auto allInitialized = AllOriginalPartitionsInited();
13631385
if (!InitCompleted && allInitialized) {
13641386
OnInitComplete(ctx);
13651387
}
@@ -2884,6 +2906,7 @@ TPersQueue::TPersQueue(const TActorId& tablet, TTabletStorageInfo *info)
28842906
: TKeyValueFlat(tablet, info)
28852907
, ConfigInited(false)
28862908
, PartitionsInited(0)
2909+
, OriginalPartitionsCount(0)
28872910
, NewConfigShouldBeApplied(false)
28882911
, TabletState(NKikimrPQ::ENormal)
28892912
, Counters(nullptr)
@@ -4085,7 +4108,7 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
40854108
{
40864109
EnsurePartitionsAreNotDeleted(config);
40874110

4088-
Y_ABORT_UNLESS(ConfigInited && AllPartitionsInited());
4111+
Y_ABORT_UNLESS(ConfigInited && AllOriginalPartitionsInited());
40894112

40904113
if (!config.PartitionsSize()) {
40914114
for (const auto partitionId : config.GetPartitionIds()) {
@@ -4099,13 +4122,12 @@ void TPersQueue::CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
40994122
continue;
41004123
}
41014124

4102-
TActorId actorId = ctx.Register(CreatePartitionActor(partitionId, topicConverter, config, true, ctx));
4103-
4104-
Partitions.emplace(std::piecewise_construct,
4105-
std::forward_as_tuple(partitionId),
4106-
std::forward_as_tuple(actorId,
4107-
GetPartitionKeyRange(config, partition),
4108-
*Counters));
4125+
CreateOriginalPartition(config,
4126+
partition,
4127+
topicConverter,
4128+
partitionId,
4129+
true,
4130+
ctx);
41094131
}
41104132
}
41114133

ydb/core/persqueue/pq_impl.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
189189
private:
190190
bool ConfigInited;
191191
ui32 PartitionsInited;
192+
ui32 OriginalPartitionsCount;
192193
bool InitCompleted = false;
193194
THashMap<TPartitionId, TPartitionInfo> Partitions;
194195
THashMap<TString, TIntrusivePtr<TEvTabletCounters::TInFlightCookie>> CounterEventsInflight;
@@ -355,6 +356,12 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
355356
void CreateNewPartitions(NKikimrPQ::TPQTabletConfig& config,
356357
NPersQueue::TTopicConverterPtr topicConverter,
357358
const TActorContext& ctx);
359+
void CreateOriginalPartition(const NKikimrPQ::TPQTabletConfig& config,
360+
const NKikimrPQ::TPQTabletConfig::TPartition& partition,
361+
NPersQueue::TTopicConverterPtr topicConverter,
362+
const TPartitionId& partitionId,
363+
bool newPartition,
364+
const TActorContext& ctx);
358365
void EnsurePartitionsAreNotDeleted(const NKikimrPQ::TPQTabletConfig& config) const;
359366

360367
void BeginWriteConfig(const NKikimrPQ::TPQTabletConfig& cfg,
@@ -458,7 +465,7 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
458465
void CreateSupportivePartitionActor(const TPartitionId& shadowPartitionId, const TActorContext& ctx);
459466
void SubscribeWriteId(ui64 writeId, const TActorContext& ctx);
460467

461-
bool AllPartitionsInited() const;
468+
bool AllOriginalPartitionsInited() const;
462469

463470
void Handle(NLongTxService::TEvLongTxService::TEvLockStatus::TPtr& ev, const TActorContext& ctx);
464471
};

0 commit comments

Comments
 (0)