@@ -729,6 +729,10 @@ class TPartitionStreamImpl : public TAPartitionStream<UseMigrationProtocol> {
729729 EventsQueue.clear ();
730730 }
731731
732+ TRawPartitionStreamEventQueue<UseMigrationProtocol> ExtractQueue () noexcept {
733+ return std::move (EventsQueue);
734+ }
735+
732736 static void GetDataEventImpl (TIntrusivePtr<TPartitionStreamImpl<UseMigrationProtocol>> partitionStream,
733737 size_t & maxEventsCount,
734738 size_t & maxByteSize,
@@ -786,14 +790,16 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
786790
787791 bool Close (const TASessionClosedEvent<UseMigrationProtocol>& event, TDeferredActions<UseMigrationProtocol>& deferred) {
788792 TWaiter waiter;
793+ TVector<TRawPartitionStreamEventQueue<UseMigrationProtocol>> defferedDelete;
789794 with_lock (TParent::Mutex) {
790795 if (TParent::Closed) {
791796 return false ;
792797 }
798+ defferedDelete.reserve (TParent::Events.size ());
793799 while (!TParent::Events.empty ()) {
794800 auto & event = TParent::Events.front ();
795801 if (!event.IsEmpty ()) {
796- event.PartitionStream ->ClearQueue ( );
802+ defferedDelete. push_back ( event.PartitionStream ->ExtractQueue () );
797803 }
798804 TParent::Events.pop ();
799805 }
@@ -802,6 +808,9 @@ class TReadSessionEventsQueue: public TBaseSessionEventsQueue<TAReadSessionSetti
802808 waiter = TWaiter (TParent::Waiter.ExtractPromise (), this );
803809 }
804810
811+ // Delayed deletion is necessary to avoid deadlock with PushEvent
812+ defferedDelete.clear ();
813+
805814 TReadSessionEventInfo<UseMigrationProtocol> info (event);
806815 ApplyHandler (info, deferred);
807816 deferred.DeferSignalWaiter (std::move (waiter));
0 commit comments