Skip to content

Commit c23f76b

Browse files
authored
Merge a021668 into ac3fad4
2 parents ac3fad4 + a021668 commit c23f76b

34 files changed

+2543
-56
lines changed

ydb/core/persqueue/actor_persqueue_client_iface.h

Lines changed: 22 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -52,6 +52,11 @@ class IPersQueueMirrorReaderFactory {
5252
TMaybe<TLog> logger = Nothing()
5353
) const = 0;
5454

55+
virtual NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
56+
const NKikimrPQ::TMirrorPartitionConfig& config,
57+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
58+
) const = 0;
59+
5560
virtual ~IPersQueueMirrorReaderFactory() = default;
5661

5762
TDeferredActorLogBackend::TSharedAtomicActorSystemPtr GetSharedActorSystem() const {
@@ -105,6 +110,7 @@ class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
105110
.ConsumerName(config.GetConsumer())
106111
.MaxMemoryUsageBytes(maxMemoryUsageBytes)
107112
.Decompress(false)
113+
.AutoPartitioningSupport(true)
108114
.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
109115
if (logger) {
110116
settings.Log(logger.GetRef());
@@ -119,6 +125,22 @@ class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
119125
NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
120126
return topicClient.CreateReadSession(settings);
121127
}
128+
129+
// Requests the description of the topic named in the mirror config.
//
// Topic-client settings are assembled from `config`: the discovery endpoint is
// "<endpoint>:<port>", discovery runs in async mode, SSL is driven by the
// secure-connection flag, and the database is set only when the config has one.
// The supplied credentials provider factory is attached to the client.
// Returns the future produced by TTopicClient::DescribeTopic(config.GetTopic()).
NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
    const NKikimrPQ::TMirrorPartitionConfig& config,
    std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
) const override {
    NYdb::NTopic::TTopicClientSettings clientSettings;
    clientSettings.DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort());
    clientSettings.DiscoveryMode(NYdb::EDiscoveryMode::Async);
    clientSettings.CredentialsProviderFactory(credentialsProviderFactory);
    clientSettings.SslCredentials(NYdb::TSslCredentials(config.GetUseSecureConnection()));
    // The database is optional in the mirror config; leave the client default otherwise.
    if (config.HasDatabase()) {
        clientSettings.Database(config.GetDatabase());
    }

    NYdb::NTopic::TTopicClient describeClient(*Driver, clientSettings);
    return describeClient.DescribeTopic(config.GetTopic());
}
122144
};
123145

124146
} // namespace NKikimr::NSQS

ydb/core/persqueue/events/internal.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <ydb/library/actors/core/actorid.h>
2121
#include <ydb/core/grpc_services/rpc_calls.h>
2222
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
23+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h>
2324
#include <util/generic/maybe.h>
25+
#include <expected>
2426

2527
namespace NYdb::inline Dev {
2628
class ICredentialsProviderFactory;
@@ -204,6 +206,7 @@ struct TEvPQ {
204206
EvExclusiveLockAcquired,
205207
EvReleaseExclusiveLock,
206208
EvRunCompaction,
209+
EvMirrorTopicDescription,
207210
EvEnd
208211
};
209212

@@ -771,6 +774,21 @@ struct TEvPQ {
771774
NKikimr::TTabletCountersBase Counters;
772775
};
773776

777+
// Local event that carries either a topic describe result or an error string.
struct TEvMirrorTopicDescription : public TEventLocal<TEvMirrorTopicDescription, EvMirrorTopicDescription> {
    // Success case: stores the describe result as the expected value.
    TEvMirrorTopicDescription(NYdb::NTopic::TDescribeTopicResult description)
        : Description(std::move(description))
    {}

    // Failure case: stores `error` as the unexpected value.
    TEvMirrorTopicDescription(TString error)
        : Description(std::unexpected(std::move(error)))
    {}

    // Holds the describe result on success, or the error text on failure.
    std::expected<NYdb::NTopic::TDescribeTopicResult, TString> Description;
};
790+
791+
774792
struct TEvRetryWrite : public TEventLocal<TEvRetryWrite, EvRetryWrite> {
775793
TEvRetryWrite()
776794
{}

ydb/core/persqueue/mirrorer.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,40 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
373373
WriteRequestTimestamp = ctx.Now();
374374
}
375375

376+
void TMirrorer::TryToSplitMerge(const TActorContext& ctx) {
377+
if (!EndPartitionSessionEvent) {
378+
return;
379+
}
380+
if (!AppData(ctx)->FeatureFlags.GetEnableMirroredTopicSplitMerge()) {
381+
return;
382+
}
383+
if (WriteRequestInFlight || !Queue.empty()) {
384+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " postpone split-merge event until all write operations completed");
385+
return;
386+
}
387+
const bool isSplit = EndPartitionSessionEvent->GetAdjacentPartitionIds().empty();
388+
if (!isSplit) {
389+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " topic merge not supported yet.");
390+
return;
391+
}
392+
if (EndPartitionSessionEvent->GetChildPartitionIds().empty()) {
393+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " split-merge operation has no child partitions");
394+
return;
395+
}
396+
const ::NKikimrPQ::EScaleStatus value = isSplit ? NKikimrPQ::EScaleStatus::NEED_SPLIT : NKikimrPQ::EScaleStatus::NEED_MERGE;
397+
THolder request = MakeHolder<TEvPQ::TEvPartitionScaleStatusChanged>();
398+
request->Record.SetPartitionId(Partition);
399+
request->Record.SetScaleStatus(value);
400+
auto* relation = request->Record.MutableParticipatingPartitions();
401+
for (const auto& p : EndPartitionSessionEvent->GetChildPartitionIds()) {
402+
relation->AddChildPartitionIds(p);
403+
}
404+
for (const auto& p : EndPartitionSessionEvent->GetAdjacentPartitionIds()) {
405+
relation->AddAdjacentPartitionIds(p);
406+
}
407+
Send(PartitionActor, std::move(request));
408+
EndPartitionSessionEvent = std::nullopt;
409+
}
376410

377411
void TMirrorer::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*ev*/, const TActorContext& ctx) {
378412
if (CredentialsRequestInFlight) {
@@ -440,6 +474,7 @@ void TMirrorer::HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& /*ev*/, const TActo
440474
void TMirrorer::HandleWakeup(const TActorContext& ctx) {
441475
TryToRead(ctx);
442476
TryToWrite(ctx);
477+
TryToSplitMerge(ctx);
443478
}
444479

445480
void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorContext& ctx) {
@@ -536,6 +571,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
536571
ReadSession->Close(TDuration::Zero());
537572
ReadSession = nullptr;
538573
PartitionStream = nullptr;
574+
EndPartitionSessionEvent = std::nullopt;
539575
ReadFuturesInFlight = 0;
540576
ReadFeatures.clear();
541577
WaitNextReaderEventInFlight = false;
@@ -676,6 +712,14 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
676712
ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString());
677713
ScheduleConsumerCreation(ctx);
678714
return;
715+
} else if (auto* endPartitionSessionEvent = std::get_if<TPersQueueReadEvent::TEndPartitionSessionEvent>(&event.value())) {
716+
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
717+
<< " got end partion session event: " << endPartitionSessionEvent->DebugString());
718+
if (EndPartitionSessionEvent.has_value()) {
719+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " already has end partition session event");
720+
EndPartitionSessionEvent.reset();
721+
}
722+
EndPartitionSessionEvent = *endPartitionSessionEvent;
679723
} else {
680724
ProcessError(ctx, TStringBuilder() << " got unmatched event: " << event.value().index());
681725
ScheduleConsumerCreation(ctx);

ydb/core/persqueue/mirrorer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
137137
void HandleChangeConfig(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx);
138138
void TryToRead(const TActorContext& ctx);
139139
void TryToWrite(const TActorContext& ctx);
140+
void TryToSplitMerge(const TActorContext& ctx);
140141
void HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& ev, const TActorContext& ctx);
141142
void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
142143
void HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& ev, const TActorContext& ctx);
@@ -164,6 +165,7 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
164165
TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight;
165166
ui64 BytesInFlight = 0;
166167
std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight;
168+
std::optional<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent> EndPartitionSessionEvent;
167169
TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START;
168170
TInstant WriteRequestTimestamp;
169171
NYdb::TCredentialsProviderFactoryPtr CredentialsProvider;

ydb/core/persqueue/partition.cpp

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -983,10 +983,38 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
983983
}
984984
}
985985
}
986-
result.SetScaleStatus(SplitMergeEnabled(TabletConfig) ? ScaleStatus : NKikimrPQ::EScaleStatus::NORMAL);
986+
if (SplitMergeEnabled(TabletConfig)) {
987+
if (PartitionScaleParticipants.Defined()) {
988+
result.MutableScaleParticipatingPartitions()->CopyFrom(*PartitionScaleParticipants);
989+
}
990+
result.SetScaleStatus(ScaleStatus);
991+
} else {
992+
result.SetScaleStatus(NKikimrPQ::EScaleStatus::NORMAL);
993+
}
994+
987995
ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
988996
}
989997

998+
// Handles a scale-status change reported for this partition.
//
// The event is ignored (with a warning) when mirroring is disabled for the partition
// or when the record carries no participating-partitions info. Otherwise the scale
// status and participating partitions are remembered and the event itself is
// forwarded to Tablet.
void TPartition::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx)
{
    if (!MirroringEnabled(Config)) {
        PQ_LOG_W("Ignoring split-merge event because mirroring is disabled: " << ev->ToString());
        return;
    }

    const NKikimrPQ::TEvPartitionScaleStatusChanged& record = ev->Get()->Record;
    if (!record.HasParticipatingPartitions()) [[unlikely]] {
        PQ_LOG_W("Ignoring split-merge event from the mirrorer because it does not have participating partitions info: " << ev->ToString());
        return;
    }

    PQ_LOG_I("Got split-merge event from mirrorer: " << ev->ToString());

    ScaleStatus = record.GetScaleStatus();
    PartitionScaleParticipants.ConstructInPlace();
    PartitionScaleParticipants->CopyFrom(record.GetParticipatingPartitions());
    // `record` refers into the event, so it must not be used after Release().
    ctx.Send(Tablet, ev->Release());
}
1017+
9901018
void TPartition::HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
9911019
NKikimrPQ::TStatusResponse::TPartResult result;
9921020
result.SetPartition(Partition.InternalPartitionId);
@@ -3012,6 +3040,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
30123040
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
30133041
Config));
30143042
} else {
3043+
ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;
30153044
CreateMirrorerActor();
30163045
}
30173046
} else {

ydb/core/persqueue/partition.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -480,6 +480,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
480480

481481
NKikimrPQ::EScaleStatus CheckScaleStatus(const TActorContext& ctx);
482482
void ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus);
483+
void Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx);
483484

484485
TString LogPrefix() const;
485486

@@ -635,6 +636,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
635636
HFuncTraced(TEvPQ::TEvRegisterMessageGroup, HandleOnIdle);
636637
HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnIdle);
637638
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle);
639+
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
638640
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
639641
HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
640642
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
@@ -938,6 +940,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
938940

939941
std::unique_ptr<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> SplitMergeAvgWriteBytes;
940942
TInstant LastScaleRequestTime = TInstant::Zero();
943+
TMaybe<NKikimrPQ::TPartitionScaleParticipants> PartitionScaleParticipants;
941944
NKikimrPQ::EScaleStatus ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;
942945

943946
ui64 ReservedSize;
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
#include "partition_index_generator.h"
2+
3+
#include <util/generic/mapfindptr.h>
4+
#include <format>
5+
6+
namespace NKikimr::NPQ {
7+
8+
TPartitionIndexGenerator::TPartitionIndexGenerator(ui32 nextId, ui32 nextGroupId)
9+
: First{
10+
.Id = nextId,
11+
.GroupId = nextGroupId,
12+
}
13+
, Next(First)
14+
{
15+
}
16+
17+
TPartitionIndexGenerator::~TPartitionIndexGenerator() = default;
18+
19+
// Reserves partition `id` as a child of the split/merge of partition `sourceId`.
//
// An id below the first available id refers to an already existing partition and is
// accepted only when `allowToReuseExistingPartition` is set. Each id may be reserved
// once; a second reservation fails with a message that distinguishes a repeated child
// of the same source from a clash between different sources.
std::expected<void, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::ReservePartitionIndex(ui32 id, ui32 sourceId, bool allowToReuseExistingPartition) {
    const bool existed = id < First.Id;
    if (existed && !allowToReuseExistingPartition) {
        return std::unexpected(std::format("Attempt to reserve partition id ({}) that is less than the first availiable id ({})", id, First.Id));
    }

    const auto [slot, inserted] = Reserved.try_emplace(id, TReservationInfo{
        .Partition = TIdPair{
            .Id = id,
            .GroupId = GenerateGroupIdFor(id),
        },
        .SourceId = sourceId,
        .Existed = existed,
    });
    if (inserted) {
        return {};
    }

    // The id was reserved earlier; report who holds it.
    if (slot->second.SourceId == sourceId) {
        return std::unexpected(std::format("Splitting parition id ({}) has repetition in the children partition ids ({})",
                                           sourceId, id));
    }
    return std::unexpected(std::format("Attempt to reserve parition id ({}) for multiple split/merge operations ({}, {})",
                                       id, slot->second.SourceId, sourceId));
}
43+
44+
// Hands out the next id/group-id pair that is not reserved, moving the cursor past it.
// Fails if that id has already been allocated.
std::expected<TPartitionIndexGenerator::TIdPair, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::GetNextUnreservedIdAndGroupId() {
    // Step the cursor over every reserved id.
    for (; Reserved.contains(Next.Id); Advance(Next)) {
    }
    // Advance returns the pair the cursor pointed at before it moved.
    const TIdPair candidate = Advance(Next);
    return Allocate(candidate, EAllocationType::Free);
}
50+
51+
// Allocates the pair that was previously reserved for `id`.
// Fails if `id` was never reserved or has already been allocated.
std::expected<TPartitionIndexGenerator::TIdPair, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::GetNextReservedIdAndGroupId(ui32 id) {
    if (const TReservationInfo* reservation = MapFindPtr(Reserved, id)) {
        return Allocate(reservation->Partition, EAllocationType::Reserved);
    }
    return std::unexpected(std::format("Partition id ({}) is not reserved", id));
}
58+
59+
// Checks that the reservations and allocations made so far are consistent:
//  1. every reserved id has been allocated;
//  2. an id allocated as "reserved" really has a reservation;
//  3. newly created ids (everything except reservations of pre-existing partitions)
//     form a gap-free sequence starting at the first available id.
// Returns an error message describing the first violation found.
std::expected<void, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::ValidateAllocationSequence() {
    for (const auto& [reservedId, reservation] : Reserved) {
        if (!Allocated.contains(reservedId)) {
            return std::unexpected(std::format("Partition id ({}) is reserved but not allocated", reservedId));
        }
    }

    // Allocated is an ordered map, so ids are visited in increasing order.
    ui32 expectedId = First.Id;
    for (const auto& [allocId, type] : Allocated) {
        const TReservationInfo* reserve = MapFindPtr(Reserved, allocId);
        if (!reserve) {
            if (type == EAllocationType::Reserved) {
                // Report the offending allocation itself, not the expected next id.
                return std::unexpected(std::format("Partition id ({}) is not reserved", allocId));
            }
        } else if (reserve->Existed) {
            // Reused pre-existing partitions do not take part in the new-id sequence.
            continue;
        }
        if (expectedId != allocId) {
            return std::unexpected(std::format("Gap in the partition indices: attempt to create new partition ({}) without creating a previous one ({})", allocId, expectedId));
        }
        ++expectedId;
    }
    return {};
}
82+
83+
// Post-increment for an id/group-id pair: bumps both fields of `pair` by one and
// returns the value the pair had before the bump.
TPartitionIndexGenerator::TIdPair TPartitionIndexGenerator::Advance(TIdPair& pair) {
    const TIdPair before = pair;
    pair.Id += 1;
    pair.GroupId += 1;
    return before;
}
89+
90+
// Records `pair.Id` as allocated with the given allocation type and returns the pair.
// Fails if the id has been allocated before.
std::expected<TPartitionIndexGenerator::TIdPair, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::Allocate(TIdPair pair, EAllocationType type) {
    const bool inserted = Allocated.try_emplace(pair.Id, type).second;
    if (inserted) {
        return pair;
    }
    return std::unexpected(std::format("Attempt to allocate partition id ({}) that is already allocated", pair.Id));
}
97+
98+
// Maps a partition id to a group id by keeping the same distance from the first pair:
// GroupId = First.GroupId + (id - First.Id), computed in ui32 arithmetic.
ui32 TPartitionIndexGenerator::GenerateGroupIdFor(ui32 id) const {
    return First.GroupId + (id - First.Id);
}
103+
104+
} // namespace NKikimr::NPQ
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
#pragma once
2+
3+
#include <util/generic/string.h>
4+
5+
#include <expected>
6+
#include <map>
7+
8+
namespace NKikimr::NPQ {
9+
10+
// Hands out partition ids together with their group ids.
//
// Ids can be reserved up front for the children of a split/merge and later allocated
// by id; all other ids are allocated sequentially, skipping reserved ones. Failures
// are reported as a TErrorMessage in the unexpected branch of the returned value.
class TPartitionIndexGenerator {
public:
    using TErrorMessage = TString;

    // A partition id and the group id that goes with it.
    struct TIdPair {
        ui32 Id;
        ui32 GroupId;
    };

public:
    // `nextId` / `nextGroupId` form the first pair available for new partitions.
    explicit TPartitionIndexGenerator(ui32 nextId, ui32 nextGroupId);
    ~TPartitionIndexGenerator();

    // Reserves `id` as a child of the operation on `sourceId`. Ids below the first
    // available id are accepted only with `allowToReuseExistingPartition`.
    std::expected<void, TErrorMessage> ReservePartitionIndex(ui32 id, ui32 sourceId, bool allowToReuseExistingPartition);

    // Allocates the next pair whose id is not reserved.
    std::expected<TIdPair, TErrorMessage> GetNextUnreservedIdAndGroupId();

    // Allocates the pair previously reserved for `id`.
    std::expected<TIdPair, TErrorMessage> GetNextReservedIdAndGroupId(ui32 id);

    // Verifies that every reservation was allocated and that new ids have no gaps.
    std::expected<void, TErrorMessage> ValidateAllocationSequence();

private:
    // Bookkeeping for one reserved id.
    struct TReservationInfo {
        TIdPair Partition;
        ui32 SourceId = 0;     // partition whose split/merge reserved this id
        bool Existed = false;  // true when the id is below the first available id
    };

    // How an id was allocated: taken from the free sequence or from a reservation.
    enum class EAllocationType {
        Free,
        Reserved,
    };

    // Increments both fields of `pair` and returns its previous value.
    static TIdPair Advance(TIdPair& pair);
    // Marks `pair.Id` as allocated; fails if it already was.
    std::expected<TIdPair, TErrorMessage> Allocate(TIdPair pair, EAllocationType type);
    // Group id at the same offset from First.GroupId as `id` is from First.Id.
    ui32 GenerateGroupIdFor(ui32 id) const;

private:
    const TIdPair First;                          // first pair available for new partitions
    TIdPair Next;                                 // cursor for unreserved allocation
    std::map<ui32, TReservationInfo> Reserved;    // reservations keyed by partition id
    std::map<ui32, EAllocationType> Allocated;    // allocations keyed by partition id
};
50+
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)