@@ -139,6 +139,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
139139 TMaybe<ui64> Target;
140140 TMaybe<NKikimrTx::TReadSetData::EDecision> Decision;
141141 TMaybe<ui64> Producer;
142+ TMaybe<size_t > Count;
142143 };
143144
144145 struct TReadSetAckMatcher {
@@ -191,6 +192,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
191192 void WaitPlanStepAccepted (const TPlanStepAcceptedMatcher& matcher = {});
192193
193194 void WaitReadSet (NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
195+ void WaitReadSetEx (NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
194196 void SendReadSet (const TReadSetParams& params);
195197
196198 void WaitReadSetAck (NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
@@ -236,6 +238,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
236238 void StartPQCalcPredicateObserver (size_t & received);
237239 void WaitForPQCalcPredicate (size_t & received, size_t expected);
238240
241+ void WaitForTxState (ui64 txId, NKikimrPQ::TTransaction::EState state);
242+ void WaitForExecStep (ui64 step);
243+
239244 //
240245 // TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
241246 //
@@ -456,6 +461,15 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
456461 }
457462}
458463
464+ void TPQTabletFixture::WaitReadSetEx (NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher)
465+ {
466+ TDispatchOptions options;
467+ options.CustomFinalCondition = [&]() {
468+ return tablet.ReadSets [std::make_pair (*matcher.Step , *matcher.TxId )].size () >= *matcher.Count ;
469+ };
470+ UNIT_ASSERT (Ctx->Runtime ->DispatchEvents (options));
471+ }
472+
459473void TPQTabletFixture::SendReadSet (const TReadSetParams& params)
460474{
461475 NKikimrTx::TReadSetData payload;
@@ -952,6 +966,70 @@ void TPQTabletFixture::WaitForPQCalcPredicate(size_t& received, size_t expected)
952966 UNIT_ASSERT (Ctx->Runtime ->DispatchEvents (options));
953967}
954968
969+ void TPQTabletFixture::WaitForTxState (ui64 txId, NKikimrPQ::TTransaction::EState state)
970+ {
971+ const TString key = GetTxKey (txId);
972+
973+ while (true ) {
974+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
975+ request->Record .SetCookie (12345 );
976+ auto cmd = request->Record .AddCmdReadRange ();
977+ auto range = cmd->MutableRange ();
978+ range->SetFrom (key);
979+ range->SetIncludeFrom (true );
980+ range->SetTo (key);
981+ range->SetIncludeTo (true );
982+ cmd->SetIncludeData (true );
983+ SendToPipe (Ctx->Edge , request.release ());
984+
985+ auto response = Ctx->Runtime ->GrabEdgeEvent <TEvKeyValue::TEvResponse>();
986+ UNIT_ASSERT_VALUES_EQUAL (response->Record .GetStatus (), NMsgBusProxy::MSTATUS_OK);
987+ const auto & result = response->Record .GetReadRangeResult (0 );
988+ UNIT_ASSERT_VALUES_EQUAL (result.GetStatus (), static_cast <ui32>(NKikimrProto::OK));
989+ const auto & pair = result.GetPair (0 );
990+
991+ NKikimrPQ::TTransaction tx;
992+ Y_ABORT_UNLESS (tx.ParseFromString (pair.GetValue ()));
993+
994+ if (tx.GetState () == state) {
995+ return ;
996+ }
997+ }
998+
999+ UNIT_FAIL (" transaction " << txId << " has not entered the " << state << " state" );
1000+ }
1001+
1002+ void TPQTabletFixture::WaitForExecStep (ui64 step)
1003+ {
1004+ while (true ) {
1005+ auto request = std::make_unique<TEvKeyValue::TEvRequest>();
1006+ request->Record .SetCookie (12345 );
1007+ auto cmd = request->Record .AddCmdReadRange ();
1008+ auto range = cmd->MutableRange ();
1009+ range->SetFrom (" _txinfo" );
1010+ range->SetIncludeFrom (true );
1011+ range->SetTo (" _txinfo" );
1012+ range->SetIncludeTo (true );
1013+ cmd->SetIncludeData (true );
1014+ SendToPipe (Ctx->Edge , request.release ());
1015+
1016+ auto response = Ctx->Runtime ->GrabEdgeEvent <TEvKeyValue::TEvResponse>();
1017+ UNIT_ASSERT_VALUES_EQUAL (response->Record .GetStatus (), NMsgBusProxy::MSTATUS_OK);
1018+ const auto & result = response->Record .GetReadRangeResult (0 );
1019+ UNIT_ASSERT_VALUES_EQUAL (result.GetStatus (), static_cast <ui32>(NKikimrProto::OK));
1020+ const auto & pair = result.GetPair (0 );
1021+
1022+ NKikimrPQ::TTabletTxInfo txInfo;
1023+ Y_ABORT_UNLESS (txInfo.ParseFromString (pair.GetValue ()));
1024+
1025+ if (txInfo.GetExecStep () == step) {
1026+ return ;
1027+ }
1028+ }
1029+
1030+ UNIT_FAIL (" expected execution step " << step);
1031+ }
1032+
9551033Y_UNIT_TEST_F (Parallel_Transactions_1, TPQTabletFixture)
9561034{
9571035 TestParallelTransactions (" consumer" , " consumer" );
@@ -1730,6 +1808,62 @@ Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture)
17301808 WaitPlanStepAccepted ({.Step =100 });
17311809}
17321810
1811+ Y_UNIT_TEST_F (After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In_The_EXECUTED_State, TPQTabletFixture)
1812+ {
1813+ const ui64 txId_1 = 67890 ;
1814+ const ui64 txId_2 = txId_1 + 1 ;
1815+ const ui64 mockTabletId = 22222 ;
1816+
1817+ NHelpers::TPQTabletMock* tablet = CreatePQTabletMock (mockTabletId);
1818+ PQTabletPrepare ({.partitions =1 }, {}, *Ctx);
1819+
1820+ // 1st tx
1821+ SendProposeTransactionRequest ({.TxId =txId_1,
1822+ .Senders ={mockTabletId}, .Receivers ={mockTabletId},
1823+ .TxOps ={
1824+ {.Partition =0 , .Consumer =" user" , .Begin =0 , .End =0 , .Path =" /topic" },
1825+ }});
1826+ WaitProposeTransactionResponse ({.TxId =txId_1,
1827+ .Status =NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1828+
1829+ SendPlanStep ({.Step =100 , .TxIds ={txId_1}});
1830+
1831+ WaitForCalcPredicateResult ();
1832+
1833+ tablet->SendReadSet (*Ctx->Runtime , {.Step =100 , .TxId =txId_1, .Target =Ctx->TabletId , .Decision =NKikimrTx::TReadSetData::DECISION_COMMIT});
1834+
1835+ WaitProposeTransactionResponse ({.TxId =txId_1,
1836+ .Status =NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
1837+
1838+ WaitForTxState (txId_1, NKikimrPQ::TTransaction::EXECUTED);
1839+
1840+ tablet->ReadSet = Nothing ();
1841+
1842+ // 2nd tx
1843+ SendProposeTransactionRequest ({.TxId =txId_2,
1844+ .Senders ={mockTabletId}, .Receivers ={mockTabletId},
1845+ .TxOps ={
1846+ {.Partition =0 , .Consumer =" user" , .Begin =0 , .End =0 , .Path =" /topic" },
1847+ }});
1848+ WaitProposeTransactionResponse ({.TxId =txId_2,
1849+ .Status =NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1850+
1851+ SendPlanStep ({.Step =110 , .TxIds ={txId_2}});
1852+
1853+ WaitForCalcPredicateResult ();
1854+
1855+ WaitReadSetEx (*tablet, {.Step =110 , .TxId =txId_2, .Decision =NKikimrTx::TReadSetData::DECISION_COMMIT, .Count =1 });
1856+
1857+ // the PQ tablet has moved a step forward
1858+ WaitForExecStep (110 );
1859+
1860+ // restart PQ tablet
1861+ PQTabletRestart (*Ctx);
1862+
1863+ // the PQ tablet should send a TEvReadSet for the executed transaction
1864+ WaitReadSetEx (*tablet, {.Step =100 , .TxId =txId_1, .Decision =NKikimrTx::TReadSetData::DECISION_COMMIT, .Count =2 });
1865+ }
1866+
17331867}
17341868
17351869}
0 commit comments