@@ -42,7 +42,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242 TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343 TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444
45- std::set<TInstant> PutDeadlines;
45+ static_assert (MaxBatchedPutRequests <= sizeof (ui64) * 8);
46+ std::map<TInstant, ui64> PutDeadlineMasks;
47+ ui64 DeadlineMask = 0 ;
48+
4649 TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4750 ui64 WaitingVDiskCount = 0 ;
4851
@@ -648,7 +651,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
648651 << " RestartCounter# " << RestartCounter);
649652
650653 for (size_t blobIdx = 0 ; blobIdx < PutImpl.Blobs .size (); ++blobIdx) {
651- PutDeadlines. insert ( PutImpl.Blobs [blobIdx].Deadline );
654+ PutDeadlineMasks[ PutImpl.Blobs [blobIdx].Deadline ] |= ( 1 << blobIdx );
652655 LWTRACK (DSProxyPutBootstrapStart, PutImpl.Blobs [blobIdx].Orbit );
653656 }
654657
@@ -670,7 +673,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
670673 );
671674
672675 Become (&TBlobStorageGroupPutRequest::StateWait);
673- ScheduleWakeup ();
676+ ScheduleWakeup (TInstant::Zero () );
674677
675678 PartSets.resize (PutImpl.Blobs .size ());
676679 for (auto & partSet : PartSets) {
@@ -721,15 +724,26 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
721724 << " BlobIDs# " << BlobIdSequenceToString ()
722725 << " Not answered in "
723726 << (TActivationContext::Monotonic () - RequestStartTime) << " seconds" );
727+
724728 const TInstant now = TActivationContext::Now ();
729+ while (!PutDeadlineMasks.empty ()) {
730+ auto [deadline, mask] = *PutDeadlineMasks.begin ();
731+ if (deadline <= now) {
732+ DeadlineMask |= mask;
733+ PutDeadlineMasks.erase (PutDeadlineMasks.begin ());
734+ } else {
735+ break ;
736+ }
737+ }
738+
725739 TPutImpl::TPutResultVec putResults;
726740 for (size_t blobIdx = 0 ; blobIdx < PutImpl.Blobs .size (); ++blobIdx) {
727- if (!PutImpl.Blobs [blobIdx].Replied && now >= PutImpl. Blobs [ blobIdx]. Deadline ) {
741+ if (!PutImpl.Blobs [blobIdx].Replied && (DeadlineMask & ( 1 << blobIdx)) ) {
728742 PutImpl.PrepareOneReply (NKikimrProto::DEADLINE, blobIdx, LogCtx, " Deadline timer hit" , putResults);
729743 }
730744 }
731745 if (!ReplyAndDieWithLastResponse (putResults)) {
732- ScheduleWakeup ();
746+ ScheduleWakeup (now );
733747 }
734748 }
735749
@@ -797,16 +811,19 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
797811 << " State# " << PutImpl.DumpFullState ());
798812 }
799813
800- void ScheduleWakeup () {
814+ void ScheduleWakeup (TInstant lastWakeup ) {
801815 TInstant now = TActivationContext::Now ();
802- while (!PutDeadlines.empty ()) {
803- TInstant deadline = *PutDeadlines.begin ();
804- PutDeadlines.erase (PutDeadlines.begin ());
805- if (deadline > now && deadline != TInstant::Max ()) {
806- Schedule (deadline, new TKikimrEvents::TEvWakeup);
807- return ;
816+ TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;
817+
818+ // find first deadline after now
819+ for (auto it = PutDeadlineMasks.begin (); it != PutDeadlineMasks.end (); ++it) {
820+ deadline = std::max (deadline, it->first );
821+ if (it->first > now) {
822+ break ;
808823 }
809824 }
825+
826+ Schedule (deadline, new TKikimrEvents::TEvWakeup);
810827 }
811828
812829 STATEFN (StateWait) {
0 commit comments