Skip to content

Commit 20d589a

Browse files
authored
Merge c4b96cf into 6b225ec
2 parents 6b225ec + c4b96cf commit 20d589a

File tree

4 files changed

+420
-66
lines changed

4 files changed

+420
-66
lines changed

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
4242
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
4343
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;
4444

45+
std::set<TInstant> PutDeadlines;
4546
TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
4647
ui64 WaitingVDiskCount = 0;
4748

@@ -647,6 +648,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
647648
<< " RestartCounter# " << RestartCounter);
648649

649650
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
651+
PutDeadlines.insert(PutImpl.Blobs[blobIdx].Deadline);
650652
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
651653
}
652654

@@ -667,7 +669,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
667669
getTotalSize()
668670
);
669671

670-
Become(&TBlobStorageGroupPutRequest::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
672+
Become(&TBlobStorageGroupPutRequest::StateWait);
673+
ScheduleWakeup();
671674

672675
PartSets.resize(PutImpl.Blobs.size());
673676
for (auto& partSet : PartSets) {
@@ -721,12 +724,12 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
721724
const TInstant now = TActivationContext::Now();
722725
TPutImpl::TPutResultVec putResults;
723726
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
724-
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
727+
if (!PutImpl.Blobs[blobIdx].Replied && now >= PutImpl.Blobs[blobIdx].Deadline) {
725728
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
726729
}
727730
}
728731
ReplyAndDieWithLastResponse(putResults);
729-
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
732+
ScheduleWakeup();
730733
}
731734

732735
void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
@@ -793,6 +796,19 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
793796
<< " State# " << PutImpl.DumpFullState());
794797
}
795798

799+
void ScheduleWakeup() {
800+
TInstant now = TActivationContext::Now();
801+
while (!PutDeadlines.empty()) {
802+
TInstant deadline = *PutDeadlines.begin();
803+
PutDeadlines.erase(PutDeadlines.begin());
804+
if (deadline > now) {
805+
Schedule(deadline, new TKikimrEvents::TEvWakeup);
806+
return;
807+
}
808+
}
809+
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
810+
}
811+
796812
STATEFN(StateWait) {
797813
if (ProcessEvent(ev, true)) {
798814
return;

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,7 +425,7 @@ namespace NKikimr {
425425
.ExecutionRelay = ev->Get()->ExecutionRelay
426426
}
427427
}),
428-
TInstant::Max()
428+
ev->Get()->Deadline
429429
);
430430
}
431431

0 commit comments

Comments
 (0)