Skip to content

Commit cdba4c7

Browse files
authored
Merge 7fd3e0a into 9dbab4a
2 parents 9dbab4a + 7fd3e0a commit cdba4c7

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

47 files changed

+4317
-77
lines changed

ydb/core/persqueue/actor_persqueue_client_iface.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,11 @@ class IPersQueueMirrorReaderFactory {
5252
TMaybe<TLog> logger = Nothing()
5353
) const = 0;
5454

55+
virtual NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
56+
const NKikimrPQ::TMirrorPartitionConfig& config,
57+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
58+
) const = 0;
59+
5560
virtual ~IPersQueueMirrorReaderFactory() = default;
5661

5762
TDeferredActorLogBackend::TSharedAtomicActorSystemPtr GetSharedActorSystem() const {
@@ -105,6 +110,7 @@ class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
105110
.ConsumerName(config.GetConsumer())
106111
.MaxMemoryUsageBytes(maxMemoryUsageBytes)
107112
.Decompress(false)
113+
.AutoPartitioningSupport(true)
108114
.RetryPolicy(NYdb::NTopic::IRetryPolicy::GetNoRetryPolicy());
109115
if (logger) {
110116
settings.Log(logger.GetRef());
@@ -119,6 +125,22 @@ class TPersQueueMirrorReaderFactory : public IPersQueueMirrorReaderFactory {
119125
NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
120126
return topicClient.CreateReadSession(settings);
121127
}
128+
129+
NThreading::TFuture<NYdb::NTopic::TDescribeTopicResult> GetTopicDescription(
130+
const NKikimrPQ::TMirrorPartitionConfig& config,
131+
std::shared_ptr<NYdb::ICredentialsProviderFactory> credentialsProviderFactory
132+
) const override {
133+
NYdb::NTopic::TTopicClientSettings clientSettings = NYdb::NTopic::TTopicClientSettings()
134+
.DiscoveryEndpoint(TStringBuilder() << config.GetEndpoint() << ":" << config.GetEndpointPort())
135+
.DiscoveryMode(NYdb::EDiscoveryMode::Async)
136+
.CredentialsProviderFactory(credentialsProviderFactory)
137+
.SslCredentials(NYdb::TSslCredentials(config.GetUseSecureConnection()));
138+
if (config.HasDatabase()) {
139+
clientSettings.Database(config.GetDatabase());
140+
}
141+
NYdb::NTopic::TTopicClient topicClient(*Driver, clientSettings);
142+
return topicClient.DescribeTopic(config.GetTopic());
143+
}
122144
};
123145

124146
} // namespace NKikimr::NSQS

ydb/core/persqueue/events/internal.h

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <ydb/library/actors/core/actorid.h>
2121
#include <ydb/core/grpc_services/rpc_calls.h>
2222
#include <ydb/public/api/protos/persqueue_error_codes_v1.pb.h>
23+
#include <ydb/public/sdk/cpp/include/ydb-cpp-sdk/client/topic/control_plane.h>
2324
#include <util/generic/maybe.h>
25+
#include <expected>
2426

2527
namespace NYdb::inline Dev {
2628
class ICredentialsProviderFactory;
@@ -204,6 +206,8 @@ struct TEvPQ {
204206
EvExclusiveLockAcquired,
205207
EvReleaseExclusiveLock,
206208
EvRunCompaction,
209+
EvMirrorTopicDescription,
210+
EvBroadcastPartitionError,
207211
EvEnd
208212
};
209213

@@ -771,6 +775,21 @@ struct TEvPQ {
771775
NKikimr::TTabletCountersBase Counters;
772776
};
773777

778+
struct TEvMirrorTopicDescription : public TEventLocal<TEvMirrorTopicDescription, EvMirrorTopicDescription> {
779+
TEvMirrorTopicDescription(NYdb::NTopic::TDescribeTopicResult description)
780+
: Description(std::move(description))
781+
{
782+
}
783+
784+
TEvMirrorTopicDescription(TString error)
785+
: Description(std::unexpected(std::move(error)))
786+
{
787+
}
788+
789+
std::expected<NYdb::NTopic::TDescribeTopicResult, TString> Description;
790+
};
791+
792+
774793
struct TEvRetryWrite : public TEventLocal<TEvRetryWrite, EvRetryWrite> {
775794
TEvRetryWrite()
776795
{}
@@ -1183,6 +1202,19 @@ struct TEvPQ {
11831202
}
11841203
};
11851204

1205+
struct TBroadcastPartitionError : public TEventPB<TBroadcastPartitionError,
1206+
NKikimrPQ::TBroadcastPartitionError, EvBroadcastPartitionError> {
1207+
TBroadcastPartitionError() = default;
1208+
1209+
explicit TBroadcastPartitionError(TString message, NKikimrServices::EServiceKikimr service, TInstant timestamp) {
1210+
auto* defaultGroup = Record.AddMessageGroups();
1211+
auto* error = defaultGroup->AddErrors();
1212+
error->SetMessage(std::move(message));
1213+
error->SetService(service);
1214+
error->SetTimestamp(timestamp.Seconds());
1215+
}
1216+
};
1217+
11861218
struct TEvBalanceConsumer : TEventLocal<TEvBalanceConsumer, EvBalanceConsumer> {
11871219
TEvBalanceConsumer(const TString& consumerName)
11881220
: ConsumerName(consumerName)

ydb/core/persqueue/mirrorer.cpp

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,40 @@ void TMirrorer::TryToWrite(const TActorContext& ctx) {
373373
WriteRequestTimestamp = ctx.Now();
374374
}
375375

376+
void TMirrorer::TryToSplitMerge(const TActorContext& ctx) {
377+
if (!EndPartitionSessionEvent) {
378+
return;
379+
}
380+
if (!AppData(ctx)->FeatureFlags.GetEnableMirroredTopicSplitMerge()) {
381+
return;
382+
}
383+
if (WriteRequestInFlight || !Queue.empty()) {
384+
LOG_DEBUG_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " postpone split-merge event until all write operations completed");
385+
return;
386+
}
387+
const bool isSplit = EndPartitionSessionEvent->GetAdjacentPartitionIds().empty();
388+
if (!isSplit) {
389+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " topic merge not supported yet.");
390+
return;
391+
}
392+
if (EndPartitionSessionEvent->GetChildPartitionIds().empty()) {
393+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " split-merge operation has no child partitions");
394+
return;
395+
}
396+
const ::NKikimrPQ::EScaleStatus value = isSplit ? NKikimrPQ::EScaleStatus::NEED_SPLIT : NKikimrPQ::EScaleStatus::NEED_MERGE;
397+
THolder request = MakeHolder<TEvPQ::TEvPartitionScaleStatusChanged>();
398+
request->Record.SetPartitionId(Partition);
399+
request->Record.SetScaleStatus(value);
400+
auto* relation = request->Record.MutableParticipatingPartitions();
401+
for (const auto& p : EndPartitionSessionEvent->GetChildPartitionIds()) {
402+
relation->AddChildPartitionIds(p);
403+
}
404+
for (const auto& p : EndPartitionSessionEvent->GetAdjacentPartitionIds()) {
405+
relation->AddAdjacentPartitionIds(p);
406+
}
407+
Send(PartitionActor, std::move(request));
408+
EndPartitionSessionEvent = std::nullopt;
409+
}
376410

377411
void TMirrorer::HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& /*ev*/, const TActorContext& ctx) {
378412
if (CredentialsRequestInFlight) {
@@ -440,6 +474,7 @@ void TMirrorer::HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& /*ev*/, const TActo
440474
void TMirrorer::HandleWakeup(const TActorContext& ctx) {
441475
TryToRead(ctx);
442476
TryToWrite(ctx);
477+
TryToSplitMerge(ctx);
443478
}
444479

445480
void TMirrorer::CreateConsumer(TEvPQ::TEvCreateConsumer::TPtr&, const TActorContext& ctx) {
@@ -536,6 +571,7 @@ void TMirrorer::ScheduleConsumerCreation(const TActorContext& ctx) {
536571
ReadSession->Close(TDuration::Zero());
537572
ReadSession = nullptr;
538573
PartitionStream = nullptr;
574+
EndPartitionSessionEvent = std::nullopt;
539575
ReadFuturesInFlight = 0;
540576
ReadFeatures.clear();
541577
WaitNextReaderEventInFlight = false;
@@ -676,6 +712,14 @@ void TMirrorer::DoProcessNextReaderEvent(const TActorContext& ctx, bool wakeup)
676712
ProcessError(ctx, TStringBuilder() << " read session closed: " << closeSessionEvent->DebugString());
677713
ScheduleConsumerCreation(ctx);
678714
return;
715+
} else if (auto* endPartitionSessionEvent = std::get_if<TPersQueueReadEvent::TEndPartitionSessionEvent>(&event.value())) {
716+
LOG_INFO_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription()
717+
<< " got end partion session event: " << endPartitionSessionEvent->DebugString());
718+
if (EndPartitionSessionEvent.has_value()) {
719+
LOG_WARN_S(ctx, NKikimrServices::PQ_MIRRORER, MirrorerDescription() << " already has end partition session event");
720+
EndPartitionSessionEvent.reset();
721+
}
722+
EndPartitionSessionEvent = *endPartitionSessionEvent;
679723
} else {
680724
ProcessError(ctx, TStringBuilder() << " got unmatched event: " << event.value().index());
681725
ScheduleConsumerCreation(ctx);

ydb/core/persqueue/mirrorer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,7 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
137137
void HandleChangeConfig(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx);
138138
void TryToRead(const TActorContext& ctx);
139139
void TryToWrite(const TActorContext& ctx);
140+
void TryToSplitMerge(const TActorContext& ctx);
140141
void HandleInitCredentials(TEvPQ::TEvInitCredentials::TPtr& ev, const TActorContext& ctx);
141142
void HandleCredentialsCreated(TEvPQ::TEvCredentialsCreated::TPtr& ev, const TActorContext& ctx);
142143
void HandleRetryWrite(TEvPQ::TEvRetryWrite::TPtr& ev, const TActorContext& ctx);
@@ -164,6 +165,7 @@ class TMirrorer : public TActorBootstrapped<TMirrorer> {
164165
TDeque<NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent::TCompressedMessage> WriteInFlight;
165166
ui64 BytesInFlight = 0;
166167
std::optional<NKikimrClient::TPersQueuePartitionRequest> WriteRequestInFlight;
168+
std::optional<NYdb::NTopic::TReadSessionEvent::TEndPartitionSessionEvent> EndPartitionSessionEvent;
167169
TDuration WriteRetryTimeout = WRITE_RETRY_TIMEOUT_START;
168170
TInstant WriteRequestTimestamp;
169171
NYdb::TCredentialsProviderFactoryPtr CredentialsProvider;

ydb/core/persqueue/partition.cpp

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -587,6 +587,15 @@ void TPartition::Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContex
587587
}
588588
}
589589

590+
void TPartition::Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx) {
591+
const NKikimrPQ::TBroadcastPartitionError& record = ev->Get()->Record;
592+
for (const auto& group : record.GetMessageGroups()) {
593+
for (const auto& error : group.GetErrors()) {
594+
LogAndCollectError(error, ctx);
595+
}
596+
}
597+
}
598+
590599
void TPartition::DestroyActor(const TActorContext& ctx)
591600
{
592601
// Reply to all outstanding requests in order to destroy corresponding actors
@@ -983,10 +992,38 @@ void TPartition::Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext
983992
}
984993
}
985994
}
986-
result.SetScaleStatus(SplitMergeEnabled(TabletConfig) ? ScaleStatus : NKikimrPQ::EScaleStatus::NORMAL);
995+
if (SplitMergeEnabled(TabletConfig)) {
996+
if (PartitionScaleParticipants.Defined()) {
997+
result.MutableScaleParticipatingPartitions()->CopyFrom(*PartitionScaleParticipants);
998+
}
999+
result.SetScaleStatus(ScaleStatus);
1000+
} else {
1001+
result.SetScaleStatus(NKikimrPQ::EScaleStatus::NORMAL);
1002+
}
1003+
9871004
ctx.Send(ev->Get()->Sender, new TEvPQ::TEvPartitionStatusResponse(result, Partition));
9881005
}
9891006

1007+
void TPartition::Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx)
1008+
{
1009+
const bool mirroredPartition = MirroringEnabled(Config);
1010+
const NKikimrPQ::TEvPartitionScaleStatusChanged& record = ev->Get()->Record;
1011+
if (mirroredPartition) {
1012+
if (record.HasParticipatingPartitions()) [[likely]] {
1013+
PQ_LOG_I("Got split-merge event from mirrorer: " << ev->ToString());
1014+
1015+
ScaleStatus = record.GetScaleStatus();
1016+
PartitionScaleParticipants.ConstructInPlace();
1017+
PartitionScaleParticipants->CopyFrom(record.GetParticipatingPartitions());
1018+
ctx.Send(Tablet, ev->Release());
1019+
} else {
1020+
PQ_LOG_W("Ignoring split-merge event from the mirrorer because it does not have participating partitions info: " << ev->ToString());
1021+
}
1022+
} else {
1023+
PQ_LOG_W("Ignoring split-merge event because mirroring is disabled: " << ev->ToString());
1024+
}
1025+
}
1026+
9901027
void TPartition::HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx) {
9911028
NKikimrPQ::TStatusResponse::TPartResult result;
9921029
result.SetPartition(Partition.InternalPartitionId);
@@ -3012,6 +3049,7 @@ void TPartition::EndChangePartitionConfig(NKikimrPQ::TPQTabletConfig&& config,
30123049
ctx.Send(Mirrorer->Actor, new TEvPQ::TEvChangePartitionConfig(TopicConverter,
30133050
Config));
30143051
} else {
3052+
ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;
30153053
CreateMirrorerActor();
30163054
}
30173055
} else {

ydb/core/persqueue/partition.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
244244
void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
245245
void Handle(TEvPQ::TEvRunCompaction::TPtr& ev);
246246
void Handle(TEvPQ::TEvExclusiveLockAcquired::TPtr& ev);
247+
void Handle(TEvPQ::TBroadcastPartitionError::TPtr& ev, const TActorContext& ctx);
247248
void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
248249
void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
249250
void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
@@ -480,6 +481,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
480481

481482
NKikimrPQ::EScaleStatus CheckScaleStatus(const TActorContext& ctx);
482483
void ChangeScaleStatusIfNeeded(NKikimrPQ::EScaleStatus scaleStatus);
484+
void Handle(TEvPQ::TEvPartitionScaleStatusChanged::TPtr& ev, const TActorContext& ctx);
483485

484486
TString LogPrefix() const;
485487

@@ -571,6 +573,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
571573
HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
572574
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
573575
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
576+
HFuncTraced(TEvPQ::TBroadcastPartitionError, Handle);
574577
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
575578
HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit);
576579
HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit);
@@ -624,6 +627,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
624627
HFuncTraced(TEvPQ::TEvChangeOwner, Handle);
625628
HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
626629
HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
630+
HFuncTraced(TEvPQ::TBroadcastPartitionError, Handle);
627631
HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
628632
HFuncTraced(TEvPQ::TEvError, Handle);
629633
HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
@@ -635,6 +639,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
635639
HFuncTraced(TEvPQ::TEvRegisterMessageGroup, HandleOnIdle);
636640
HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnIdle);
637641
HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle);
642+
HFuncTraced(TEvPQ::TEvPartitionScaleStatusChanged, Handle);
638643
HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
639644
HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
640645
HFuncTraced(TEvPQ::TEvGetWriteInfoRequest, Handle);
@@ -938,6 +943,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
938943

939944
std::unique_ptr<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> SplitMergeAvgWriteBytes;
940945
TInstant LastScaleRequestTime = TInstant::Zero();
946+
TMaybe<NKikimrPQ::TPartitionScaleParticipants> PartitionScaleParticipants;
941947
NKikimrPQ::EScaleStatus ScaleStatus = NKikimrPQ::EScaleStatus::NORMAL;
942948

943949
ui64 ReservedSize;
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#include "partition_index_generator.h"
2+
3+
#include <util/generic/mapfindptr.h>
4+
#include <format>
5+
6+
namespace NKikimr::NPQ {
7+
8+
TPartitionIndexGenerator::TPartitionIndexGenerator(TPartitionId nextId)
9+
: First{nextId}
10+
, Next(First)
11+
{
12+
}
13+
14+
TPartitionIndexGenerator::~TPartitionIndexGenerator() = default;
15+
16+
std::expected<void, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::ReservePartitionIndex(TPartitionId id, TPartitionId sourceId, bool allowToReuseExistingPartition) {
17+
if (id < First && !allowToReuseExistingPartition) {
18+
return std::unexpected(std::format("Attempt to reserve partition id ({}) that is less than the first availiable id ({})", id, First));
19+
}
20+
TReservationInfo res{
21+
.PartitionId = id,
22+
.SourceId = sourceId,
23+
.Existed = (id < First),
24+
};
25+
const auto [it, unique] = Reserved.try_emplace(id, std::move(res));
26+
if (!unique) {
27+
if (it->second.SourceId == sourceId) {
28+
return std::unexpected(std::format("Splitting parition id ({}) has repetition in the children partition ids ({})",
29+
sourceId, id));
30+
} else {
31+
return std::unexpected(std::format("Attempt to reserve parition id ({}) for multiple split/merge operations ({}, {})",
32+
id, it->second.SourceId, sourceId));
33+
}
34+
}
35+
return {};
36+
}
37+
38+
std::expected<TPartitionIndexGenerator::TPartitionId, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::GetNextUnreservedId() {
39+
while (Reserved.contains(Next)) {
40+
Advance(Next);
41+
}
42+
return Allocate(Advance(Next), EAllocationType::Free);
43+
}
44+
45+
std::expected<TPartitionIndexGenerator::TPartitionId, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::GetNextReservedId(TPartitionId id) {
46+
const TReservationInfo* res = MapFindPtr(Reserved, id);
47+
if (!res) {
48+
return std::unexpected(std::format("Partition id ({}) is not reserved", id));
49+
}
50+
return Allocate(res->PartitionId, EAllocationType::Reserved);
51+
}
52+
53+
std::expected<void, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::ValidateAllocationSequence() {
54+
for (const auto& [id, res] : Reserved) {
55+
if (!Allocated.contains(id)) {
56+
return std::unexpected(std::format("Partition id ({}) is reserved but not allocated", id));
57+
}
58+
}
59+
TPartitionId id = First;
60+
for (const auto& [allocId, type] : Allocated) {
61+
const TReservationInfo* reserve = MapFindPtr(Reserved, allocId);
62+
if (!reserve) {
63+
if (type == EAllocationType::Reserved) {
64+
return std::unexpected(std::format("Partition id ({}) is not reserved", id));
65+
}
66+
} else if (reserve->Existed) {
67+
continue;
68+
}
69+
if (id != allocId) {
70+
return std::unexpected(std::format("Gap in the partition indices: attempt to create new partition ({}) without creating a previous one ({})", allocId, id));
71+
}
72+
++id;
73+
}
74+
return {};
75+
}
76+
77+
TPartitionIndexGenerator::TPartitionId TPartitionIndexGenerator::Advance(TPartitionId& id) {
78+
return id++;
79+
}
80+
81+
std::expected<TPartitionIndexGenerator::TPartitionId, TPartitionIndexGenerator::TErrorMessage> TPartitionIndexGenerator::Allocate(TPartitionId id, EAllocationType type) {
82+
auto [it, ins] = Allocated.try_emplace(id, type);
83+
if (!ins) {
84+
return std::unexpected(std::format("Attempt to allocate partition id ({}) that is already allocated", id));
85+
}
86+
return id;
87+
}
88+
89+
} // namespace NKikimr::NPQ

0 commit comments

Comments
 (0)