Skip to content

Commit a77c52e

Browse files
The commit of a transaction in the SDK freezes (#17089)
1 parent c09bd6c commit a77c52e

File tree

4 files changed

+156
-10
lines changed

4 files changed

+156
-10
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -492,23 +492,31 @@ void TPartition::DestroyActor(const TActorContext& ctx)
492492
{
493493
// Reply to all outstanding requests in order to destroy corresponding actors
494494

495+
NPersQueue::NErrorCode::EErrorCode errorCode;
495496
TStringBuilder ss;
496-
ss << "Tablet is restarting, topic '" << TopicName() << "'";
497+
498+
if (IsSupportive()) {
499+
errorCode = NPersQueue::NErrorCode::ERROR;
500+
ss << "The transaction is completed";
501+
} else {
502+
errorCode = NPersQueue::NErrorCode::INITIALIZING;
503+
ss << "Tablet is restarting, topic '" << TopicName() << "'";
504+
}
497505

498506
for (const auto& ev : WaitToChangeOwner) {
499-
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss);
507+
ReplyError(ctx, ev->Cookie, errorCode, ss);
500508
}
501509

502510
for (const auto& w : PendingRequests) {
503-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, ss);
511+
ReplyError(ctx, w.GetCookie(), errorCode, ss);
504512
}
505513

506514
for (const auto& w : Responses) {
507-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (WriteResponses)");
515+
ReplyError(ctx, w.GetCookie(), errorCode, TStringBuilder() << ss << " (WriteResponses)");
508516
}
509517

510518
for (const auto& ri : ReadInfo) {
511-
ReplyError(ctx, ri.second.Destination, NPersQueue::NErrorCode::INITIALIZING,
519+
ReplyError(ctx, ri.second.Destination, errorCode,
512520
TStringBuilder() << ss << " (ReadInfo) cookie " << ri.first);
513521
}
514522

@@ -3465,7 +3473,13 @@ void TPartition::Handle(TEvPQ::TEvApproveWriteQuota::TPtr& ev, const TActorConte
34653473
TopicWriteQuotaWaitCounter->IncFor(TopicQuotaWaitTimeForCurrentBlob.MilliSeconds());
34663474
}
34673475

3468-
RequestBlobQuota();
3476+
if (NeedDeletePartition) {
3477+
// deferred TEvPQ::TEvDeletePartition
3478+
DeletePartitionState = DELETION_INITED;
3479+
} else {
3480+
RequestBlobQuota();
3481+
}
3482+
34693483
ProcessTxsAndUserActs(ctx);
34703484
}
34713485

@@ -3575,6 +3589,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition>
35753589
Y_ABORT_UNLESS(IsSupportive());
35763590
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
35773591

3592+
NeedDeletePartition = true;
3593+
3594+
if (TopicQuotaRequestCookie != 0) {
3595+
// wait for TEvPQ::TEvApproveWriteQuota
3596+
return;
3597+
}
3598+
35783599
DeletePartitionState = DELETION_INITED;
35793600

35803601
ProcessTxsAndUserActs(ctx);

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,6 +913,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
913913
ui64 TopicQuotaRequestCookie = 0;
914914
ui64 NextTopicWriteQuotaRequestCookie = 1;
915915
ui64 BlobQuotaSize = 0;
916+
bool NeedDeletePartition = false;
916917

917918
// Wait topic quota metrics
918919
ui64 TotalPartitionWriteSpeed = 0;

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1617,7 +1617,7 @@ bool TPartition::RequestBlobQuota()
16171617

16181618
void TPartition::HandlePendingRequests(const TActorContext& ctx)
16191619
{
1620-
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) {
1620+
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx) || NeedDeletePartition) {
16211621
return;
16221622
}
16231623
if (RequestBlobQuota()) {

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
283283
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
284284
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
285285
void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
286-
bool ignoreQuotaDeadline = false, ui64 seqNo = 0);
286+
bool ignoreQuotaDeadline = false, ui64 seqNo = 0, bool isDirectWrite = false);
287287
void SendGetWriteInfo();
288288
void ShadowPartitionCountersTest(bool isFirstClass);
289289

@@ -298,6 +298,14 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
298298
void SendEvent(IEventBase* event);
299299
void SendEvent(IEventBase* event, const TActorId& from, const TActorId& to);
300300

301+
THolder<TEvPQ::TEvApproveWriteQuota> WaitForRequestQuotaAndHoldApproveWriteQuota();
302+
void SendDeletePartition();
303+
void WaitForDeletePartitionDoneTimeout();
304+
void SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event);
305+
void WaitForQuotaConsumed();
306+
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
307+
void WaitForDeletePartitionDone();
308+
301309
TMaybe<TTestContext> Ctx;
302310
TMaybe<TFinalizer> Finalizer;
303311

@@ -635,7 +643,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con
635643

636644
void TPartitionFixture::SendWrite
637645
(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
638-
bool ignoreQuotaDeadline, ui64 seqNo
646+
bool ignoreQuotaDeadline, ui64 seqNo, bool isDirectWrite
639647
) {
640648
TEvPQ::TEvWrite::TMsg msg;
641649
msg.SourceId = "SourceId";
@@ -657,7 +665,7 @@ void TPartitionFixture::SendWrite
657665
TVector<TEvPQ::TEvWrite::TMsg> msgs;
658666
msgs.push_back(msg);
659667

660-
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt);
668+
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), isDirectWrite, std::nullopt);
661669
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
662670
}
663671

@@ -1274,6 +1282,92 @@ void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration
12741282
}
12751283
}
12761284

1285+
THolder<TEvPQ::TEvApproveWriteQuota> TPartitionFixture::WaitForRequestQuotaAndHoldApproveWriteQuota()
1286+
{
1287+
THolder<TEvPQ::TEvApproveWriteQuota> approveWriteQuota;
1288+
1289+
auto observer = [&approveWriteQuota](TAutoPtr<IEventHandle>& ev) mutable {
1290+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvApproveWriteQuota>()) {
1291+
approveWriteQuota = MakeHolder<TEvPQ::TEvApproveWriteQuota>(event->Cookie,
1292+
event->AccountQuotaWaitTime,
1293+
event->PartitionQuotaWaitTime);
1294+
return TTestActorRuntimeBase::EEventAction::DROP;
1295+
}
1296+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1297+
};
1298+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1299+
1300+
TDispatchOptions options;
1301+
options.CustomFinalCondition = [&]() {
1302+
return approveWriteQuota != nullptr;
1303+
};
1304+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1305+
1306+
Ctx->Runtime->SetObserverFunc(prevObserver);
1307+
1308+
UNIT_ASSERT(approveWriteQuota != nullptr);
1309+
1310+
return approveWriteQuota;
1311+
}
1312+
1313+
void TPartitionFixture::SendDeletePartition()
1314+
{
1315+
auto event = MakeHolder<TEvPQ::TEvDeletePartition>();
1316+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1317+
}
1318+
1319+
void TPartitionFixture::WaitForDeletePartitionDoneTimeout()
1320+
{
1321+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>(TDuration::Seconds(3));
1322+
UNIT_ASSERT_VALUES_EQUAL(event, nullptr);
1323+
}
1324+
1325+
void TPartitionFixture::SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event)
1326+
{
1327+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1328+
event = nullptr;
1329+
}
1330+
1331+
void TPartitionFixture::WaitForQuotaConsumed()
1332+
{
1333+
bool hasQuotaConsumed = false;
1334+
1335+
auto observer = [&hasQuotaConsumed](TAutoPtr<IEventHandle>& ev) mutable {
1336+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvConsumed>()) {
1337+
hasQuotaConsumed = true;
1338+
}
1339+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1340+
};
1341+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1342+
1343+
TDispatchOptions options;
1344+
options.CustomFinalCondition = [&]() {
1345+
return hasQuotaConsumed;
1346+
};
1347+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1348+
1349+
Ctx->Runtime->SetObserverFunc(prevObserver);
1350+
1351+
UNIT_ASSERT(hasQuotaConsumed);
1352+
}
1353+
1354+
void TPartitionFixture::WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode)
1355+
{
1356+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvError>();
1357+
1358+
UNIT_ASSERT(event != nullptr);
1359+
1360+
UNIT_ASSERT_VALUES_EQUAL(cookie, event->Cookie);
1361+
UNIT_ASSERT_C(errorCode == event->ErrorCode, "extected: " << (int)errorCode << ", accepted: " << (int)event->ErrorCode);
1362+
}
1363+
1364+
void TPartitionFixture::WaitForDeletePartitionDone()
1365+
{
1366+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>();
1367+
1368+
UNIT_ASSERT(event != nullptr);
1369+
}
1370+
12771371
struct TTestUserAct {
12781372
TSrcIdMap SourceIds = {};
12791373
TString ClientId = {};
@@ -3270,6 +3364,36 @@ Y_UNIT_TEST_F(EndWriteTimestamp_HeadKeys, TPartitionFixture) {
32703364
UNIT_ASSERT_C(now - TDuration::Seconds(2) < endWriteTimestamp && endWriteTimestamp < now, "" << (now - TDuration::Seconds(2)) << " < " << endWriteTimestamp << " < " << now );
32713365
} // EndWriteTimestamp_FromMeta
32723366

3367+
Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_Message, TPartitionFixture)
3368+
{
3369+
// create a supportive partition
3370+
const TPartitionId partitionId{1, TWriteId{2, 3}, 4};
3371+
CreatePartition({.Partition=partitionId});
3372+
3373+
// write 2 messages in it
3374+
SendWrite(1, 0, "owner", 0, "message #1", false, 1, true);
3375+
SendWrite(2, 1, "owner", 1, "message #2", false, 2, true);
3376+
3377+
// delay the response from the quoter
3378+
auto approveWriteQuota = WaitForRequestQuotaAndHoldApproveWriteQuota();
3379+
3380+
// Send a `TEvDeletePartition`. The partition will wait for the response from the quoter to arrive.
3381+
SendDeletePartition();
3382+
WaitForDeletePartitionDoneTimeout();
3383+
3384+
// The answer is from the quoter
3385+
SendApproveWriteQuota(std::move(approveWriteQuota));
3386+
WaitForQuotaConsumed();
3387+
3388+
WaitCmdWrite();
3389+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
3390+
3391+
// Write operations fail with an error
3392+
WaitForWriteError(1, NPersQueue::NErrorCode::ERROR);
3393+
WaitForDeletePartitionDone();
3394+
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
3395+
}
3396+
32733397
} // End of suite
32743398

32753399
} // namespace

0 commit comments

Comments
 (0)