Skip to content

Commit 1b7f2bc

Browse files
committed
Better deadlines for multiputs
1 parent 15ea38f commit 1b7f2bc

File tree

1 file changed

+16
-6
lines changed

1 file changed

+16
-6
lines changed

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444

45+
std::set<TInstant> PutDeadlines;
4546
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4647
ui64 WaitingVDiskCount = 0;
4748

@@ -646,9 +647,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
646647
<< " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic)
647648
<< " RestartCounter# " << RestartCounter);
648649

649-
TInstant firstDeadline = TInstant::Max();
650650
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
651-
firstDeadline = std::min(firstDeadline, PutImpl.Blobs[blobIdx].Deadline);
651+
PutDeadlines.insert(PutImpl.Blobs[blobIdx].Deadline);
652652
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
653653
}
654654

@@ -669,9 +669,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
669669
getTotalSize()
670670
);
671671

672-
TInstant now = TActivationContext::Now();
673-
TInstant wakeupTime = std::min(now + TDuration::MilliSeconds(DsPutWakeupMs), firstDeadline);
674-
Become(&TBlobStorageGroupPutRequest::StateWait, wakeupTime, new TKikimrEvents::TEvWakeup);
672+
Become(&TBlobStorageGroupPutRequest::StateWait);
673+
ScheduleWakeup();
675674

676675
PartSets.resize(PutImpl.Blobs.size());
677676
for (auto& partSet : PartSets) {
@@ -730,7 +729,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
730729
}
731730
}
732731
ReplyAndDieWithLastResponse(putResults);
733-
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
732+
ScheduleWakeup();
734733
}
735734

736735
void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
@@ -797,6 +796,17 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
797796
<< " State# " << PutImpl.DumpFullState());
798797
}
799798

799+
void ScheduleWakeup() {
800+
TInstant deadline = TActivationContext::Now() + TDuration::MilliSeconds(DsPutWakeupMs);
801+
802+
auto it = PutDeadlines.begin();
803+
if (it != PutDeadlines.end() && *it <= deadline) {
804+
deadline = *it;
805+
PutDeadlines.erase(it);
806+
}
807+
Schedule(deadline, new TKikimrEvents::TEvWakeup);
808+
}
809+
800810
STATEFN(StateWait) {
801811
if (ProcessEvent(ev, true)) {
802812
return;

0 commit comments

Comments
 (0)