Skip to content

Commit 32a6a78

Browse files
The PQ tablet loses its TEvReadSet (#17842) (#17855)
1 parent 152e499 commit 32a6a78

File tree

3 files changed

+156
-18
lines changed

3 files changed

+156
-18
lines changed

ydb/core/persqueue/pq_impl.cpp

Lines changed: 52 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3133,6 +3133,11 @@ void TPersQueue::SetTxInFlyCounter()
31333133

31343134
void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, const TActorContext& ctx)
31353135
{
3136+
if (!InitCompleted) {
3137+
AddPendingEvent(ev.Release());
3138+
return;
3139+
}
3140+
31363141
PQ_LOG_D("Handle TEvPersQueue::TEvCancelTransactionProposal");
31373142

31383143
NKikimrPQ::TEvCancelTransactionProposal& event = ev->Get()->Record;
@@ -3149,6 +3154,11 @@ void TPersQueue::Handle(TEvPersQueue::TEvCancelTransactionProposal::TPtr& ev, co
31493154

31503155
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
31513156
{
3157+
if (!InitCompleted) {
3158+
AddPendingEvent(ev.Release());
3159+
return;
3160+
}
3161+
31523162
const NKikimrPQ::TEvProposeTransaction& event = ev->Get()->GetRecord();
31533163
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransaction " << event.ShortDebugString());
31543164

@@ -3328,6 +3338,11 @@ void TPersQueue::HandleConfigTransaction(TAutoPtr<TEvPersQueue::TEvProposeTransa
33283338

33293339
void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorContext& ctx)
33303340
{
3341+
if (!InitCompleted) {
3342+
AddPendingEvent(ev.Release());
3343+
return;
3344+
}
3345+
33313346
PQ_LOG_D("Handle TEvTxProcessing::TEvPlanStep " << ev->Get()->Record.ShortDebugString());
33323347

33333348
EvPlanStepQueue.emplace_back(ev->Sender, ev->Release().Release());
@@ -3337,6 +3352,11 @@ void TPersQueue::Handle(TEvTxProcessing::TEvPlanStep::TPtr& ev, const TActorCont
33373352

33383353
void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorContext& ctx)
33393354
{
3355+
if (!InitCompleted) {
3356+
AddPendingEvent(ev.Release());
3357+
return;
3358+
}
3359+
33403360
PQ_LOG_D("Handle TEvTxProcessing::TEvReadSet " << ev->Get()->Record.ShortDebugString());
33413361

33423362
NKikimrTx::TEvReadSet& event = ev->Get()->Record;
@@ -3348,7 +3368,8 @@ void TPersQueue::Handle(TEvTxProcessing::TEvReadSet::TPtr& ev, const TActorConte
33483368
}
33493369

33503370
if (auto tx = GetTransaction(ctx, event.GetTxId()); tx && tx->PredicatesReceived.contains(event.GetTabletProducer())) {
3351-
if (tx->State >= NKikimrPQ::TTransaction::EXECUTED) {
3371+
if ((tx->State > NKikimrPQ::TTransaction::EXECUTED) ||
3372+
((tx->State == NKikimrPQ::TTransaction::EXECUTED) && !tx->WriteInProgress)) {
33523373
if (ack) {
33533374
PQ_LOG_D("send TEvReadSetAck to " << event.GetTabletProducer());
33543375
ctx.Send(ev->Sender, ack.release());
@@ -3724,15 +3745,17 @@ void TPersQueue::ProcessPlanStepQueue(const TActorContext& ctx)
37243745
void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37253746
NKikimrClient::TKeyValueRequest& request)
37263747
{
3727-
Y_ABORT_UNLESS(!WriteTxsInProgress);
3748+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37283749

37293750
for (auto& [txId, state] : WriteTxs) {
3751+
// There may be cases when in one iteration of a record we change the state of a transaction and delete it
37303752
auto tx = GetTransaction(ctx, txId);
3731-
Y_ABORT_UNLESS(tx);
3732-
3733-
tx->AddCmdWrite(request, state);
3753+
if (tx) {
3754+
PQ_LOG_D("write key for TxId " << txId);
3755+
tx->AddCmdWrite(request, state);
37343756

3735-
ChangedTxs.emplace(tx->Step, txId);
3757+
ChangedTxs.emplace(tx->Step, txId);
3758+
}
37363759
}
37373760

37383761
WriteTxs.clear();
@@ -3741,9 +3764,7 @@ void TPersQueue::ProcessWriteTxs(const TActorContext& ctx,
37413764
void TPersQueue::ProcessDeleteTxs(const TActorContext& ctx,
37423765
NKikimrClient::TKeyValueRequest& request)
37433766
{
3744-
Y_ABORT_UNLESS(!WriteTxsInProgress,
3745-
"PQ %" PRIu64,
3746-
TabletID());
3767+
Y_ABORT_UNLESS(!WriteTxsInProgress, "PQ %" PRIu64, TabletID());
37473768

37483769
for (ui64 txId : DeleteTxs) {
37493770
PQ_LOG_D("delete key for TxId " << txId);
@@ -4840,8 +4861,9 @@ void TPersQueue::DeleteSupportivePartitions(const TActorContext& ctx)
48404861
void TPersQueue::OnInitComplete(const TActorContext& ctx)
48414862
{
48424863
SignalTabletActive(ctx);
4843-
TryStartTransaction(ctx);
48444864
InitCompleted = true;
4865+
ProcessPendingEvents();
4866+
TryStartTransaction(ctx);
48454867
}
48464868

48474869
ui64 TPersQueue::GetAllowedStep() const
@@ -4877,6 +4899,11 @@ void TPersQueue::Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext
48774899

48784900
void TPersQueue::Handle(TEvPersQueue::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx)
48794901
{
4902+
if (!InitCompleted) {
4903+
AddPendingEvent(ev.Release());
4904+
return;
4905+
}
4906+
48804907
PQ_LOG_D("Handle TEvPersQueue::TEvProposeTransactionAttach " << ev->Get()->Record.ShortDebugString());
48814908

48824909
const ui64 txId = ev->Get()->Record.GetTxId();
@@ -5087,6 +5114,21 @@ ui64 TPersQueue::GetGeneration() {
50875114
return *TabletGeneration;
50885115
}
50895116

5117+
void TPersQueue::AddPendingEvent(IEventHandle* ev)
5118+
{
5119+
PendingEvents.emplace_back(ev);
5120+
}
5121+
5122+
void TPersQueue::ProcessPendingEvents()
5123+
{
5124+
auto events = std::move(PendingEvents);
5125+
PendingEvents.clear();
5126+
5127+
for (auto& ev : events) {
5128+
HandleHook(ev);
5129+
}
5130+
}
5131+
50905132
bool TPersQueue::HandleHook(STFUNC_SIG)
50915133
{
50925134
SetActivityType(NKikimrServices::TActivity::PERSQUEUE_ACTOR);

ydb/core/persqueue/pq_impl.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -555,6 +555,11 @@ class TPersQueue : public NKeyValue::TKeyValueFlat {
555555
void ResendEvReadSetToReceiversForState(const TActorContext& ctx, NKikimrPQ::TTransaction::EState state);
556556

557557
void DeleteSupportivePartitions(const TActorContext& ctx);
558+
559+
TDeque<TAutoPtr<IEventHandle>> PendingEvents;
560+
561+
void AddPendingEvent(IEventHandle* ev);
562+
void ProcessPendingEvents();
558563
};
559564

560565

ydb/core/persqueue/ut/pqtablet_ut.cpp

Lines changed: 99 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
197197

198198
void WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TReadSetAckMatcher& matcher);
199199
void SendReadSetAck(NHelpers::TPQTabletMock& tablet);
200+
void WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet);
200201

201202
void SendDropTablet(const TDropTabletParams& params);
202203
void WaitDropTabletReply(const TDropTabletReplyMatcher& matcher);
@@ -206,7 +207,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
206207

207208
void SendCancelTransactionProposal(const TCancelTransactionProposalParams& params);
208209

209-
void StartPQWriteTxsObserver();
210+
void StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* ev = nullptr);
210211
void WaitForPQWriteTxs();
211212

212213
template <class T> void WaitForEvent(size_t count);
@@ -215,7 +216,7 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
215216

216217
void TestWaitingForTEvReadSet(size_t senders, size_t receivers);
217218

218-
void StartPQWriteObserver(bool& flag, unsigned cookie);
219+
void StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev = nullptr);
219220
void WaitForPQWriteComplete(bool& flag);
220221

221222
bool FoundPQWriteState = false;
@@ -241,6 +242,9 @@ class TPQTabletFixture : public NUnitTest::TBaseFixture {
241242
void WaitForTxState(ui64 txId, NKikimrPQ::TTransaction::EState state);
242243
void WaitForExecStep(ui64 step);
243244

245+
void InterceptSaveTxState(TAutoPtr<IEventHandle>& event);
246+
void SendSaveTxState(TAutoPtr<IEventHandle>& event);
247+
244248
//
245249
// TODO(abcdef): для тестирования повторных вызовов нужны примитивы Send+Wait
246250
//
@@ -522,6 +526,17 @@ void TPQTabletFixture::WaitReadSetAck(NHelpers::TPQTabletMock& tablet, const TRe
522526
}
523527
}
524528

529+
void TPQTabletFixture::WaitForNoReadSetAck(NHelpers::TPQTabletMock& tablet)
530+
{
531+
TDispatchOptions options;
532+
options.CustomFinalCondition = [&]() {
533+
return tablet.ReadSetAck.Defined();
534+
};
535+
Ctx->Runtime->DispatchEvents(options, TDuration::Seconds(2));
536+
537+
UNIT_ASSERT(!tablet.ReadSetAck.Defined());
538+
}
539+
525540
void TPQTabletFixture::SendDropTablet(const TDropTabletParams& params)
526541
{
527542
auto event = MakeHolder<TEvPersQueue::TEvDropTablet>();
@@ -745,18 +760,21 @@ void TPQTabletFixture::WaitWriteResponse(const TWriteResponseMatcher& matcher)
745760
Ctx->Runtime->SetObserverFunc(prev);
746761
}
747762

748-
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie)
763+
void TPQTabletFixture::StartPQWriteObserver(bool& flag, unsigned cookie, TAutoPtr<IEventHandle>* ev)
749764
{
750765
flag = false;
751766

752-
auto observer = [&flag, cookie](TAutoPtr<IEventHandle>& event) {
767+
auto observer = [&flag, cookie, ev](TAutoPtr<IEventHandle>& event) {
753768
if (auto* kvResponse = event->CastAsLocal<TEvKeyValue::TEvResponse>()) {
754-
if (kvResponse->Record.HasCookie()) {
755-
}
756769
if ((event->Sender == event->Recipient) &&
757770
kvResponse->Record.HasCookie() &&
758771
(kvResponse->Record.GetCookie() == cookie)) {
759772
flag = true;
773+
774+
if (ev) {
775+
*ev = event;
776+
return TTestActorRuntimeBase::EEventAction::DROP;
777+
}
760778
}
761779
}
762780

@@ -793,9 +811,9 @@ void TPQTabletFixture::SendCancelTransactionProposal(const TCancelTransactionPro
793811
event.Release());
794812
}
795813

796-
void TPQTabletFixture::StartPQWriteTxsObserver()
814+
void TPQTabletFixture::StartPQWriteTxsObserver(TAutoPtr<IEventHandle>* event)
797815
{
798-
StartPQWriteObserver(FoundPQWriteTxs, 5); // TPersQueue::WRITE_TX_COOKIE
816+
StartPQWriteObserver(FoundPQWriteTxs, 5, event); // TPersQueue::WRITE_TX_COOKIE
799817
}
800818

801819
void TPQTabletFixture::WaitForPQWriteTxs()
@@ -1030,6 +1048,40 @@ void TPQTabletFixture::WaitForExecStep(ui64 step)
10301048
UNIT_FAIL("expected execution step " << step);
10311049
}
10321050

1051+
void TPQTabletFixture::InterceptSaveTxState(TAutoPtr<IEventHandle>& ev)
1052+
{
1053+
bool found = false;
1054+
1055+
TTestActorRuntimeBase::TEventFilter prev;
1056+
auto filter = [&](TTestActorRuntimeBase&, TAutoPtr<IEventHandle>& event) -> bool {
1057+
if (auto* msg = event->CastAsLocal<TEvKeyValue::TEvRequest>()) {
1058+
if (msg->Record.HasCookie() && (msg->Record.GetCookie() == 5)) { // WRITE_TX_COOKIE
1059+
ev = event;
1060+
found = true;
1061+
return true;
1062+
}
1063+
}
1064+
1065+
return false;
1066+
};
1067+
prev = Ctx->Runtime->SetEventFilter(filter);
1068+
1069+
TDispatchOptions options;
1070+
options.CustomFinalCondition = [&found]() {
1071+
return found;
1072+
};
1073+
1074+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1075+
UNIT_ASSERT(found);
1076+
1077+
Ctx->Runtime->SetEventFilter(prev);
1078+
}
1079+
1080+
void TPQTabletFixture::SendSaveTxState(TAutoPtr<IEventHandle>& event)
1081+
{
1082+
Ctx->Runtime->Send(event);
1083+
}
1084+
10331085
Y_UNIT_TEST_F(Parallel_Transactions_1, TPQTabletFixture)
10341086
{
10351087
TestParallelTransactions("consumer", "consumer");
@@ -1926,6 +1978,45 @@ Y_UNIT_TEST_F(After_Restarting_The_Tablet_Sends_A_TEvReadSet_For_Transactions_In
19261978
WaitReadSetEx(*tablet, {.Step=100, .TxId=txId_1, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT, .Count=2});
19271979
}
19281980

1981+
Y_UNIT_TEST_F(TEvReadSet_Is_Not_Sent_Ahead_Of_Time, TPQTabletFixture)
1982+
{
1983+
const ui64 txId = 67890;
1984+
const ui64 mockTabletId = 22222;
1985+
1986+
NHelpers::TPQTabletMock* tablet = CreatePQTabletMock(mockTabletId);
1987+
PQTabletPrepare({.partitions=1}, {}, *Ctx);
1988+
1989+
SendProposeTransactionRequest({.TxId=txId,
1990+
.Senders={mockTabletId}, .Receivers={mockTabletId},
1991+
.TxOps={
1992+
{.Partition=0, .Consumer="user", .Begin=0, .End=0, .Path="/topic"},
1993+
}});
1994+
WaitProposeTransactionResponse({.TxId=txId,
1995+
.Status=NKikimrPQ::TEvProposeTransactionResult::PREPARED});
1996+
1997+
SendPlanStep({.Step=100, .TxIds={txId}});
1998+
1999+
WaitForCalcPredicateResult();
2000+
2001+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2002+
2003+
//WaitProposeTransactionResponse({.TxId=txId,
2004+
// .Status=NKikimrPQ::TEvProposeTransactionResult::COMPLETE});
2005+
2006+
TAutoPtr<IEventHandle> kvRequest;
2007+
InterceptSaveTxState(kvRequest);
2008+
2009+
tablet->SendReadSet(*Ctx->Runtime, {.Step=100, .TxId=txId, .Target=Ctx->TabletId, .Decision=NKikimrTx::TReadSetData::DECISION_COMMIT});
2010+
2011+
WaitForNoReadSetAck(*tablet);
2012+
2013+
SendSaveTxState(kvRequest);
2014+
2015+
WaitForTxState(txId, NKikimrPQ::TTransaction::EXECUTED);
2016+
2017+
WaitReadSetAck(*tablet, {.Step=100, .TxId=txId, .Source=22222, .Target=Ctx->TabletId, .Consumer=Ctx->TabletId});
2018+
}
2019+
19292020
}
19302021

19312022
}

0 commit comments

Comments
 (0)