Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 27 additions & 6 deletions ydb/core/persqueue/partition.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,23 +492,31 @@ void TPartition::DestroyActor(const TActorContext& ctx)
{
// Reply to all outstanding requests in order to destroy corresponding actors

NPersQueue::NErrorCode::EErrorCode errorCode;
TStringBuilder ss;
ss << "Tablet is restarting, topic '" << TopicName() << "'";

if (IsSupportive()) {
errorCode = NPersQueue::NErrorCode::ERROR;
ss << "The transaction is completed";
} else {
errorCode = NPersQueue::NErrorCode::INITIALIZING;
ss << "Tablet is restarting, topic '" << TopicName() << "'";
}

for (const auto& ev : WaitToChangeOwner) {
ReplyError(ctx, ev->Cookie, NPersQueue::NErrorCode::INITIALIZING, ss);
ReplyError(ctx, ev->Cookie, errorCode, ss);
}

for (const auto& w : PendingRequests) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, ss);
ReplyError(ctx, w.GetCookie(), errorCode, ss);
}

for (const auto& w : Responses) {
ReplyError(ctx, w.GetCookie(), NPersQueue::NErrorCode::INITIALIZING, TStringBuilder() << ss << " (WriteResponses)");
ReplyError(ctx, w.GetCookie(), errorCode, TStringBuilder() << ss << " (WriteResponses)");
}

for (const auto& ri : ReadInfo) {
ReplyError(ctx, ri.second.Destination, NPersQueue::NErrorCode::INITIALIZING,
ReplyError(ctx, ri.second.Destination, errorCode,
TStringBuilder() << ss << " (ReadInfo) cookie " << ri.first);
}

Expand Down Expand Up @@ -3465,7 +3473,13 @@ void TPartition::Handle(TEvPQ::TEvApproveWriteQuota::TPtr& ev, const TActorConte
TopicWriteQuotaWaitCounter->IncFor(TopicQuotaWaitTimeForCurrentBlob.MilliSeconds());
}

RequestBlobQuota();
if (NeedDeletePartition) {
// deferred TEvPQ::TEvDeletePartition
DeletePartitionState = DELETION_INITED;
} else {
RequestBlobQuota();
}

ProcessTxsAndUserActs(ctx);
}

Expand Down Expand Up @@ -3575,6 +3589,13 @@ void TPartition::ProcessPendingEvent(std::unique_ptr<TEvPQ::TEvDeletePartition>
Y_ABORT_UNLESS(IsSupportive());
Y_ABORT_UNLESS(DeletePartitionState == DELETION_NOT_INITED);

NeedDeletePartition = true;

if (TopicQuotaRequestCookie != 0) {
// wait for TEvPQ::TEvApproveWriteQuota
return;
}

DeletePartitionState = DELETION_INITED;

ProcessTxsAndUserActs(ctx);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/persqueue/partition.h
Original file line number Diff line number Diff line change
Expand Up @@ -913,6 +913,7 @@ class TPartition : public TActorBootstrapped<TPartition> {
ui64 TopicQuotaRequestCookie = 0;
ui64 NextTopicWriteQuotaRequestCookie = 1;
ui64 BlobQuotaSize = 0;
bool NeedDeletePartition = false;

// Wait topic quota metrics
ui64 TotalPartitionWriteSpeed = 0;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/persqueue/partition_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ bool TPartition::RequestBlobQuota()

void TPartition::HandlePendingRequests(const TActorContext& ctx)
{
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx)) {
if (WaitingForPreviousBlobQuota() || WaitingForSubDomainQuota(ctx) || NeedDeletePartition) {
return;
}
if (RequestBlobQuota()) {
Expand Down
130 changes: 127 additions & 3 deletions ydb/core/persqueue/ut/partition_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void SendReserveBytes(const ui64 cookie, const ui32 size, const TString& ownerCookie, const ui64 messageNo, bool lastRequest = false);
void SendChangeOwner(const ui64 cookie, const TString& owner, const TActorId& pipeClient, const bool force = true);
void SendWrite(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
bool ignoreQuotaDeadline = false, ui64 seqNo = 0);
bool ignoreQuotaDeadline = false, ui64 seqNo = 0, bool isDirectWrite = false);
void SendGetWriteInfo();
void ShadowPartitionCountersTest(bool isFirstClass);

Expand All @@ -298,6 +298,14 @@ class TPartitionFixture : public NUnitTest::TBaseFixture {
void SendEvent(IEventBase* event);
void SendEvent(IEventBase* event, const TActorId& from, const TActorId& to);

THolder<TEvPQ::TEvApproveWriteQuota> WaitForRequestQuotaAndHoldApproveWriteQuota();
void SendDeletePartition();
void WaitForDeletePartitionDoneTimeout();
void SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event);
void WaitForQuotaConsumed();
void WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode);
void WaitForDeletePartitionDone();

TMaybe<TTestContext> Ctx;
TMaybe<TFinalizer> Finalizer;

Expand Down Expand Up @@ -635,7 +643,7 @@ void TPartitionFixture::SendReserveBytes(const ui64 cookie, const ui32 size, con

void TPartitionFixture::SendWrite
(const ui64 cookie, const ui64 messageNo, const TString& ownerCookie, const TMaybe<ui64> offset, const TString& data,
bool ignoreQuotaDeadline, ui64 seqNo
bool ignoreQuotaDeadline, ui64 seqNo, bool isDirectWrite
) {
TEvPQ::TEvWrite::TMsg msg;
msg.SourceId = "SourceId";
Expand All @@ -657,7 +665,7 @@ void TPartitionFixture::SendWrite
TVector<TEvPQ::TEvWrite::TMsg> msgs;
msgs.push_back(msg);

auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), false, std::nullopt);
auto event = MakeHolder<TEvPQ::TEvWrite>(cookie, messageNo, ownerCookie, offset, std::move(msgs), isDirectWrite, std::nullopt);
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}

Expand Down Expand Up @@ -1274,6 +1282,92 @@ void TPartitionFixture::TestWriteSubDomainOutOfSpace(TDuration quotaWaitDuration
}
}

THolder<TEvPQ::TEvApproveWriteQuota> TPartitionFixture::WaitForRequestQuotaAndHoldApproveWriteQuota()
{
THolder<TEvPQ::TEvApproveWriteQuota> approveWriteQuota;

auto observer = [&approveWriteQuota](TAutoPtr<IEventHandle>& ev) mutable {
if (auto* event = ev->CastAsLocal<TEvPQ::TEvApproveWriteQuota>()) {
approveWriteQuota = MakeHolder<TEvPQ::TEvApproveWriteQuota>(event->Cookie,
event->AccountQuotaWaitTime,
event->PartitionQuotaWaitTime);
return TTestActorRuntimeBase::EEventAction::DROP;
}
return TTestActorRuntimeBase::EEventAction::PROCESS;
};
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);

TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return approveWriteQuota != nullptr;
};
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));

Ctx->Runtime->SetObserverFunc(prevObserver);

UNIT_ASSERT(approveWriteQuota != nullptr);

return approveWriteQuota;
}

void TPartitionFixture::SendDeletePartition()
{
auto event = MakeHolder<TEvPQ::TEvDeletePartition>();
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
}

void TPartitionFixture::WaitForDeletePartitionDoneTimeout()
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>(TDuration::Seconds(3));
UNIT_ASSERT_VALUES_EQUAL(event, nullptr);
}

void TPartitionFixture::SendApproveWriteQuota(THolder<TEvPQ::TEvApproveWriteQuota>&& event)
{
Ctx->Runtime->SingleSys()->Send(new IEventHandle(ActorId, Ctx->Edge, event.Release()));
event = nullptr;
}

void TPartitionFixture::WaitForQuotaConsumed()
{
bool hasQuotaConsumed = false;

auto observer = [&hasQuotaConsumed](TAutoPtr<IEventHandle>& ev) mutable {
if (auto* event = ev->CastAsLocal<TEvPQ::TEvConsumed>()) {
hasQuotaConsumed = true;
}
return TTestActorRuntimeBase::EEventAction::PROCESS;
};
auto prevObserver = Ctx->Runtime->SetObserverFunc(observer);

TDispatchOptions options;
options.CustomFinalCondition = [&]() {
return hasQuotaConsumed;
};
UNIT_ASSERT(Ctx->Runtime->DispatchEvents(options));

Ctx->Runtime->SetObserverFunc(prevObserver);

UNIT_ASSERT(hasQuotaConsumed);
}

void TPartitionFixture::WaitForWriteError(ui64 cookie, NPersQueue::NErrorCode::EErrorCode errorCode)
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvError>();

UNIT_ASSERT(event != nullptr);

UNIT_ASSERT_VALUES_EQUAL(cookie, event->Cookie);
UNIT_ASSERT_C(errorCode == event->ErrorCode, "extected: " << (int)errorCode << ", accepted: " << (int)event->ErrorCode);
}

void TPartitionFixture::WaitForDeletePartitionDone()
{
auto event = Ctx->Runtime->GrabEdgeEvent<TEvPQ::TEvDeletePartitionDone>();

UNIT_ASSERT(event != nullptr);
}

struct TTestUserAct {
TSrcIdMap SourceIds = {};
TString ClientId = {};
Expand Down Expand Up @@ -3270,6 +3364,36 @@ Y_UNIT_TEST_F(EndWriteTimestamp_HeadKeys, TPartitionFixture) {
UNIT_ASSERT_C(now - TDuration::Seconds(2) < endWriteTimestamp && endWriteTimestamp < now, "" << (now - TDuration::Seconds(2)) << " < " << endWriteTimestamp << " < " << now );
} // EndWriteTimestamp_FromMeta

Y_UNIT_TEST_F(The_DeletePartition_Message_Arrives_Before_The_ApproveWriteQuota_Message, TPartitionFixture)
{
// create a supportive partition
const TPartitionId partitionId{1, TWriteId{2, 3}, 4};
CreatePartition({.Partition=partitionId});

// write 2 messages in it
SendWrite(1, 0, "owner", 0, "message #1", false, 1, true);
SendWrite(2, 1, "owner", 1, "message #2", false, 2, true);

// delay the response from the quoter
auto approveWriteQuota = WaitForRequestQuotaAndHoldApproveWriteQuota();

// Send a `TEvDeletePartition`. The partition will wait for the response from the quoter to arrive.
SendDeletePartition();
WaitForDeletePartitionDoneTimeout();

// The answer is from the quoter
SendApproveWriteQuota(std::move(approveWriteQuota));
WaitForQuotaConsumed();

WaitCmdWrite();
SendCmdWriteResponse(NMsgBusProxy::MSTATUS_OK);

// Write operations fail with an error
WaitForWriteError(1, NPersQueue::NErrorCode::ERROR);
WaitForDeletePartitionDone();
WaitForWriteError(2, NPersQueue::NErrorCode::ERROR);
}

} // End of suite

} // namespace
Loading