@@ -932,50 +932,55 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
932932 typename TAReadSessionEvent<false >::TPartitionSessionStatusEvent,
933933 PartitionStreamStatusHandler_,
934934 PartitionSessionStatusHandler_);
935- DECLARE_TEMPLATE_HANDLER (typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent,
936- typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent,
937- PartitionStreamClosedHandler_,
938- PartitionSessionClosedHandler_);
935+ // DECLARE_TEMPLATE_HANDLER(typename TAReadSessionEvent<true>::TPartitionStreamClosedEvent,
936+ // typename TAReadSessionEvent<false>::TPartitionSessionClosedEvent,
937+ // PartitionStreamClosedHandler_,
938+ // PartitionSessionClosedHandler_);
939939 DECLARE_HANDLER (TASessionClosedEvent<UseMigrationProtocol>, SessionClosedHandler_, false ); // Not applied
940940
941941#undef DECLARE_HANDLER
942942#undef DECLARE_TEMPLATE_HANDLER
943943
944- using TPartitionSessionClosedEvent = std::conditional_t <UseMigrationProtocol,
945- TAReadSessionEvent<true >::TPartitionStreamClosedEvent,
946- TAReadSessionEvent<false >::TPartitionSessionClosedEvent>;
947-
948- /*
949- bool operator()(TPartitionSessionClosedEvent&) {
950- auto specificProvider = [this](){
944+ bool operator ()(std::conditional_t <UseMigrationProtocol, typename TAReadSessionEvent<true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent<false >::TPartitionSessionClosedEvent>&) {
945+ auto specific = [this ]() {
951946 if constexpr (UseMigrationProtocol) {
952947 return this ->Settings .EventHandlers_ .PartitionStreamClosedHandler_ ;
953948 } else {
954949 return this ->Settings .EventHandlers_ .PartitionSessionClosedHandler_ ;
955950 }
956- };
951+ }();
952+
953+ if (!specific && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
954+ return false ;
955+ }
957956
958957 this ->template PushCommonHandler <>(
959958 std::move (TParent::TBaseHandlersVisitor::Event),
960- [specific = specificProvider() ,
959+ [specific = specific ,
961960 common = this ->Settings .EventHandlers_ .CommonHandler_ ,
962- cbContext = CbContext](TReadSessionEvent::TEvent & event) {
963- auto& e = std::get<TPartitionSessionClosedEvent>(event);
961+ cbContext = CbContext](auto & event) {
962+ auto & e = std::get<std:: conditional_t <UseMigrationProtocol, typename TAReadSessionEvent< true >::TPartitionStreamClosedEvent, typename TAReadSessionEvent< false >:: TPartitionSessionClosedEvent> >(event);
964963 if (specific) {
965964 specific (e);
966965 } else if (common) {
967966 common (event);
968967 }
969- if (auto session = cbContext->LockShared()) {
970- session->UnregisterPartition(e.GetPartitionSession()->GetPartitionId(), e.GetPartitionSession()->GetPartitionSessionId());
968+ if constexpr (!UseMigrationProtocol) {
969+ if (auto session = cbContext->LockShared ()) {
970+ session->UnregisterPartition (e.GetPartitionSession ()->GetPartitionId (), e.GetPartitionSession ()->GetPartitionSessionId ());
971+ }
971972 }
972973 });
973- return specificProvider() || this->Settings.EventHandlers_.CommonHandler_;
974+
975+ return true ;
974976 }
975- */
977+
976978 template <bool E = !UseMigrationProtocol>
977979 constexpr std::enable_if_t <E, bool >
978980 operator ()(typename TAReadSessionEvent<false >::TEndPartitionSessionEvent&) {
981+ if (!this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ && !this ->Settings .EventHandlers_ .CommonHandler_ ) {
982+ return false ;
983+ }
979984 this ->template PushCommonHandler <>(
980985 std::move (TParent::TBaseHandlersVisitor::Event),
981986 [specific = this ->Settings .EventHandlers_ .EndPartitionSessionHandler_ ,
@@ -987,11 +992,8 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
987992 } else if (common) {
988993 common (event);
989994 }
990- // if (auto session = cbContext->LockShared()) {
991- // session->SetReadingFinished(e.GetPartitionSession()->GetPartitionSessionId(), e.GetChildPartitionIds());
992- // }
993995 });
994- return this -> Settings . EventHandlers_ . EndPartitionSessionHandler_ || this -> Settings . EventHandlers_ . CommonHandler_ ;
996+ return true ;
995997 }
996998
997999 bool Visit () {
0 commit comments