Skip to content

Commit 3416d09

Browse files
committed
Improve deadlines in TEvPut handling
1 parent af6695c commit 3416d09

File tree

2 files changed

+30
-13
lines changed

2 files changed

+30
-13
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;
3939

4040
const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(100);
4141

42-
const ui64 DsPutWakeupMs = 60000;
42+
const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::MilliSeconds(100);
4343

4444
const ui64 BufferSizeThreshold = 1 << 20;
4545

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444

45-
std::set<TInstant> PutDeadlines;
45+
static_assert(MaxBatchedPutRequests <= sizeof(ui64) * 8);
46+
std::map<TInstant, ui64> PutDeadlineMasks;
47+
ui64 DeadlineMask = 0;
48+
4649
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4750
ui64 WaitingVDiskCount = 0;
4851

@@ -648,7 +651,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
648651
<< " RestartCounter# " << RestartCounter);
649652

650653
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
651-
PutDeadlines.insert(PutImpl.Blobs[blobIdx].Deadline);
654+
PutDeadlineMasks[PutImpl.Blobs[blobIdx].Deadline] |= (1 << blobIdx);
652655
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
653656
}
654657

@@ -670,7 +673,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
670673
);
671674

672675
Become(&TBlobStorageGroupPutRequest::StateWait);
673-
ScheduleWakeup();
676+
ScheduleWakeup(TInstant::Zero());
674677

675678
PartSets.resize(PutImpl.Blobs.size());
676679
for (auto& partSet : PartSets) {
@@ -721,15 +724,26 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
721724
<< " BlobIDs# " << BlobIdSequenceToString()
722725
<< " Not answered in "
723726
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");
727+
724728
const TInstant now = TActivationContext::Now();
729+
while (!PutDeadlineMasks.empty()) {
730+
auto [deadline, mask] = *PutDeadlineMasks.begin();
731+
if (deadline <= now) {
732+
DeadlineMask |= mask;
733+
PutDeadlineMasks.erase(PutDeadlineMasks.begin());
734+
} else {
735+
break;
736+
}
737+
}
738+
725739
TPutImpl::TPutResultVec putResults;
726740
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
727-
if (!PutImpl.Blobs[blobIdx].Replied && now >= PutImpl.Blobs[blobIdx].Deadline) {
741+
if (!PutImpl.Blobs[blobIdx].Replied && (DeadlineMask & (1 << blobIdx))) {
728742
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
729743
}
730744
}
731745
if (!ReplyAndDieWithLastResponse(putResults)) {
732-
ScheduleWakeup();
746+
ScheduleWakeup(now);
733747
}
734748
}
735749

@@ -797,16 +811,19 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
797811
<< " State# " << PutImpl.DumpFullState());
798812
}
799813

800-
void ScheduleWakeup() {
814+
void ScheduleWakeup(TInstant lastWakeup) {
801815
TInstant now = TActivationContext::Now();
802-
while (!PutDeadlines.empty()) {
803-
TInstant deadline = *PutDeadlines.begin();
804-
PutDeadlines.erase(PutDeadlines.begin());
805-
if (deadline > now && deadline != TInstant::Max()) {
806-
Schedule(deadline, new TKikimrEvents::TEvWakeup);
807-
return;
816+
TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;
817+
818+
// find first deadline after now
819+
for (auto it = PutDeadlineMasks.begin(); it != PutDeadlineMasks.end(); ++it) {
820+
deadline = std::max(deadline, it->first);
821+
if (it->first > now) {
822+
break;
808823
}
809824
}
825+
826+
Schedule(deadline, new TKikimrEvents::TEvWakeup);
810827
}
811828

812829
STATEFN(StateWait) {

0 commit comments

Comments
 (0)