Skip to content

KIKIMR-16087 Replace Y_ABORT_UNLESS #569

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 41 additions & 12 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -451,20 +451,28 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return WriteResult(ErrorCode, "Rejected by writer", MakeResponse(cookie));
}

void HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
bool HoldPending(TEvPartitionWriter::TEvWriteRequest::TPtr& ev) {
auto& record = ev->Get()->Record;
const auto cookie = record.GetPartitionRequest().GetCookie();

Y_ABORT_UNLESS(Pending.empty() || Pending.rbegin()->first < cookie);
Y_ABORT_UNLESS(PendingReserve.empty() || PendingReserve.rbegin()->first < cookie);
Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie);
auto pendingValid = (Pending.empty() || Pending.rbegin()->first < cookie);
auto reserveValid = (PendingReserve.empty() || PendingReserve.rbegin()->first < cookie);
auto writeValid = (PendingWrite.empty() || PendingWrite.back() < cookie);

if (!(pendingValid && reserveValid && writeValid)) {
ERROR("The cookie of WriteRequest is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return false;
}

Pending.emplace(cookie, std::move(ev->Get()->Record));
return true;
}

void Handle(TEvPartitionWriter::TEvWriteRequest::TPtr& ev, const TActorContext& ctx) {
HoldPending(ev);
ReserveBytes(ctx);
if (HoldPending(ev)) {
ReserveBytes(ctx);
}
}

void ReserveBytes(const TActorContext& ctx) {
Expand Down Expand Up @@ -514,10 +522,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}

void EnqueueReservedAndProcess(ui64 cookie) {
Y_ABORT_UNLESS(!PendingReserve.empty());
if(PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #01");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();

Y_ABORT_UNLESS(it->first == cookie);
if(it->first != cookie) {
ERROR("The order of reservation is invalid. Cookie=" << cookie << ", ReserveCookie=" << it->first);
Disconnected(EErrorCode::InternalError);
return;
}

ReceivedReserve.emplace(it->first, std::move(it->second));

Expand Down Expand Up @@ -582,11 +598,20 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
}

void Write(ui64 cookie) {
Y_ABORT_UNLESS(!PendingReserve.empty());
if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();

Y_ABORT_UNLESS(it->first == cookie);
Y_ABORT_UNLESS(PendingWrite.empty() || PendingWrite.back() < cookie);
auto cookieReserveValid = (it->first == cookie);
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
if (!(cookieReserveValid && cookieWriteValid)) {
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return;
}

Write(cookie, std::move(it->second.Request));

Expand Down Expand Up @@ -634,7 +659,11 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

WriteAccepted(cookie);

Y_ABORT_UNLESS(!PendingReserve.empty());
if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();
auto& holder = it->second;

Expand Down