Skip to content

Commit 011e5bb

Browse files
authored
Merge 714ec12 into 8f51162
2 parents 8f51162 + 714ec12 commit 011e5bb

File tree

5 files changed

+392
-64
lines changed

5 files changed

+392
-64
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;
3737

3838
const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(5);
3939

40-
const ui64 DsPutWakeupMs = 60000;
40+
const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::Seconds(1);
41+
const TDuration DsMaximumPutTimeout = TDuration::Seconds(60);
4142

4243
const ui64 BufferSizeThreshold = 1 << 20;
4344

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444

45+
using TDeadlineMask = std::bitset<MaxBatchedPutRequests>;
46+
std::map<TInstant, TDeadlineMask> PutDeadlineMasks;
47+
TDeadlineMask DeadlineMask;
48+
4549
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4650
ui64 WaitingVDiskCount = 0;
4751

@@ -645,7 +649,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
645649
<< " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic)
646650
<< " RestartCounter# " << RestartCounter);
647651

652+
TInstant now = TActivationContext::Now();
653+
648654
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
655+
TInstant deadline = std::min(now + DsMaximumPutTimeout, PutImpl.Blobs[blobIdx].Deadline);
656+
PutDeadlineMasks[deadline].set(blobIdx);
649657
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
650658
}
651659

@@ -666,7 +674,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
666674
getTotalSize()
667675
);
668676

669-
Become(&TBlobStorageGroupPutRequest::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
677+
Become(&TBlobStorageGroupPutRequest::StateWait);
678+
ScheduleNextWakeup(TInstant::Zero());
670679

671680
PartSets.resize(PutImpl.Blobs.size());
672681
for (auto& partSet : PartSets) {
@@ -717,15 +726,27 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
717726
<< " BlobIDs# " << BlobIdSequenceToString()
718727
<< " Not answered in "
719728
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");
729+
720730
const TInstant now = TActivationContext::Now();
731+
while (!PutDeadlineMasks.empty()) {
732+
auto [deadline, mask] = *PutDeadlineMasks.begin();
733+
if (deadline <= now) {
734+
DeadlineMask |= mask;
735+
PutDeadlineMasks.erase(PutDeadlineMasks.begin());
736+
} else {
737+
break;
738+
}
739+
}
740+
721741
TPutImpl::TPutResultVec putResults;
722742
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
723-
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
743+
if (!PutImpl.Blobs[blobIdx].Replied && DeadlineMask[blobIdx]) {
724744
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
725745
}
726746
}
727-
ReplyAndDieWithLastResponse(putResults);
728-
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
747+
if (!ReplyAndDieWithLastResponse(putResults)) {
748+
ScheduleNextWakeup(now);
749+
}
729750
}
730751

731752
void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
@@ -792,6 +813,23 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
792813
<< " State# " << PutImpl.DumpFullState());
793814
}
794815

816+
void ScheduleNextWakeup(TInstant lastWakeup) {
817+
TInstant now = TActivationContext::Now();
818+
TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;
819+
820+
// find first deadline after now
821+
for (auto it = PutDeadlineMasks.begin(); it != PutDeadlineMasks.end(); ++it) {
822+
deadline = std::max(deadline, it->first);
823+
if (it->first > now) {
824+
break;
825+
}
826+
}
827+
828+
if (deadline != TInstant::Max()) {
829+
Schedule(deadline, new TKikimrEvents::TEvWakeup);
830+
}
831+
}
832+
795833
STATEFN(StateWait) {
796834
if (ProcessEvent(ev, true)) {
797835
return;

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -502,7 +502,7 @@ namespace NKikimr {
502502
.ForceGroupGeneration = ev->Get()->ForceGroupGeneration,
503503
}
504504
}),
505-
TInstant::Max()
505+
ev->Get()->Deadline
506506
);
507507
}
508508

0 commit comments

Comments
 (0)