Skip to content

Commit afa2023

Browse files
The tablet leaves executed transactions (#13134) (#13139)
1 parent bfa1e00 commit afa2023

File tree

4 files changed

+145
-9
lines changed

4 files changed

+145
-9
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -980,9 +980,7 @@ void TPersQueue::ReadConfig(const NKikimrClient::TKeyValueResponse::TReadResult&
980980
Txs.emplace(tx.GetTxId(), tx);
981981

982982
if (tx.HasStep()) {
983-
if (std::make_pair(tx.GetStep(), tx.GetTxId()) >= std::make_pair(ExecStep, ExecTxId)) {
984-
PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
985-
}
983+
PlannedTxs.emplace_back(tx.GetStep(), tx.GetTxId());
986984
}
987985

988986
if (tx.HasWriteId()) {
@@ -4392,7 +4390,10 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
43924390
Y_ABORT_UNLESS(tx.TxId == TxsOrder[tx.State].front(),
43934391
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
43944392
TabletID(), tx.TxId, TxsOrder[tx.State].front());
4395-
4393+
Y_ABORT_UNLESS(tx.TxId == TxQueue.front().second,
4394+
"PQ %" PRIu64 ", TxId %" PRIu64 ", FrontTxId %" PRIu64,
4395+
TabletID(), tx.TxId, TxQueue.front().second);
4396+
TxQueue.pop_front();
43964397
SendEvReadSetAckToSenders(ctx, tx);
43974398

43984399
TryChangeTxState(tx, NKikimrPQ::TTransaction::WAIT_RS_ACKS);
@@ -4411,10 +4412,6 @@ void TPersQueue::CheckTxState(const TActorContext& ctx,
44114412

44124413
case NKikimrPQ::TTransaction::DELETING:
44134414
// The PQ tablet has persisted its state. Now she can delete the transaction and take the next one.
4414-
if (!TxQueue.empty() && (TxQueue.front().second == tx.TxId)) {
4415-
TxQueue.pop_front();
4416-
}
4417-
44184415
DeleteWriteId(tx.WriteId);
44194416
PQ_LOG_D("delete TxId " << tx.TxId);
44204417
Txs.erase(tx.TxId);

ydb/core/persqueue/ut/pqtablet_mock.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,10 @@ void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorCo
9494
{
9595
Y_UNUSED(ctx);
9696

97-
ReadSet = ev->Get()->Record;
97+
const auto& record = ev->Get()->Record;
98+
99+
ReadSet = record;
100+
ReadSets[std::make_pair(record.GetStep(), record.GetTxId())].push_back(record);
98101
}
99102

100103
void TPQTabletMock::Handle(TEvTxProcessing::TEvReadSetAck::TPtr& ev, const TActorContext& ctx)

ydb/core/persqueue/ut/pqtablet_mock.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ class TPQTabletMock : public TActor<TPQTabletMock>, public NTabletFlatExecutor::
3232
TMaybe<NKikimrTx::TEvReadSet> ReadSet;
3333
TMaybe<NKikimrTx::TEvReadSetAck> ReadSetAck;
3434

35+
THashMap<std::pair<ui64, ui64>, TVector<NKikimrTx::TEvReadSet>> ReadSets;
36+
3537
private:
3638
struct TEvPQTablet {
3739
enum EEv {

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
139139
TMaybe<ui64> Target;
140140
TMaybe<NKikimrTx::TReadSetData::EDecision> Decision;
141141
TMaybe<ui64> Producer;
142+
TMaybe<size_t> Count;
142143
};
143144

144145
struct TReadSetAckMatcher {
@@ -191,6 +192,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
191192
void WaitPlanStepAccepted(const TPlanStepAcceptedMatcher& matcher = {});
192193

193194
void WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
195+
void WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher);
194196
void SendReadSet(const TReadSetParams& params);
195197

196198
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
@@ -236,6 +238,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
236238
void StartPQCalcPredicateObserver(size_t& received);
237239
void WaitForPQCalcPredicate(size_t& received, size_t expected);
238240

241+
void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state);
242+
void WaitForExecStep(ui64 step);
243+
239244
//
240245
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
241246
//
@@ -456,6 +461,15 @@ void TPQTabletFixture::WaitReadSet(NHelpers::TPQTabletMock& tablet, const TReadS
456461
}
457462
}
458463

464+
void TPQTabletFixture::WaitReadSetEx(NHelpers::TPQTabletMock& tablet, const TReadSetMatcher& matcher)
465+
{
466+
TDispatchOptions options;
467+
options.CustomFinalCondition = [&]() {
468+
return tablet.ReadSets[std::make_pair(*matcher.Step, *matcher.TxId)].size() >= *matcher.Count;
469+
};
470+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
471+
}
472+
459473
void TPQTabletFixture::SendReadSet(const TReadSetParams& params)
460474
{
461475
NKikimrTx::TReadSetData payload;
@@ -952,6 +966,70 @@ void TPQTabletFixture::WaitForPQCalcPredicate(size_t& received, size_t expected)
952966
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
953967
}
954968

969+
void TPQTabletFixture::WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state)
970+
{
971+
const TString key = GetTxKey(txId);
972+
973+
while (true) {
974+
auto request = std::make_unique<TEvKeyValue::TEvRequest>();
975+
request->Record.SetCookie(12345);
976+
auto cmd = request->Record.AddCmdReadRange();
977+
auto range = cmd->MutableRange();
978+
range->SetFrom(key);
979+
range->SetIncludeFrom(true);
980+
range->SetTo(key);
981+
range->SetIncludeTo(true);
982+
cmd->SetIncludeData(true);
983+
SendToPipe(Ctx->Edge, request.release());
984+
985+
auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
986+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
987+
const auto& result = response->Record.GetReadRangeResult(0);
988+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK));
989+
const auto& pair = result.GetPair(0);
990+
991+
NKikimrPQ::TTransaction tx;
992+
Y_ABORT_UNLESS(tx.ParseFromString(pair.GetValue()));
993+
994+
if (tx.GetState() == state) {
995+
return;
996+
}
997+
}
998+
999+
UNIT_FAIL("transaction " << txId << " has not entered the " << state << " state");
1000+
}
1001+
1002+
void TPQTabletFixture::WaitForExecStep(ui64 step)
1003+
{
1004+
while (true) {
1005+
auto request = std::make_unique<TEvKeyValue::TEvRequest>();
1006+
request->Record.SetCookie(12345);
1007+
auto cmd = request->Record.AddCmdReadRange();
1008+
auto range = cmd->MutableRange();
1009+
range->SetFrom("_txinfo");
1010+
range->SetIncludeFrom(true);
1011+
range->SetTo("_txinfo");
1012+
range->SetIncludeTo(true);
1013+
cmd->SetIncludeData(true);
1014+
SendToPipe(Ctx->Edge, request.release());
1015+
1016+
auto response = Ctx->Runtime->GrabEdgeEvent<TEvKeyValue::TEvResponse>();
1017+
UNIT_ASSERT_VALUES_EQUAL(response->Record.GetStatus(), NMsgBusProxy::MSTATUS_OK);
1018+
const auto& result = response->Record.GetReadRangeResult(0);
1019+
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), static_cast<ui32>(NKikimrProto::OK));
1020+
const auto& pair = result.GetPair(0);
1021+
1022+
NKikimrPQ::TTabletTxInfo txInfo;
1023+
Y_ABORT_UNLESS(txInfo.ParseFromString(pair.GetValue()));
1024+
1025+
if (txInfo.GetExecStep() == step) {
1026+
return;
1027+
}
1028+
}
1029+
1030+
UNIT_FAIL("expected execution step " << step);
1031+
}
1032+
9551033
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
9561034
{
9571035
TestParallelTransactions("consumer", "consumer");
@@ -1730,6 +1808,62 @@ Y_UNIT_TEST_F(Huge_ProposeTransacton, TPQTabletFixture)
17301808
WaitPlanStepAccepted({.Step=100});
17311809
}
17321810

1811+
Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In_The_EXECUTED_State, TPQTabletFixture)
1812+
{
1813+
const ui64 txId_1 = 67890;
1814+
const ui64 txId_2 = txId_1 + 1;
1815+
const ui64 mockTabletId = 22222;
1816+
1817+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
1818+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
1819+
1820+
// 1st tx
1821+
SendProposeTransactionRequest({.TxId=txId_1,
1822+
.Senders={mockTabletId}, .Receivers={mockTabletId},
1823+
.TxOps={
1824+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
1825+
}});
1826+
WaitProposeTransactionResponse({.TxId=txId_1,
1827+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1828+
1829+
SendPlanStep({.Step=100, .TxIds={txId_1}});
1830+
1831+
WaitForCalcPredicateResult();
1832+
1833+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId_1, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
1834+
1835+
WaitProposeTransactionResponse({.TxId=txId_1,
1836+
.Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
1837+
1838+
WaitForTxState(txId_1, NKikimrPQ::TTransaction::EXECUTED);
1839+
1840+
tablet->ReadSet = Nothing();
1841+
1842+
// 2nd tx
1843+
SendProposeTransactionRequest({.TxId=txId_2,
1844+
.Senders={mockTabletId}, .Receivers={mockTabletId},
1845+
.TxOps={
1846+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
1847+
}});
1848+
WaitProposeTransactionResponse({.TxId=txId_2,
1849+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1850+
1851+
SendPlanStep({.Step=110, .TxIds={txId_2}});
1852+
1853+
WaitForCalcPredicateResult();
1854+
1855+
WaitReadSetEx(*tablet, {.Step=110, .TxId=txId_2, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=1});
1856+
1857+
// the PQ tablet has moved a step forward
1858+
WaitForExecStep(110);
1859+
1860+
// restart PQ tablet
1861+
PQTabletRestart(*Ctx);
1862+
1863+
// the PQ tablet should send a TEvReadSet for the executed transaction
1864+
WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2});
1865+
}
1866+
17331867
}
17341868

17351869
}

0 commit comments

Comments
 (0)