Skip to content

Commit cc69210

Browse files
authored
Merge 3830e7d into e16ace1
2 parents e16ace1 + 3830e7d commit cc69210

File tree

12 files changed

+235
-44
lines changed

12 files changed

+235
-44
lines changed

ydb/core/persqueue/ut/common/autoscaling_ut_common.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ TTestReadSession::TTestReadSession(const TString& name, TTopicClient& client, si
209209
auto partitionId = ev.GetPartitionSession()->GetPartitionId();
210210
impl->EndedPartitions.insert(partitionId);
211211
impl->EndedPartitionEvents.push_back(ev);
212+
213+
ev.Confirm();
212214
});
213215

214216

ydb/core/persqueue/ut/ya.make

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ IF (SANITIZER_TYPE == "thread" OR WITH_VALGRIND)
1010
TIMEOUT(3000)
1111
ELSE()
1212
SIZE(MEDIUM)
13-
TIMEOUT(600)
13+
TIMEOUT(60)
1414
ENDIF()
1515

1616
PEERDIR(

ydb/core/tx/replication/ydb_proxy/ydb_proxy.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,7 +205,8 @@ class TTopicReader: public TBaseProxyActor<TTopicReader> {
205205
x->Confirm();
206206
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
207207
} else if (auto* x = std::get_if<TReadSessionEvent::TEndPartitionSessionEvent>(&*event)) {
208-
// do nothing.
208+
x->Confirm();
209+
return WaitEvent(ev->Get()->Sender, ev->Get()->Cookie);
209210
} else if (auto* x = std::get_if<TReadSessionEvent::TDataReceivedEvent>(&*event)) {
210211
if (AutoCommit) {
211212
DeferredCommit.Add(*x);

ydb/public/sdk/cpp/client/ydb_persqueue_public/ut/read_session_ut.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1930,8 +1930,8 @@ Y_UNIT_TEST_SUITE(ReadSessionImplTest) {
19301930
TDeferredActions actions;
19311931

19321932
TPartitionStreamImpl::SignalReadyEvents(stream,
1933-
&sessionQueue,
1934-
actions);
1933+
&sessionQueue,
1934+
actions);
19351935

19361936
UNIT_ASSERT_DATA_EVENT(1);
19371937
UNIT_ASSERT_CONTROL_EVENT();

ydb/public/sdk/cpp/client/ydb_topic/impl/event_handlers.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ class TGracefulReleasingSimpleDataHandlers : public TThrRefBase {
7676
}
7777
}
7878

79-
void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent&) {
79+
void OnEndPartitionStream(TReadSessionEvent::TEndPartitionSessionEvent& event) {
80+
event.Confirm();
8081
}
8182

8283
void OnPartitionStreamClosed(TReadSessionEvent::TPartitionSessionClosedEvent& event) {
@@ -139,7 +140,8 @@ TReadSessionSettings::TEventHandlers& TReadSessionSettings::TEventHandlers::Simp
139140
StopPartitionSessionHandler([](TReadSessionEvent::TStopPartitionSessionEvent& event) {
140141
event.Confirm();
141142
});
142-
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent&) {
143+
EndPartitionSessionHandler([](TReadSessionEvent::TEndPartitionSessionEvent& event) {
144+
event.Confirm();
143145
});
144146
CommitOffsetAcknowledgementHandler([](TReadSessionEvent::TCommitOffsetAcknowledgementEvent&){});
145147
PartitionSessionClosedHandler([](TReadSessionEvent::TPartitionSessionClosedEvent&){});

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_event.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,12 @@ TEndPartitionSessionEvent::TEndPartitionSessionEvent(TPartitionSession::TPtr par
349349
, ChildPartitionIds(std::move(childPartitionIds)) {
350350
}
351351

352+
void TEndPartitionSessionEvent::Confirm() {
353+
if (PartitionSession) {
354+
static_cast<TPartitionStreamImpl<false>*>(PartitionSession.Get())->ConfirmEnd(GetChildPartitionIds());
355+
}
356+
}
357+
352358
void JoinIds(TStringBuilder& ret, const std::vector<ui32> ids) {
353359
ret << "[";
354360
for (size_t i = 0; i < ids.size(); ++i) {

ydb/public/sdk/cpp/client/ydb_topic/impl/read_session_impl.h

Lines changed: 99 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,9 @@ struct TRawPartitionStreamEvent {
489489
template <bool UseMigrationProtocol>
490490
class TRawPartitionStreamEventQueue {
491491
public:
492-
TRawPartitionStreamEventQueue() = default;
492+
TRawPartitionStreamEventQueue(TCallbackContextPtr<UseMigrationProtocol> cbContext)
493+
: CbContext(cbContext) {
494+
};
493495

494496
template <class... Ts>
495497
TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back(Ts&&... event)
@@ -550,6 +552,7 @@ class TRawPartitionStreamEventQueue {
550552
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>>& queue);
551553

552554
private:
555+
TCallbackContextPtr<UseMigrationProtocol> CbContext;
553556
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
554557
std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
555558
};
@@ -583,6 +586,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
583586
, AssignId(assignId)
584587
, FirstNotReadOffset(readOffset)
585588
, CbContext(std::move(cbContext))
589+
, EventsQueue(CbContext)
586590
{
587591
TAPartitionStream<true>::PartitionStreamId = partitionStreamId;
588592
TAPartitionStream<true>::TopicPath = std::move(topicPath);
@@ -603,6 +607,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
603607
, AssignId(static_cast<ui64>(assignId))
604608
, FirstNotReadOffset(static_cast<ui64>(readOffset))
605609
, CbContext(std::move(cbContext))
610+
, EventsQueue(CbContext)
606611
{
607612
TAPartitionStream<false>::PartitionSessionId = partitionStreamId;
608613
TAPartitionStream<false>::TopicPath = std::move(topicPath);
@@ -625,6 +630,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
625630

626631
void ConfirmCreate(TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
627632
void ConfirmDestroy();
633+
void ConfirmEnd(const std::vector<ui32>& childIds);
628634

629635
void StopReading() /*override*/;
630636
void ResumeReading() /*override*/;
@@ -764,6 +770,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
764770
TMutex Lock;
765771
};
766772

773+
767774
template <bool UseMigrationProtocol>
768775
class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSettings<UseMigrationProtocol>,
769776
typename TAReadSessionEvent<UseMigrationProtocol>::TEvent,
@@ -823,7 +830,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
823830
}
824831

825832
bool TryApplyCallbackToEventImpl(typename TParent::TEvent& event,
826-
TDeferredActions<UseMigrationProtocol>& deferred);
833+
TDeferredActions<UseMigrationProtocol>& deferred,
834+
TCallbackContextPtr<UseMigrationProtocol>& cbContext);
827835
bool HasDataEventCallback() const;
828836
void ApplyCallbackToEventImpl(TADataReceivedEvent<UseMigrationProtocol>& event,
829837
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>&& eventsInfo,
@@ -859,13 +867,19 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
859867

860868
void ClearAllEvents();
861869

870+
void SetCallbackContext(TCallbackContextPtr<UseMigrationProtocol>& ctx) {
871+
CbContext = ctx;
872+
}
873+
862874
private:
863875
struct THandlersVisitor : public TParent::TBaseHandlersVisitor {
864876
THandlersVisitor(const TAReadSessionSettings<UseMigrationProtocol>& settings,
865877
typename TParent::TEvent& event,
866-
TDeferredActions<UseMigrationProtocol>& deferred)
878+
TDeferredActions<UseMigrationProtocol>& deferred,
879+
TCallbackContextPtr<UseMigrationProtocol>& cbContext)
867880
: TParent::TBaseHandlersVisitor(settings, event)
868-
, Deferred(deferred) {
881+
, Deferred(deferred)
882+
, CbContext(cbContext) {
869883
}
870884

871885
#define DECLARE_HANDLER(type, handler, answer) \
@@ -880,7 +894,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
880894
} \
881895
/**/
882896

883-
#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false, answer) \
897+
#define DECLARE_TEMPLATE_HANDLER(type_true, type_false, handler_true, handler_false) \
884898
bool operator()(std::conditional_t<UseMigrationProtocol, type_true, type_false>&) { \
885899
if (this->template PushHandler<std::conditional_t<UseMigrationProtocol, type_true, type_false>>( \
886900
std::move(TParent::TBaseHandlersVisitor::Event), \
@@ -892,7 +906,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
892906
} \
893907
}(), \
894908
this->Settings.EventHandlers_.CommonHandler_)) { \
895-
return answer; \
909+
return true; \
896910
} \
897911
return false; \
898912
} \
@@ -901,50 +915,85 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
901915
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDataReceivedEvent,
902916
typename TAReadSessionEvent<false>::TDataReceivedEvent,
903917
DataReceivedHandler_,
904-
DataReceivedHandler_,
905-
true);
918+
DataReceivedHandler_);
906919
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCommitAcknowledgementEvent,
907920
typename TAReadSessionEvent<false>::TCommitOffsetAcknowledgementEvent,
908921
CommitAcknowledgementHandler_,
909-
CommitOffsetAcknowledgementHandler_,
910-
true);
922+
CommitOffsetAcknowledgementHandler_);
911923
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TCreatePartitionStreamEvent,
912924
typename TAReadSessionEvent<false>::TStartPartitionSessionEvent,
913925
CreatePartitionStreamHandler_,
914-
StartPartitionSessionHandler_,
915-
true);
926+
StartPartitionSessionHandler_);
916927
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TDestroyPartitionStreamEvent,
917928
typename TAReadSessionEvent<false>::TStopPartitionSessionEvent,
918929
DestroyPartitionStreamHandler_,
919-
StopPartitionSessionHandler_,
920-
true);
930+
StopPartitionSessionHandler_);
921931
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamStatusEvent,
922932
typename TAReadSessionEvent<false>::TPartitionSessionStatusEvent,
923933
PartitionStreamStatusHandler_,
924-
PartitionSessionStatusHandler_,
925-
true);
926-
DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
927-
typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent,
928-
PartitionStreamClosedHandler_,
929-
PartitionSessionClosedHandler_,
930-
true);
934+
PartitionSessionStatusHandler_);
935+
//DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
936+
// typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent,
937+
// PartitionStreamClosedHandler_,
938+
// PartitionSessionClosedHandler_);
931939
DECLARE_HANDLER(TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false); // Not applied
932940

933941
#undef DECLARE_HANDLER
934942
#undef DECLARE_TEMPLATE_HANDLER
935943

944+
bool operator()(std::conditional_t<UseMigrationProtocol, typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent>&) {
945+
auto specific = [this]() {
946+
if constexpr (UseMigrationProtocol) {
947+
return this->Settings.EventHandlers_.PartitionStreamClosedHandler_;
948+
} else {
949+
return this->Settings.EventHandlers_.PartitionSessionClosedHandler_;
950+
}
951+
}();
952+
953+
if (!specific && !this->Settings.EventHandlers_.CommonHandler_) {
954+
return false;
955+
}
956+
957+
this->template PushCommonHandler<>(
958+
std::move(TParent::TBaseHandlersVisitor::Event),
959+
[specific = specific,
960+
common = this->Settings.EventHandlers_.CommonHandler_,
961+
cbContext = CbContext](auto& event) {
962+
auto& e = std::get<std::conditional_t<UseMigrationProtocol, typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent>>(event);
963+
if (specific) {
964+
specific(e);
965+
} else if (common) {
966+
common(event);
967+
}
968+
if constexpr (!UseMigrationProtocol) {
969+
if (auto session = cbContext->LockShared()) {
970+
session->UnregisterPartition(e.GetPartitionSession()->GetPartitionId(), e.GetPartitionSession()->GetPartitionSessionId());
971+
}
972+
}
973+
});
974+
975+
return true;
976+
}
977+
936978
template<bool E = !UseMigrationProtocol>
937979
constexpr std::enable_if_t<E, bool>
938980
operator()(typename TAReadSessionEvent<false>::TEndPartitionSessionEvent&) {
939-
if (this->template PushHandler<typename TAReadSessionEvent<false>::TEndPartitionSessionEvent>(
940-
std::move(TParent::TBaseHandlersVisitor::Event),
941-
[this](){
942-
return this->Settings.EventHandlers_.EndPartitionSessionHandler_;
943-
}(),
944-
this->Settings.EventHandlers_.CommonHandler_)) {
981+
if (!this->Settings.EventHandlers_.EndPartitionSessionHandler_ && !this->Settings.EventHandlers_.CommonHandler_) {
945982
return false;
946983
}
947-
return false;
984+
this->template PushCommonHandler<>(
985+
std::move(TParent::TBaseHandlersVisitor::Event),
986+
[specific = this->Settings.EventHandlers_.EndPartitionSessionHandler_,
987+
common = this->Settings.EventHandlers_.CommonHandler_,
988+
cbContext = CbContext](TReadSessionEvent::TEvent& event) {
989+
auto& e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
990+
if (specific) {
991+
specific(e);
992+
} else if (common) {
993+
common(event);
994+
}
995+
});
996+
return true;
948997
}
949998

950999
bool Visit() {
@@ -956,6 +1005,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
9561005
}
9571006

9581007
TDeferredActions<UseMigrationProtocol>& Deferred;
1008+
TCallbackContextPtr<UseMigrationProtocol> CbContext;
9591009
};
9601010

9611011
TADataReceivedEvent<UseMigrationProtocol>
@@ -964,12 +1014,13 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
9641014
TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
9651015

9661016
bool ApplyHandler(TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
967-
THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred);
1017+
THandlersVisitor visitor(this->Settings, eventInfo.GetEvent(), deferred, CbContext);
9681018
return visitor.Visit();
9691019
}
9701020

9711021
private:
9721022
bool HasEventCallbacks;
1023+
TCallbackContextPtr<UseMigrationProtocol> CbContext;
9731024
};
9741025

9751026
} // namespace NYdb::NTopic
@@ -1045,6 +1096,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
10451096
void Start();
10461097
void ConfirmPartitionStreamCreate(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
10471098
void ConfirmPartitionStreamDestroy(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
1099+
void ConfirmPartitionStreamEnd(TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds);
10481100
void RequestPartitionStreamStatus(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
10491101
void Commit(const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
10501102

@@ -1092,6 +1144,16 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
10921144
return Log;
10931145
}
10941146

1147+
void RegisterParentPartition(ui32 partitionId, ui32 parentPartitionId, ui64 parentPartitionSessionId);
1148+
void UnregisterPartition(ui32 partitionId, ui64 partitionSessionId);
1149+
std::vector<ui64> GetParentPartitionSessions(ui32 partitionId, ui64 partitionSessionId);
1150+
bool AllParentSessionsHasBeenRead(ui32 partitionId, ui64 partitionSessionId);
1151+
1152+
void SetSelfContext(TPtr ptr) {
1153+
TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SetSelfContext(std::move(ptr));
1154+
EventsQueue->SetCallbackContext(TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SelfContext);
1155+
}
1156+
10951157
private:
10961158
void BreakConnectionAndReconnectImpl(TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
10971159

@@ -1273,6 +1335,14 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
12731335
std::atomic<int> DecompressionTasksInflight = 0;
12741336
i64 ReadSizeBudget;
12751337
i64 ReadSizeServerDelta = 0;
1338+
1339+
struct TParentInfo {
1340+
ui32 PartitionId;
1341+
ui64 PartitionSessionId;
1342+
};
1343+
1344+
std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
1345+
std::unordered_set<ui64> ReadingFinishedData;
12761346
};
12771347

12781348
} // namespace NYdb::NTopic

0 commit comments

Comments
 (0)