Skip to content

Fix pq writer and few renames #1757

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 23 additions & 42 deletions ydb/core/persqueue/writer/writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return;
}

const bool checkQuota = Opts.CheckRequestUnits() && IsQuotaRequired();
const bool needToRequestQuota = Opts.CheckRequestUnits() && IsQuotaRequired();

size_t processed = 0;
PendingQuotaAmount = 0;
Expand All @@ -490,23 +490,23 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
cmd.SetSize(it->second.ByteSize());
cmd.SetLastRequest(false);

if (checkQuota) {
if (needToRequestQuota) {
++processed;
PendingQuotaAmount += CalcRuConsumption(it->second.ByteSize());
PendingQuota.emplace_back(it->first);
}

NTabletPipe::SendData(SelfId(), PipeClient, ev.Release());

PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), checkQuota });
PendingReserve.emplace(it->first, RequestHolder{ std::move(it->second), needToRequestQuota });
Pending.erase(it);

if (checkQuota && processed == MAX_QUOTA_INFLIGHT) {
if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) {
break;
}
}

if (checkQuota) {
if (needToRequestQuota) {
RequestDataQuota(PendingQuotaAmount, ctx);
}
}
Expand All @@ -527,18 +527,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

ReceivedReserve.emplace(it->first, std::move(it->second));

ProcessQuota();
ProcessQuotaAndWrite();
}

void ProcessQuota() {
void ProcessQuotaAndWrite() {
auto rit = ReceivedReserve.begin();
auto qit = ReceivedQuota.begin();

while(rit != ReceivedReserve.end() && qit != ReceivedQuota.end()) {
auto& request = rit->second;
const auto cookie = rit->first;
TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
if (!request.QuotaChecked || request.QuotaAccepted) {
TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
if (!request.QuotaCheckEnabled || request.QuotaAccepted) {
// A situation when a quota was not requested or was received while waiting for a reserve
Write(cookie, std::move(request.Request));
ReceivedReserve.erase(rit++);
Expand All @@ -559,8 +559,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
while(rit != ReceivedReserve.end()) {
auto& request = rit->second;
const auto cookie = rit->first;
TRACE("processing quota for request cookie=" << cookie << ", QuotaChecked=" << request.QuotaChecked << ", QuotaAccepted=" << request.QuotaAccepted);
if (request.QuotaChecked && !request.QuotaAccepted) {
TRACE("processing quota for request cookie=" << cookie << ", QuotaCheckEnabled=" << request.QuotaCheckEnabled << ", QuotaAccepted=" << request.QuotaAccepted);
if (request.QuotaCheckEnabled && !request.QuotaAccepted) {
break;
}

Expand All @@ -587,27 +587,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
ReceivedQuota.clear();
}

void Write(ui64 cookie) {
if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02");
Disconnected(EErrorCode::InternalError);
return;
}
auto it = PendingReserve.begin();

auto cookieReserveValid = (it->first == cookie);
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
if (!(cookieReserveValid && cookieWriteValid)) {
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return;
}

Write(cookie, std::move(it->second.Request));

PendingReserve.erase(it);
}

void Write(ui64 cookie, NKikimrClient::TPersQueueRequest&& req) {
auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
ev->Record = std::move(req);
Expand Down Expand Up @@ -651,24 +630,26 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
return WriteResult(EErrorCode::InternalError, error, std::move(record));
}

WriteAccepted(cookie);

if (PendingReserve.empty()) {
ERROR("The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03");
auto cookieWriteValid = (PendingWrite.empty() || PendingWrite.back() < cookie);
if (!cookieWriteValid) {
ERROR("The cookie of Write is invalid. Cookie=" << cookie);
Disconnected(EErrorCode::InternalError);
return;
}

WriteAccepted(cookie);
auto it = PendingReserve.begin();
auto& holder = it->second;

if ((holder.QuotaChecked && !holder.QuotaAccepted)|| !ReceivedReserve.empty()) {
if ((holder.QuotaCheckEnabled && !holder.QuotaAccepted) || !ReceivedReserve.empty()) {
// There may be two situations:
// - a quota has been requested, and the quota has not been received yet
// - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed
EnqueueReservedAndProcess(cookie);
} else {
Write(cookie);
Write(cookie, std::move(it->second.Request));
}
PendingReserve.erase(it);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Это ключевое место. Убрал метод write, перенес проверки сюда, и перетащил сюда удаление из PendingReserve

} else {
if (PendingWrite.empty()) {
return WriteResult(EErrorCode::InternalError, "Unexpected Write response", std::move(record));
Expand Down Expand Up @@ -740,7 +721,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
ReceivedQuota.insert(ReceivedQuota.end(), PendingQuota.begin(), PendingQuota.end());
PendingQuota.clear();

ProcessQuota();
ProcessQuotaAndWrite();

break;

Expand Down Expand Up @@ -829,12 +810,12 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl

struct RequestHolder {
NKikimrClient::TPersQueueRequest Request;
bool QuotaChecked;
bool QuotaCheckEnabled;
bool QuotaAccepted;

RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaChecked)
RequestHolder(NKikimrClient::TPersQueueRequest&& request, bool quotaCheckEnabled)
: Request(std::move(request))
, QuotaChecked(quotaChecked)
, QuotaCheckEnabled(quotaCheckEnabled)
, QuotaAccepted(false) {
}
};
Expand Down