Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const ui64 UnconfiguredBufferSizeLimit = 32 << 20;

const TDuration ProxyEstablishSessionsTimeout = TDuration::Seconds(100);

const ui64 DsPutWakeupMs = 60000;
const TDuration DsMinimumDelayBetweenPutWakeups = TDuration::MilliSeconds(1);

const ui64 BufferSizeThreshold = 1 << 20;

Expand Down
41 changes: 37 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
TErasureSplitContext ErasureSplitContext = TErasureSplitContext::Init(MaxBytesToSplitAtOnce);
TBatchedVec<TStackVec<TRope, TypicalPartsInBlob>> PartSets;

using TDeadlineMask = std::bitset<MaxBatchedPutRequests>;
std::map<TInstant, TDeadlineMask> PutDeadlineMasks;
TDeadlineMask DeadlineMask;

TStackVec<ui64, TypicalDisksInGroup> WaitingVDiskResponseCount;
ui64 WaitingVDiskCount = 0;

Expand Down Expand Up @@ -647,6 +651,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " RestartCounter# " << RestartCounter);

for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
PutDeadlineMasks[PutImpl.Blobs[blobIdx].Deadline].set(blobIdx);
LWTRACK(DSProxyPutBootstrapStart, PutImpl.Blobs[blobIdx].Orbit);
}

Expand All @@ -667,7 +672,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
getTotalSize()
);

Become(&TBlobStorageGroupPutRequest::StateWait, TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
Become(&TBlobStorageGroupPutRequest::StateWait);
ScheduleWakeup(TInstant::Zero());

PartSets.resize(PutImpl.Blobs.size());
for (auto& partSet : PartSets) {
Expand Down Expand Up @@ -718,15 +724,27 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " BlobIDs# " << BlobIdSequenceToString()
<< " Not answered in "
<< (TActivationContext::Monotonic() - RequestStartTime) << " seconds");

const TInstant now = TActivationContext::Now();
while (!PutDeadlineMasks.empty()) {
auto [deadline, mask] = *PutDeadlineMasks.begin();
if (deadline <= now) {
DeadlineMask |= mask;
PutDeadlineMasks.erase(PutDeadlineMasks.begin());
} else {
break;
}
}

TPutImpl::TPutResultVec putResults;
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
if (!PutImpl.Blobs[blobIdx].Replied && DeadlineMask[blobIdx]) {
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
}
}
ReplyAndDieWithLastResponse(putResults);
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
if (!ReplyAndDieWithLastResponse(putResults)) {
ScheduleWakeup(now);
}
}

void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
Expand Down Expand Up @@ -793,6 +811,21 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
<< " State# " << PutImpl.DumpFullState());
}

void ScheduleWakeup(TInstant lastWakeup) {
TInstant now = TActivationContext::Now();
TInstant deadline = lastWakeup + DsMinimumDelayBetweenPutWakeups;

// find first deadline after now
for (auto it = PutDeadlineMasks.begin(); it != PutDeadlineMasks.end(); ++it) {
deadline = std::max(deadline, it->first);
if (it->first > now) {
break;
}
}

Schedule(deadline, new TKikimrEvents::TEvWakeup);
}

STATEFN(StateWait) {
if (ProcessEvent(ev, true)) {
return;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ namespace NKikimr {
.ExecutionRelay = ev->Get()->ExecutionRelay
}
}),
TInstant::Max()
ev->Get()->Deadline
);
}

Expand Down
Loading
Loading