Skip to content

Commit 13dcd9f

Browse files
Merge 3e3623b into 405f7f2
2 parents 405f7f2 + 3e3623b commit 13dcd9f

File tree

4 files changed

+156
-10
lines changed

4 files changed

+156
-10
lines changed

ydb/core/persqueue/partition.cpp

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -497,24 +497,32 @@ void TPartition::DestroyActor(const TActorContext& ctx)
497497
{
498498
// Reply to all outstanding requests in order to destroy corresponding actors
499499

500+
NPersQueue::NErrorCode::EErrorCode errorCode;
500501
TStringBuilder ss;
501-
ss << "Tablet is restarting, topic '" << TopicName() << "'";
502+
503+
if (IsSupportive()) {
504+
errorCode = NPersQueue::NErrorCode::ERROR;
505+
ss << "The transaction is completed";
506+
} else {
507+
errorCode = NPersQueue::NErrorCode::INITIALIZING;
508+
ss << "Tablet is restarting, topic '" << TopicName() << "'";
509+
}
502510

503511
for (const auto& ev : WaitToChangeOwner) {
504-
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss);
512+
ReplyError(ctx, ev->Cookie, errorCode, ss);
505513
}
506514

507515
for (auto& w : PendingRequests) {
508-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, ss);
516+
ReplyError(ctx, w.GetCookie(), errorCode, ss);
509517
w.Span.EndError(static_cast<const TString&>(ss));
510518
}
511519

512520
for (const auto& w : Responses) {
513-
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (WriteResponses)");
521+
ReplyError(ctx, w.GetCookie(), errorCode, TStringBuilder() << ss << " (WriteResponses)");
514522
}
515523

516524
for (const auto& ri : ReadInfo) {
517-
ReplyError(ctx, ri.second.Destination, NPersQueue::NErrorCode::INITIALIZING,
525+
ReplyError(ctx, ri.second.Destination, errorCode,
518526
TStringBuilder() << ss << " (ReadInfo) cookie " << ri.first);
519527
}
520528

@@ -3646,7 +3654,13 @@ void TPartition::Handle(TEvPQ::TEvApproveWriteQuota::TPtr& ev, const TActorConte
36463654
TopicWriteQuotaWaitCounter->IncFor(TopicQuotaWaitTimeForCurrentBlob.MilliSeconds());
36473655
}
36483656

3649-
RequestBlobQuota();
3657+
if (NeedDeletePartition) {
3658+
// deferred TEvPQ::TEvDeletePartition
3659+
DeletePartitionState = DELETION_INITED;
3660+
} else {
3661+
RequestBlobQuota();
3662+
}
3663+
36503664
ProcessTxsAndUserActs(ctx);
36513665
}
36523666

@@ -3750,6 +3764,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition>
37503764
Y_ABORT_UNLESS(IsSupportive());
37513765
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);
37523766

3767+
NeedDeletePartition = true;
3768+
3769+
if (TopicQuotaRequestCookie != 0) {
3770+
// wait for TEvPQ::TEvApproveWriteQuota
3771+
return;
3772+
}
3773+
37533774
DeletePartitionState = DELETION_INITED;
37543775

37553776
ProcessTxsAndUserActs(ctx);

ydb/core/persqueue/partition.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -934,6 +934,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
934934
ui64 TopicQuotaRequestCookie = 0;
935935
ui64 NextTopicWriteQuotaRequestCookie = 1;
936936
ui64 BlobQuotaSize = 0;
937+
bool NeedDeletePartition = false;
937938

938939
// Wait topic quota metrics
939940
ui64 TotalPartitionWriteSpeed = 0;

ydb/core/persqueue/partition_write.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1638,7 +1638,7 @@ bool TPartition::RequestBlobQuota()
16381638

16391639
void TPartition::HandlePendingRequests(const TActorContext& ctx)
16401640
{
1641-
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) {
1641+
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx) || NeedDeletePartition) {
16421642
return;
16431643
}
16441644
if (RequestBlobQuota()) {

ydb/core/persqueue/ut/partition_ut.cpp

Lines changed: 127 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
286286
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
287287
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
288288
void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
289-
bool ignoreQuotaDeadline = false, ui64 seqNo = 0);
289+
bool ignoreQuotaDeadline = false, ui64 seqNo = 0, bool isDirectWrite = false);
290290
void SendGetWriteInfo();
291291
void ShadowPartitionCountersTest(bool isFirstClass);
292292

@@ -302,6 +302,14 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
302302
void SendEvent(IEventBase* event);
303303
void SendEvent(IEventBase* event, const TActorId& from, const TActorId& to);
304304

305+
THolder<TEvPQ::TEvApproveWriteQuota> WaitForRequestQuotaAndHoldApproveWriteQuota();
306+
void SendDeletePartition();
307+
void WaitForDeletePartitionDoneTimeout();
308+
void SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event);
309+
void WaitForQuotaConsumed();
310+
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
311+
void WaitForDeletePartitionDone();
312+
305313
TMaybe<TTestContext> Ctx;
306314
TMaybe<TFinalizer> Finalizer;
307315

@@ -639,7 +647,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con
639647

640648
void TPartitionFixture::SendWrite
641649
(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
642-
bool ignoreQuotaDeadline, ui64 seqNo
650+
bool ignoreQuotaDeadline, ui64 seqNo, bool isDirectWrite
643651
) {
644652
TEvPQ::TEvWrite::TMsg msg;
645653
msg.SourceId = "SourceId";
@@ -661,7 +669,7 @@ void TPartitionFixture::SendWrite
661669
TVector<TEvPQ::TEvWrite::TMsg> msgs;
662670
msgs.push_back(msg);
663671

664-
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt);
672+
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), isDirectWrite, std::nullopt);
665673
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
666674
}
667675

@@ -1372,6 +1380,92 @@ void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration
13721380
}
13731381
}
13741382

1383+
THolder<TEvPQ::TEvApproveWriteQuota> TPartitionFixture::WaitForRequestQuotaAndHoldApproveWriteQuota()
1384+
{
1385+
THolder<TEvPQ::TEvApproveWriteQuota> approveWriteQuota;
1386+
1387+
auto observer = [&approveWriteQuota](TAutoPtr<IEventHandle>& ev) mutable {
1388+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvApproveWriteQuota>()) {
1389+
approveWriteQuota = MakeHolder<TEvPQ::TEvApproveWriteQuota>(event->Cookie,
1390+
event->AccountQuotaWaitTime,
1391+
event->PartitionQuotaWaitTime);
1392+
return TTestActorRuntimeBase::EEventAction::DROP;
1393+
}
1394+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1395+
};
1396+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1397+
1398+
TDispatchOptions options;
1399+
options.CustomFinalCondition = [&]() {
1400+
return approveWriteQuota != nullptr;
1401+
};
1402+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1403+
1404+
Ctx->Runtime->SetObserverFunc(prevObserver);
1405+
1406+
UNIT_ASSERT(approveWriteQuota != nullptr);
1407+
1408+
return approveWriteQuota;
1409+
}
1410+
1411+
void TPartitionFixture::SendDeletePartition()
1412+
{
1413+
auto event = MakeHolder<TEvPQ::TEvDeletePartition>();
1414+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1415+
}
1416+
1417+
void TPartitionFixture::WaitForDeletePartitionDoneTimeout()
1418+
{
1419+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>(TDuration::Seconds(3));
1420+
UNIT_ASSERT_VALUES_EQUAL(event, nullptr);
1421+
}
1422+
1423+
void TPartitionFixture::SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event)
1424+
{
1425+
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
1426+
event = nullptr;
1427+
}
1428+
1429+
void TPartitionFixture::WaitForQuotaConsumed()
1430+
{
1431+
bool hasQuotaConsumed = false;
1432+
1433+
auto observer = [&hasQuotaConsumed](TAutoPtr<IEventHandle>& ev) mutable {
1434+
if (auto* event = ev->CastAsLocal<TEvPQ::TEvConsumed>()) {
1435+
hasQuotaConsumed = true;
1436+
}
1437+
return TTestActorRuntimeBase::EEventAction::PROCESS;
1438+
};
1439+
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);
1440+
1441+
TDispatchOptions options;
1442+
options.CustomFinalCondition = [&]() {
1443+
return hasQuotaConsumed;
1444+
};
1445+
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));
1446+
1447+
Ctx->Runtime->SetObserverFunc(prevObserver);
1448+
1449+
UNIT_ASSERT(hasQuotaConsumed);
1450+
}
1451+
1452+
void TPartitionFixture::WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode)
1453+
{
1454+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvError>();
1455+
1456+
UNIT_ASSERT(event != nullptr);
1457+
1458+
UNIT_ASSERT_VALUES_EQUAL(cookie, event->Cookie);
1459+
UNIT_ASSERT_C(errorCode == event->ErrorCode, "extected: " << (int)errorCode << ", accepted: " << (int)event->ErrorCode);
1460+
}
1461+
1462+
void TPartitionFixture::WaitForDeletePartitionDone()
1463+
{
1464+
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>();
1465+
1466+
UNIT_ASSERT(event != nullptr);
1467+
}
1468+
13751469
struct TTestUserAct {
13761470
TSrcIdMap SourceIds = {};
13771471
TString ClientId = {};
@@ -3485,6 +3579,36 @@ Y_UNIT_TEST_F(EndWriteTimestamp_HeadKeys, TPartitionFixture) {
34853579
UNIT_ASSERT_C(now - TDuration::Seconds(2) < endWriteTimestamp && endWriteTimestamp < now, "" << (now - TDuration::Seconds(2)) << " < " << endWriteTimestamp << " < " << now );
34863580
} // EndWriteTimestamp_FromMeta
34873581

3582+
Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_Message, TPartitionFixture)
3583+
{
3584+
// create a supportive partition
3585+
const TPartitionId partitionId{1, TWriteId{2, 3}, 4};
3586+
CreatePartition({.Partition=partitionId});
3587+
3588+
// write 2 messages in it
3589+
SendWrite(1, 0, "owner", 0, "message #1", false, 1, true);
3590+
SendWrite(2, 1, "owner", 1, "message #2", false, 2, true);
3591+
3592+
// delay the response from the quoter
3593+
auto approveWriteQuota = WaitForRequestQuotaAndHoldApproveWriteQuota();
3594+
3595+
// Send a `TEvDeletePartition`. The partition will wait for the response from the quoter to arrive.
3596+
SendDeletePartition();
3597+
WaitForDeletePartitionDoneTimeout();
3598+
3599+
// The answer is from the quoter
3600+
SendApproveWriteQuota(std::move(approveWriteQuota));
3601+
WaitForQuotaConsumed();
3602+
3603+
WaitCmdWrite();
3604+
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);
3605+
3606+
// Write operations fail with an error
3607+
WaitForWriteError(1, NPersQueue::NErrorCode::ERROR);
3608+
WaitForDeletePartitionDone();
3609+
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
3610+
}
3611+
34883612
} // End of suite
34893613

34903614
} // namespace

0 commit comments

Comments
 (0)