@@ -271,10 +271,14 @@ void TPartitionActor::Handle(TEvPQProxy::TEvDirectReadAck::TPtr& ev, const TActo
271271 if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
272272 if (RestoredDirectReadId == ev->Get ()->DirectReadId ) {
273273 // This direct read is already being restored. Have to forget it later.
274+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get ()->DirectReadId
275+ << " while restoring, store it to forget further" );
274276 DirectReadsToForget.insert (ev->Get ()->DirectReadId );
275277 return ;
276278 }
277279 if (DirectReadsToRestore.contains (ev->Get ()->DirectReadId )) {
280+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Got ack for direct read " << ev->Get ()->DirectReadId
281+ << " while restoring, remove it from restore list" );
278282 // This direct read is pending for restore. No need to foreget - not yet prepared, just erase it;
279283 DirectReadsToRestore.erase (ev->Get ()->DirectReadId );
280284 DirectReadsToPublish.erase (ev->Get ()->DirectReadId );
@@ -316,6 +320,7 @@ void TPartitionActor::Handle(const TEvPQProxy::TEvRestartPipe::TPtr&, const TAct
316320 DirectReadsToRestore = DirectReadResults;
317321 DirectReadsToPublish = PublishedDirectReads;
318322 Y_ABORT_UNLESS (!DirectReadsToPublish.contains (DirectReadId));
323+ RestoredDirectReadId = 0 ;
319324 RestartDirectReadSession ();
320325 return ;
321326 }
@@ -634,6 +639,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
634639 break ;
635640 case EDirectReadRestoreStage::Session:
636641 Y_ABORT_UNLESS (result.HasCmdRestoreDirectReadResult ());
642+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Direct read - session restarted for partition " << Partition);
637643 if (!SendNextRestorePrepareOrForget ()) {
638644 OnDirectReadsRestored ();
639645 }
@@ -654,6 +660,7 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
654660 return ;
655661 case EDirectReadRestoreStage::Publish:
656662 Y_ABORT_UNLESS (RestoredDirectReadId != 0 );
663+
657664 Y_ABORT_UNLESS (result.HasCmdPublishReadResult ());
658665 Y_ABORT_UNLESS (*DirectReadsToPublish.begin () == result.GetCmdPublishReadResult ().GetDirectReadId ());
659666 DirectReadsToPublish.erase (DirectReadsToPublish.begin ());
@@ -730,6 +737,8 @@ void TPartitionActor::Handle(TEvPersQueue::TEvResponse::TPtr& ev, const TActorCo
730737
731738 Y_ABORT_UNLESS (DirectRead);
732739 Y_ABORT_UNLESS (res.GetDirectReadId () == DirectReadId);
740+ if (!PipeClient)
741+ return ; // Pipe was already destroyed, direct read session is being restored. Will resend this request afterwards;
733742
734743 EndOffset = res.GetEndOffset ();
735744 SizeLag = res.GetSizeLag ();
@@ -1092,13 +1101,18 @@ bool TPartitionActor::SendNextRestorePrepareOrForget() {
10921101 if (shouldForget) {
10931102 // We have something to forget from what was already restored; Do NOT change RestoredDirectReadId
10941103 DirectReadRestoreStage = EDirectReadRestoreStage::Forget;
1104+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Restore direct read, forget id "
1105+ << *DirectReadsToForget.begin () << " for partition " << Partition);
10951106 SendForgetDirectRead (*DirectReadsToForget.begin (), ctx);
10961107 return true ;
10971108 } else {
1098- LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId << " for partition " << Partition);
1109+ auto & dr = DirectReadsToRestore.begin ()->second ;
1110+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend prepare direct read id " << prepareId
1111+ << " (internal id: " << dr.GetDirectReadId () << " ) for partition " << Partition);
10991112 Y_ABORT_UNLESS (prepareId != 0 );
1113+
11001114 // Restore;
1101- auto & dr = DirectReadsToRestore. begin ()-> second ;
1115+ Y_ABORT_UNLESS (prepareId == dr. GetDirectReadId ()) ;
11021116
11031117 Y_ABORT_UNLESS (RestoredDirectReadId < dr.GetDirectReadId ());
11041118 RestoredDirectReadId = dr.GetDirectReadId ();
@@ -1125,8 +1139,8 @@ bool TPartitionActor::SendNextRestorePublishRequest() {
11251139 return false ;
11261140 }
11271141 auto id = *DirectReadsToPublish.begin ();
1128- LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " " << Partition
1129- << " Resend publish direct read on restore, id: " << id );
1142+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " Resend publish direct read on restore, id: "
1143+ << id << " for partition " << Partition );
11301144
11311145 Y_ABORT_UNLESS (RestoredDirectReadId == id);
11321146 DirectReadRestoreStage = EDirectReadRestoreStage::Publish;
@@ -1298,7 +1312,6 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
12981312
12991313 Y_ABORT_UNLESS (ReadGuid.empty ());
13001314 Y_ABORT_UNLESS (!RequestInfly);
1301- Y_ABORT_UNLESS (DirectReadRestoreStage == EDirectReadRestoreStage::None);
13021315
13031316 ReadGuid = ev->Get ()->Guid ;
13041317
@@ -1311,6 +1324,12 @@ void TPartitionActor::Handle(TEvPQProxy::TEvRead::TPtr& ev, const TActorContext&
13111324 if (!PipeClient) // Pipe will be recreated soon
13121325 return ;
13131326
1327+ if (DirectReadRestoreStage != EDirectReadRestoreStage::None) {
1328+ LOG_DEBUG_S (ctx, NKikimrServices::PQ_READ_PROXY, PQ_LOG_PREFIX << " READ FROM " << Partition
1329+ << " store this request utill direct read is restored" );
1330+ return ;
1331+ }
1332+
13141333 TAutoPtr<TEvPersQueue::TEvRequest> event (new TEvPersQueue::TEvRequest);
13151334 event->Record .Swap (&request);
13161335
0 commit comments