@@ -470,7 +470,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
470470 return ;
471471 }
472472
473- const bool checkQuota = Opts.CheckRequestUnits () && IsQuotaRequired ();
473+ const bool needToRequestQuota = Opts.CheckRequestUnits () && IsQuotaRequired ();
474474
475475 size_t processed = 0 ;
476476 PendingQuotaAmount = 0 ;
@@ -490,23 +490,23 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
490490 cmd.SetSize (it->second .ByteSize ());
491491 cmd.SetLastRequest (false );
492492
493- if (checkQuota ) {
493+ if (needToRequestQuota ) {
494494 ++processed;
495495 PendingQuotaAmount += CalcRuConsumption (it->second .ByteSize ());
496496 PendingQuota.emplace_back (it->first );
497497 }
498498
499499 NTabletPipe::SendData (SelfId (), PipeClient, ev.Release ());
500500
501- PendingReserve.emplace (it->first , RequestHolder{ std::move (it->second ), checkQuota });
501+ PendingReserve.emplace (it->first , RequestHolder{ std::move (it->second ), needToRequestQuota });
502502 Pending.erase (it);
503503
504- if (checkQuota && processed == MAX_QUOTA_INFLIGHT) {
504+ if (needToRequestQuota && processed == MAX_QUOTA_INFLIGHT) {
505505 break ;
506506 }
507507 }
508508
509- if (checkQuota ) {
509+ if (needToRequestQuota ) {
510510 RequestDataQuota (PendingQuotaAmount, ctx);
511511 }
512512 }
@@ -527,18 +527,18 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
527527
528528 ReceivedReserve.emplace (it->first , std::move (it->second ));
529529
530- ProcessQuota ();
530+ ProcessQuotaAndWrite ();
531531 }
532532
533- void ProcessQuota () {
533+ void ProcessQuotaAndWrite () {
534534 auto rit = ReceivedReserve.begin ();
535535 auto qit = ReceivedQuota.begin ();
536536
537537 while (rit != ReceivedReserve.end () && qit != ReceivedQuota.end ()) {
538538 auto & request = rit->second ;
539539 const auto cookie = rit->first ;
540- TRACE (" processing quota for request cookie=" << cookie << " , QuotaChecked =" << request.QuotaChecked << " , QuotaAccepted=" << request.QuotaAccepted );
541- if (!request.QuotaChecked || request.QuotaAccepted ) {
540+ TRACE (" processing quota for request cookie=" << cookie << " , QuotaCheckEnabled =" << request.QuotaCheckEnabled << " , QuotaAccepted=" << request.QuotaAccepted );
541+ if (!request.QuotaCheckEnabled || request.QuotaAccepted ) {
542542 // A situation when a quota was not requested or was received while waiting for a reserve
543543 Write (cookie, std::move (request.Request ));
544544 ReceivedReserve.erase (rit++);
@@ -559,8 +559,8 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
559559 while (rit != ReceivedReserve.end ()) {
560560 auto & request = rit->second ;
561561 const auto cookie = rit->first ;
562- TRACE (" processing quota for request cookie=" << cookie << " , QuotaChecked =" << request.QuotaChecked << " , QuotaAccepted=" << request.QuotaAccepted );
563- if (request.QuotaChecked && !request.QuotaAccepted ) {
562+ TRACE (" processing quota for request cookie=" << cookie << " , QuotaCheckEnabled =" << request.QuotaCheckEnabled << " , QuotaAccepted=" << request.QuotaAccepted );
563+ if (request.QuotaCheckEnabled && !request.QuotaAccepted ) {
564564 break ;
565565 }
566566
@@ -587,27 +587,6 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
587587 ReceivedQuota.clear ();
588588 }
589589
590- void Write (ui64 cookie) {
591- if (PendingReserve.empty ()) {
592- ERROR (" The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #02" );
593- Disconnected (EErrorCode::InternalError);
594- return ;
595- }
596- auto it = PendingReserve.begin ();
597-
598- auto cookieReserveValid = (it->first == cookie);
599- auto cookieWriteValid = (PendingWrite.empty () || PendingWrite.back () < cookie);
600- if (!(cookieReserveValid && cookieWriteValid)) {
601- ERROR (" The cookie of Write is invalid. Cookie=" << cookie);
602- Disconnected (EErrorCode::InternalError);
603- return ;
604- }
605-
606- Write (cookie, std::move (it->second .Request ));
607-
608- PendingReserve.erase (it);
609- }
610-
611590 void Write (ui64 cookie, NKikimrClient::TPersQueueRequest&& req) {
612591 auto ev = MakeHolder<TEvPersQueue::TEvRequest>();
613592 ev->Record = std::move (req);
@@ -651,24 +630,26 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
651630 return WriteResult (EErrorCode::InternalError, error, std::move (record));
652631 }
653632
654- WriteAccepted (cookie);
655-
656- if (PendingReserve.empty ()) {
657- ERROR (" The state of the PartitionWriter is invalid. PendingReserve is empty. Marker #03" );
633+ auto cookieWriteValid = (PendingWrite.empty () || PendingWrite.back () < cookie);
634+ if (!cookieWriteValid) {
635+ ERROR (" The cookie of Write is invalid. Cookie=" << cookie);
658636 Disconnected (EErrorCode::InternalError);
659637 return ;
660638 }
639+
640+ WriteAccepted (cookie);
661641 auto it = PendingReserve.begin ();
662642 auto & holder = it->second ;
663643
664- if ((holder.QuotaChecked && !holder.QuotaAccepted )|| !ReceivedReserve.empty ()) {
644+ if ((holder.QuotaCheckEnabled && !holder.QuotaAccepted ) || !ReceivedReserve.empty ()) {
665645 // There may be two situations:
666646 // - a quota has been requested, and the quota has not been received yet
667647 // - the quota was not requested, for example, due to a change in the metering option, but the previous quota requests have not yet been processed
668648 EnqueueReservedAndProcess (cookie);
669649 } else {
670- Write (cookie);
650+ Write (cookie, std::move (it-> second . Request ) );
671651 }
652+ PendingReserve.erase (it);
672653 } else {
673654 if (PendingWrite.empty ()) {
674655 return WriteResult (EErrorCode::InternalError, " Unexpected Write response" , std::move (record));
@@ -740,7 +721,7 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
740721 ReceivedQuota.insert (ReceivedQuota.end (), PendingQuota.begin (), PendingQuota.end ());
741722 PendingQuota.clear ();
742723
743- ProcessQuota ();
724+ ProcessQuotaAndWrite ();
744725
745726 break ;
746727
@@ -829,12 +810,12 @@ class TPartitionWriter: public TActorBootstrapped<TPartitionWriter>, private TRl
829810
830811 struct RequestHolder {
831812 NKikimrClient::TPersQueueRequest Request;
832- bool QuotaChecked ;
813+ bool QuotaCheckEnabled ;
833814 bool QuotaAccepted;
834815
835- RequestHolder (NKikimrClient::TPersQueueRequest&& request, bool quotaChecked )
816+ RequestHolder (NKikimrClient::TPersQueueRequest&& request, bool quotaCheckEnabled )
836817 : Request(std::move(request))
837- , QuotaChecked(quotaChecked )
818+ , QuotaCheckEnabled(quotaCheckEnabled )
838819 , QuotaAccepted(false ) {
839820 }
840821 };
0 commit comments