Skip to content

Commit 7f6d8cc

Browse files
authored
Fix deadlines in DSProxy for Status, MultiPut, Patch requests, add UTs for deadlines (#11780)
1 parent 65fee37 commit 7f6d8cc

File tree

5 files changed

+386
-64
lines changed

5 files changed

+386
-64
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;
3939

4040
const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(100);
4141

42-
const ui64 DsPutWakeupMs = 60000;
42+
const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::MilliSeconds(1);
4343

4444
const ui64 BufferSizeThreshold = 1 << 20;
4545

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444

45+
using TDeadlineMask = std::bitset<MaxBatchedPutRequests>;
46+
std::map<TInstant, TDeadlineMask> PutDeadlineMasks;
47+
TDeadlineMask DeadlineMask;
48+
4549
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4650
ui64 WaitingVDiskCount = 0;
4751

@@ -647,6 +651,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
647651
<< " RestartCounter# " << RestartCounter);
648652

649653
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
654+
PutDeadlineMasks[PutImpl.Blobs[blobIdx].Deadline].set(blobIdx);
650655
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
651656
}
652657

@@ -667,7 +672,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
667672
getTotalSize()
668673
);
669674

670-
Become(&TBlobStorageGroupPutRequest::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
675+
Become(&TBlobStorageGroupPutRequest::StateWait);
676+
ScheduleWakeup(TInstant::Zero());
671677

672678
PartSets.resize(PutImpl.Blobs.size());
673679
for (auto& partSet : PartSets) {
@@ -718,15 +724,27 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
718724
<< " BlobIDs# " << BlobIdSequenceToString()
719725
<< " Not answered in "
720726
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");
727+
721728
const TInstant now = TActivationContext::Now();
729+
while (!PutDeadlineMasks.empty()) {
730+
auto [deadline, mask] = *PutDeadlineMasks.begin();
731+
if (deadline <= now) {
732+
DeadlineMask |= mask;
733+
PutDeadlineMasks.erase(PutDeadlineMasks.begin());
734+
} else {
735+
break;
736+
}
737+
}
738+
722739
TPutImpl::TPutResultVec putResults;
723740
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
724-
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
741+
if (!PutImpl.Blobs[blobIdx].Replied && DeadlineMask[blobIdx]) {
725742
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
726743
}
727744
}
728-
ReplyAndDieWithLastResponse(putResults);
729-
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
745+
if (!ReplyAndDieWithLastResponse(putResults)) {
746+
ScheduleWakeup(now);
747+
}
730748
}
731749

732750
void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
@@ -793,6 +811,21 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
793811
<< " State# " << PutImpl.DumpFullState());
794812
}
795813

814+
void ScheduleWakeup(TInstant lastWakeup) {
815+
TInstant now = TActivationContext::Now();
816+
TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;
817+
818+
// find first deadline after now
819+
for (auto it = PutDeadlineMasks.begin(); it != PutDeadlineMasks.end(); ++it) {
820+
deadline = std::max(deadline, it->first);
821+
if (it->first > now) {
822+
break;
823+
}
824+
}
825+
826+
Schedule(deadline, new TKikimrEvents::TEvWakeup);
827+
}
828+
796829
STATEFN(StateWait) {
797830
if (ProcessEvent(ev, true)) {
798831
return;

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,7 @@ namespace NKikimr {
448448
.ExecutionRelay = ev->Get()->ExecutionRelay
449449
}
450450
}),
451-
TInstant::Max()
451+
ev->Get()->Deadline
452452
);
453453
}
454454

0 commit comments

Comments
 (0)