@@ -245,6 +245,8 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
245245 void InterceptSaveTxState (TAutoPtr<IEventHandle>& event);
246246 void SendSaveTxState (TAutoPtr<IEventHandle>& event);
247247
248+ void WaitForTheTransactionToBeDeleted (ui64 txId);
249+
248250 //
249251 // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
250252 //
@@ -1082,6 +1084,39 @@ void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
10821084 Ctx->Runtime ->Send (event);
10831085}
10841086
1087+ void TPQTabletFixture::WaitForTheTransactionToBeDeleted (ui64 txId)
1088+ {
1089+ const TString key = GetTxKey (txId);
1090+
1091+ for (size_t i = 0 ; i < 200 ; ++i) {
1092+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
1093+ request->Record .SetCookie (12345 );
1094+ auto cmd = request->Record .AddCmdReadRange ();
1095+ auto range = cmd->MutableRange ();
1096+ range->SetFrom (key);
1097+ range->SetIncludeFrom (true );
1098+ range->SetTo (key);
1099+ range->SetIncludeTo (true );
1100+ cmd->SetIncludeData (false );
1101+ SendToPipe (Ctx->Edge , request.release ());
1102+
1103+ auto response = Ctx->Runtime ->GrabEdgeEvent <TEvKeyValue::TEvResponse>();
1104+ UNIT_ASSERT_VALUES_EQUAL (response->Record .GetStatus (), NMsgBusProxy::MSTATUS_OK);
1105+
1106+ const auto & result = response->Record .GetReadRangeResult (0 );
1107+ if (result.GetStatus () == static_cast <ui32>(NKikimrProto::OK)) {
1108+ Ctx->Runtime ->SimulateSleep (TDuration::MilliSeconds (300 ));
1109+ continue ;
1110+ }
1111+
1112+ if (result.GetStatus () == NKikimrProto::NODATA) {
1113+ return ;
1114+ }
1115+ }
1116+
1117+ UNIT_FAIL (" Too many attempts" );
1118+ }
1119+
10851120Y_UNIT_TEST_F (Parallel_Transactions_1, TPQTabletFixture)
10861121{
10871122 TestParallelTransactions (" consumer" , " consumer" );
@@ -2017,6 +2052,61 @@ Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
20172052 WaitReadSetAck (*tablet, {.Step =100 , .TxId =txId, .Source =22222 , .Target =Ctx->TabletId , .Consumer =Ctx->TabletId });
20182053}
20192054
2055+ Y_UNIT_TEST_F (TEvReadSet_For_A_Non_Existent_Tablet, TPQTabletFixture)
2056+ {
2057+ const ui64 txId = 67890 ;
2058+ const ui64 mockTabletId = MakeTabletID (false , 22222 );
2059+
2060+ // We are simulating a situation where the recipient of TEvReadSet has already completed a transaction
2061+ // and has been deleted.
2062+ //
2063+ // To do this, we "forget" the TEvReadSet from the PQ tablet and send TEvClientConnected with the Dead flag
2064+ // instead of TEvReadSetAck.
2065+ TTestActorRuntimeBase::TEventFilter prev;
2066+ auto filter = [&](TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event) -> bool {
2067+ if (auto * msg = event->CastAsLocal <TEvTxProcessing::TEvReadSet>()) {
2068+ const auto & r = msg->Record ;
2069+ if (r.GetTabletSource () == Ctx->TabletId ) {
2070+ runtime.Send (event->Sender ,
2071+ Ctx->Edge ,
2072+ new TEvTabletPipe::TEvClientConnected (mockTabletId,
2073+ NKikimrProto::ERROR,
2074+ event->Sender ,
2075+ TActorId (),
2076+ true ,
2077+ true , // Dead
2078+ 0 ));
2079+ return true ;
2080+ }
2081+ }
2082+ return false ;
2083+ };
2084+ prev = Ctx->Runtime ->SetEventFilter (filter);
2085+
2086+ NHelpers::TPQTabletMock* tablet = CreatePQTabletMock (mockTabletId);
2087+ PQTabletPrepare ({.partitions =1 }, {}, *Ctx);
2088+
2089+ SendProposeTransactionRequest ({.TxId =txId,
2090+ .Senders ={mockTabletId}, .Receivers ={mockTabletId},
2091+ .TxOps ={
2092+ {.Partition =0 , .Consumer =" user" , .Begin =0 , .End =0 , .Path =" /topic" },
2093+ }});
2094+ WaitProposeTransactionResponse ({.TxId =txId,
2095+ .Status =NKikimrPQ::TEvProposeTransactionResult::PREPARED});
2096+
2097+ SendPlanStep ({.Step =100 , .TxIds ={txId}});
2098+
2099+ // We are sending a TEvReadSet so that the PQ tablet can complete the transaction.
2100+ tablet->SendReadSet (*Ctx->Runtime ,
2101+ {.Step =100 , .TxId =txId, .Target =Ctx->TabletId , .Decision =NKikimrTx::TReadSetData::DECISION_COMMIT});
2102+
2103+ WaitProposeTransactionResponse ({.TxId =txId, .Status =NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2104+
2105+ // Instead of TEvReadSetAck, the PQ tablet will receive TEvClientConnected with the Dead flag. The transaction
2106+ // will switch from the WAIT_RS_AKS state to the DELETING state.
2107+ WaitForTheTransactionToBeDeleted (txId);
2108+ }
2109+
20202110}
20212111
20222112}
0 commit comments