@@ -489,7 +489,9 @@ struct TRawPartitionStreamEvent {
489489template <bool UseMigrationProtocol>
490490class TRawPartitionStreamEventQueue {
491491public:
492- TRawPartitionStreamEventQueue () = default ;
492+ TRawPartitionStreamEventQueue (TCallbackContextPtr<UseMigrationProtocol> cbContext)
493+ : CbContext(cbContext) {
494+ };
493495
494496 template <class ... Ts>
495497 TRawPartitionStreamEvent<UseMigrationProtocol>& emplace_back (Ts&&... event)
@@ -550,6 +552,7 @@ class TRawPartitionStreamEventQueue {
550552 std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>>& queue);
551553
552554private:
555+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
553556 std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> Ready;
554557 std::deque<TRawPartitionStreamEvent<UseMigrationProtocol>> NotReady;
555558};
@@ -583,6 +586,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
583586 , AssignId(assignId)
584587 , FirstNotReadOffset(readOffset)
585588 , CbContext(std::move(cbContext))
589+ , EventsQueue(CbContext)
586590 {
587591 TAPartitionStream<true >::PartitionStreamId = partitionStreamId;
588592 TAPartitionStream<true >::TopicPath = std::move (topicPath);
@@ -603,6 +607,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
603607 , AssignId(static_cast <ui64>(assignId))
604608 , FirstNotReadOffset(static_cast <ui64>(readOffset))
605609 , CbContext(std::move(cbContext))
610+ , EventsQueue(CbContext)
606611 {
607612 TAPartitionStream<false >::PartitionSessionId = partitionStreamId;
608613 TAPartitionStream<false >::TopicPath = std::move (topicPath);
@@ -625,6 +630,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
625630
626631 void ConfirmCreate (TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
627632 void ConfirmDestroy ();
633+ void ConfirmEnd (const std::vector<ui32>& childIds);
628634
629635 void StopReading () /* override*/ ;
630636 void ResumeReading () /* override*/ ;
@@ -764,6 +770,7 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
764770 TMutex Lock;
765771};
766772
773+
767774template <bool UseMigrationProtocol>
768775class TReadSessionEventsQueue : public TBaseSessionEventsQueue <TAReadSessionSettings<UseMigrationProtocol>,
769776 typename TAReadSessionEvent<UseMigrationProtocol>::TEvent,
@@ -823,7 +830,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
823830 }
824831
825832 bool TryApplyCallbackToEventImpl (typename TParent::TEvent& event,
826- TDeferredActions<UseMigrationProtocol>& deferred);
833+ TDeferredActions<UseMigrationProtocol>& deferred,
834+ TCallbackContextPtr<UseMigrationProtocol>& cbContext);
827835 bool HasDataEventCallback () const ;
828836 void ApplyCallbackToEventImpl (TADataReceivedEvent<UseMigrationProtocol>& event,
829837 TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>&& eventsInfo,
@@ -859,13 +867,19 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
859867
860868 void ClearAllEvents ();
861869
870+ void SetCallbackContext (TCallbackContextPtr<UseMigrationProtocol>& ctx) {
871+ CbContext = ctx;
872+ }
873+
862874private:
863875 struct THandlersVisitor : public TParent ::TBaseHandlersVisitor {
864876 THandlersVisitor (const TAReadSessionSettings<UseMigrationProtocol>& settings,
865877 typename TParent::TEvent& event,
866- TDeferredActions<UseMigrationProtocol>& deferred)
878+ TDeferredActions<UseMigrationProtocol>& deferred,
879+ TCallbackContextPtr<UseMigrationProtocol>& cbContext)
867880 : TParent::TBaseHandlersVisitor(settings, event)
868- , Deferred(deferred) {
881+ , Deferred(deferred)
882+ , CbContext(cbContext) {
869883 }
870884
871885#define DECLARE_HANDLER (type, handler, answer ) \
@@ -880,7 +894,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
880894 } \
881895 /* */
882896
883- #define DECLARE_TEMPLATE_HANDLER (type_true, type_false, handler_true, handler_false, answer ) \
897+ #define DECLARE_TEMPLATE_HANDLER (type_true, type_false, handler_true, handler_false ) \
884898 bool operator ()(std::conditional_t <UseMigrationProtocol, type_true, type_false>&) { \
885899 if (this ->template PushHandler <std::conditional_t <UseMigrationProtocol, type_true, type_false>>( \
886900 std::move (TParent::TBaseHandlersVisitor::Event), \
@@ -892,7 +906,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
892906 } \
893907 }(), \
894908 this ->Settings .EventHandlers_ .CommonHandler_ )) { \
895- return answer; \
909+ return true ; \
896910 } \
897911 return false ; \
898912 } \
@@ -901,50 +915,81 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
901915 DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TDataReceivedEvent,
902916 typename TAReadSessionEvent<false >::TDataReceivedEvent,
903917 DataReceivedHandler_,
904- DataReceivedHandler_,
905- true );
918+ DataReceivedHandler_);
906919 DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TCommitAcknowledgementEvent,
907920 typename TAReadSessionEvent<false >::TCommitOffsetAcknowledgementEvent,
908921 CommitAcknowledgementHandler_,
909- CommitOffsetAcknowledgementHandler_,
910- true );
922+ CommitOffsetAcknowledgementHandler_);
911923 DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TCreatePartitionStreamEvent,
912924 typename TAReadSessionEvent<false >::TStartPartitionSessionEvent,
913925 CreatePartitionStreamHandler_,
914- StartPartitionSessionHandler_,
915- true );
926+ StartPartitionSessionHandler_);
916927 DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TDestroyPartitionStreamEvent,
917928 typename TAReadSessionEvent<false >::TStopPartitionSessionEvent,
918929 DestroyPartitionStreamHandler_,
919- StopPartitionSessionHandler_,
920- true );
930+ StopPartitionSessionHandler_);
921931 DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TPartitionStreamStatusEvent,
922932 typename TAReadSessionEvent<false >::TPartitionSessionStatusEvent,
923933 PartitionStreamStatusHandler_,
924- PartitionSessionStatusHandler_,
925- true );
926- DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent,
927- typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent,
928- PartitionStreamClosedHandler_,
929- PartitionSessionClosedHandler_,
930- true );
934+ PartitionSessionStatusHandler_);
931935 DECLARE_HANDLER (TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false ); // Not applied
932936
933937#undef DECLARE_HANDLER
934938#undef DECLARE_TEMPLATE_HANDLER
935939
940+ bool operator ()(std::conditional_t <UseMigrationProtocol, typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent>&) {
941+ auto specific = [this ]() {
942+ if constexpr (UseMigrationProtocol) {
943+ return this ->Settings .EventHandlers_ .PartitionStreamClosedHandler_ ;
944+ } else {
945+ return this ->Settings .EventHandlers_ .PartitionSessionClosedHandler_ ;
946+ }
947+ }();
948+
949+ if (!specific && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
950+ return false ;
951+ }
952+
953+ this ->template PushCommonHandler <>(
954+ std::move (TParent::TBaseHandlersVisitor::Event),
955+ [specific = specific,
956+ common = this ->Settings .EventHandlers_ .CommonHandler_ ,
957+ cbContext = CbContext](auto & event) {
958+ auto & e = std::get<std::conditional_t <UseMigrationProtocol, typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent>>(event);
959+ if (specific) {
960+ specific (e);
961+ } else if (common) {
962+ common (event);
963+ }
964+ if constexpr (!UseMigrationProtocol) {
965+ if (auto session = cbContext->LockShared ()) {
966+ session->UnregisterPartition (e.GetPartitionSession ()->GetPartitionId (), e.GetPartitionSession ()->GetPartitionSessionId ());
967+ }
968+ }
969+ });
970+
971+ return true ;
972+ }
973+
936974 template <bool E = !UseMigrationProtocol>
937975 constexpr std::enable_if_t <E, bool >
938976 operator ()(typename TAReadSessionEvent<false >::TEndPartitionSessionEvent&) {
939- if (this ->template PushHandler <typename TAReadSessionEvent<false >::TEndPartitionSessionEvent>(
940- std::move (TParent::TBaseHandlersVisitor::Event),
941- [this ](){
942- return this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ ;
943- }(),
944- this ->Settings .EventHandlers_ .CommonHandler_ )) {
977+ if (!this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
945978 return false ;
946979 }
947- return false ;
980+ this ->template PushCommonHandler <>(
981+ std::move (TParent::TBaseHandlersVisitor::Event),
982+ [specific = this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ ,
983+ common = this ->Settings .EventHandlers_ .CommonHandler_ ,
984+ cbContext = CbContext](TReadSessionEvent::TEvent& event) {
985+ auto & e = std::get<TReadSessionEvent::TEndPartitionSessionEvent>(event);
986+ if (specific) {
987+ specific (e);
988+ } else if (common) {
989+ common (event);
990+ }
991+ });
992+ return true ;
948993 }
949994
950995 bool Visit () {
@@ -956,6 +1001,7 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
9561001 }
9571002
9581003 TDeferredActions<UseMigrationProtocol>& Deferred;
1004+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
9591005 };
9601006
9611007 TADataReceivedEvent<UseMigrationProtocol>
@@ -964,12 +1010,13 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
9641010 TUserRetrievedEventsInfoAccumulator<UseMigrationProtocol>& accumulator); // Assumes that we're under lock.
9651011
9661012 bool ApplyHandler (TReadSessionEventInfo<UseMigrationProtocol>& eventInfo, TDeferredActions<UseMigrationProtocol>& deferred) {
967- THandlersVisitor visitor (this ->Settings , eventInfo.GetEvent (), deferred);
1013+ THandlersVisitor visitor (this ->Settings , eventInfo.GetEvent (), deferred, CbContext );
9681014 return visitor.Visit ();
9691015 }
9701016
9711017private:
9721018 bool HasEventCallbacks;
1019+ TCallbackContextPtr<UseMigrationProtocol> CbContext;
9731020};
9741021
9751022} // namespace NYdb::NTopic
@@ -1045,6 +1092,7 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
10451092 void Start ();
10461093 void ConfirmPartitionStreamCreate (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, TMaybe<ui64> readOffset, TMaybe<ui64> commitOffset);
10471094 void ConfirmPartitionStreamDestroy (TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
1095+ void ConfirmPartitionStreamEnd (TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, const std::vector<ui32>& childIds);
10481096 void RequestPartitionStreamStatus (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream);
10491097 void Commit (const TPartitionStreamImpl<UseMigrationProtocol>* partitionStream, ui64 startOffset, ui64 endOffset);
10501098
@@ -1092,6 +1140,16 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
10921140 return Log;
10931141 }
10941142
1143+ void RegisterParentPartition (ui32 partitionId, ui32 parentPartitionId, ui64 parentPartitionSessionId);
1144+ void UnregisterPartition (ui32 partitionId, ui64 partitionSessionId);
1145+ std::vector<ui64> GetParentPartitionSessions (ui32 partitionId, ui64 partitionSessionId);
1146+ bool AllParentSessionsHasBeenRead (ui32 partitionId, ui64 partitionSessionId);
1147+
1148+ void SetSelfContext (TPtr ptr) {
1149+ TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SetSelfContext (std::move (ptr));
1150+ EventsQueue->SetCallbackContext (TEnableSelfContext<TSingleClusterReadSessionImpl<UseMigrationProtocol>>::SelfContext);
1151+ }
1152+
10951153private:
10961154 void BreakConnectionAndReconnectImpl (TPlainStatus&& status, TDeferredActions<UseMigrationProtocol>& deferred);
10971155
@@ -1273,6 +1331,14 @@ class TSingleClusterReadSessionImpl : public TEnableSelfContext<TSingleClusterRe
12731331 std::atomic<int > DecompressionTasksInflight = 0 ;
12741332 i64 ReadSizeBudget;
12751333 i64 ReadSizeServerDelta = 0 ;
1334+
1335+ struct TParentInfo {
1336+ ui32 PartitionId;
1337+ ui64 PartitionSessionId;
1338+ };
1339+
1340+ std::unordered_map<ui32, std::vector<TParentInfo>> HierarchyData;
1341+ std::unordered_set<ui64> ReadingFinishedData;
12761342};
12771343
12781344} // namespace NYdb::NTopic
0 commit comments