Skip to content

Commit 3f8a1a6

Browse files
authored
Merge 6f20084 into 5b7a720
2 parents 5b7a720 + 6f20084 commit 3f8a1a6

File tree

8 files changed

+111
-88
lines changed

8 files changed

+111
-88
lines changed

ydb/core/base/blobstorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -788,6 +788,7 @@ struct TEvBlobStorage {
788788
EvRequestProxySessionsState,
789789
EvProxySessionsState,
790790
EvBunchOfEvents,
791+
EvDeadline,
791792

792793
// blobstorage controller interface
793794
EvControllerRegisterNode = 0x10031602,

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -349,6 +349,12 @@ class TBlobStorageGroupRequestActor : public TActor<TDerived> {
349349
Derived().ReplyAndDie(NKikimrProto::ERROR);
350350
return true;
351351
}
352+
353+
case TEvBlobStorage::EvDeadline: {
354+
ErrorReason = "Deadline timer hit";
355+
Derived().ReplyAndDie(NKikimrProto::DEADLINE);
356+
return true;
357+
}
352358
}
353359

354360
return false;

ydb/core/blobstorage/dsproxy/dsproxy_impl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
2222
EvConfigureQueryTimeout,
2323
EvEstablishingSessionTimeout,
2424
Ev5min,
25+
EvCheckDeadlines,
2526
};
2627

2728
struct TEvUpdateResponsiveness : TEventLocal<TEvUpdateResponsiveness, EvUpdateResponsiveness> {};
@@ -61,7 +62,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
6162
TIntrusivePtr<TStoragePoolCounters> StoragePoolCounters;
6263
TIntrusivePtr<TGroupSessions> Sessions;
6364
TDeque<std::unique_ptr<IEventHandle>> InitQueue;
64-
THashSet<TActorId, TActorId::THash> ActiveRequests;
65+
std::multimap<TInstant, TActorId> DeadlineMap;
66+
THashMap<TActorId, std::multimap<TInstant, TActorId>::iterator, TActorId::THash> ActiveRequests;
6567
ui64 UnconfiguredBufferSize = 0;
6668
const bool IsEjected;
6769
bool ForceWaitAllDrives;
@@ -250,6 +252,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
250252
void Handle(TEvStopBatchingGetRequests::TPtr& ev);
251253

252254
// todo: in-fly tracking for cancelation and
255+
void PushRequest(IActor *actor, TInstant deadline);
256+
void CheckDeadlines();
253257
void HandleNormal(TEvBlobStorage::TEvGet::TPtr &ev);
254258
void HandleNormal(TEvBlobStorage::TEvPut::TPtr &ev);
255259
void HandleNormal(TEvBlobStorage::TEvBlock::TPtr &ev);
@@ -356,6 +360,7 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
356360
IgnoreFunc(TEvConfigureQueryTimeout);
357361
IgnoreFunc(TEvEstablishingSessionTimeout);
358362
fFunc(Ev5min, Handle5min);
363+
cFunc(EvCheckDeadlines, CheckDeadlines);
359364
)
360365

361366
#define HANDLE_EVENTS(HANDLER) \

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
4545

4646
bool IsManyPuts = false;
4747

48-
TInstant Deadline;
4948
ui64 RequestsSent = 0;
5049
ui64 ResponsesReceived = 0;
5150
ui64 MaxSaneRequests = 0;
@@ -468,7 +467,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
468467
ev->Bunch.emplace_back(new IEventHandle(
469468
TActorId() /*recipient*/,
470469
item.Recipient,
471-
put = new TEvBlobStorage::TEvPut(item.BlobId, TRcBuf(item.Buffer), Deadline, HandleClass, Tactic),
470+
put = new TEvBlobStorage::TEvPut(item.BlobId, TRcBuf(item.Buffer), item.Deadline, HandleClass, Tactic),
472471
0 /*flags*/,
473472
item.Cookie,
474473
nullptr /*forwardOnNondelivery*/,
@@ -506,7 +505,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
506505
ev->RestartCounter, std::move(span), nullptr)
507506
, PutImpl(info, state, ev, mon, enableRequestMod3x3ForMinLatecy, source, cookie, Span.GetTraceId())
508507
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
509-
, Deadline(ev->Deadline)
510508
, HandleClass(ev->HandleClass)
511509
, ReportedBytes(0)
512510
, TimeStatsEnabled(timeStatsEnabled)
@@ -550,7 +548,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
550548
, PutImpl(info, state, events, mon, handleClass, tactic, enableRequestMod3x3ForMinLatecy)
551549
, WaitingVDiskResponseCount(info->GetTotalVDisksNum())
552550
, IsManyPuts(true)
553-
, Deadline(TInstant::Zero())
554551
, HandleClass(handleClass)
555552
, ReportedBytes(0)
556553
, TimeStatsEnabled(timeStatsEnabled)
@@ -565,7 +562,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
565562
Y_DEBUG_ABORT_UNLESS(events.size() <= MaxBatchedPutRequests);
566563
for (auto &ev : events) {
567564
auto& msg = *ev->Get();
568-
Deadline = Max(Deadline, msg.Deadline);
569565
if (msg.Orbit.HasShuttles()) {
570566
RootCauseTrack.IsOn = true;
571567
}
@@ -594,7 +590,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
594590
<< " BlobIDs# " << BlobIdSequenceToString()
595591
<< " HandleClass# " << NKikimrBlobStorage::EPutHandleClass_Name(HandleClass)
596592
<< " Tactic# " << TEvBlobStorage::TEvPut::TacticName(Tactic)
597-
<< " Deadline# " << Deadline
598593
<< " RestartCounter# " << RestartCounter);
599594

600595
StartTime = TActivationContext::Monotonic();
@@ -647,19 +642,21 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
647642
return true;
648643
}
649644

650-
void Handle(TKikimrEvents::TEvWakeup::TPtr &ev) {
651-
Y_UNUSED(ev);
645+
void HandleWakeup() {
652646
A_LOG_WARN_S("BPP14", "Wakeup "
653647
<< " ActorId# " << SelfId()
654648
<< " Group# " << Info->GroupID
655649
<< " BlobIDs# " << BlobIdSequenceToString()
656650
<< " Not answered in "
657651
<< (TActivationContext::Monotonic() - StartTime) << " seconds");
658-
if (TInstant::Now() > Deadline) {
659-
ErrorReason = "Deadline exceeded";
660-
ReplyAndDie(NKikimrProto::DEADLINE);
661-
return;
652+
const TInstant now = TActivationContext::Now();
653+
TPutImpl::TPutResultVec putResults;
654+
for (size_t blobIdx = 0; blobIdx < PutImpl.Blobs.size(); ++blobIdx) {
655+
if (!PutImpl.Blobs[blobIdx].Replied && now > PutImpl.Blobs[blobIdx].Deadline) {
656+
PutImpl.PrepareOneReply(NKikimrProto::DEADLINE, blobIdx, LogCtx, "Deadline timer hit", putResults);
657+
}
662658
}
659+
ReplyAndDieWithLastResponse(putResults);
663660
Schedule(TDuration::MilliSeconds(DsPutWakeupMs), new TKikimrEvents::TEvWakeup);
664661
}
665662

@@ -740,7 +737,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
740737
hFunc(TEvBlobStorage::TEvVMultiPutResult, Handle);
741738
hFunc(TEvAccelerate, Handle);
742739
cFunc(TEvBlobStorage::EvResume, ResumeBootstrap);
743-
hFunc(TKikimrEvents::TEvWakeup, Handle);
740+
cFunc(TEvents::TSystem::Wakeup, HandleWakeup);
744741

745742
default:
746743
Y_DEBUG_ABORT("unexpected event Type# 0x%08" PRIx32, type);

ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,7 @@ ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) {
100100

101101
TString TPutImpl::DumpFullState() const {
102102
TStringStream str;
103-
str << "{Deadline# " << Deadline;
104-
str << Endl;
105-
str << " Info# " << Info->ToString();
103+
str << "{Info# " << Info->ToString();
106104
str << Endl;
107105
str << " Blackboard# " << Blackboard.ToString();
108106
str << Endl;

ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ class TPutImpl {
2626
TBlobStorageGroupInfo::TServiceIds VDisksSvc;
2727
TBlobStorageGroupInfo::TVDiskIds VDisksId;
2828

29-
TInstant Deadline;
3029
const TIntrusivePtr<TBlobStorageGroupInfo> Info;
3130

3231
TBlackboard Blackboard;
@@ -56,10 +55,11 @@ class TPutImpl {
5655
std::vector<std::pair<ui64, ui32>> ExtraBlockChecks;
5756
NWilson::TSpan Span;
5857
std::shared_ptr<TEvBlobStorage::TExecutionRelay> ExecutionRelay;
58+
TInstant Deadline;
5959

6060
TBlobInfo(TLogoBlobID id, TRope&& buffer, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId,
6161
NLWTrace::TOrbit&& orbit, std::vector<std::pair<ui64, ui32>> extraBlockChecks, bool single,
62-
std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay)
62+
std::shared_ptr<TEvBlobStorage::TExecutionRelay> executionRelay, TInstant deadline)
6363
: BlobId(id)
6464
, Buffer(std::move(buffer))
6565
, BufferSize(Buffer.size())
@@ -69,6 +69,7 @@ class TPutImpl {
6969
, ExtraBlockChecks(std::move(extraBlockChecks))
7070
, Span(single ? NWilson::TSpan() : NWilson::TSpan(TWilson::BlobStorage, std::move(traceId), "DSProxy.Put.Blob"))
7171
, ExecutionRelay(std::move(executionRelay))
72+
, Deadline(deadline)
7273
{}
7374

7475
void Output(IOutputStream& s) const {
@@ -103,8 +104,7 @@ class TPutImpl {
103104
TPutImpl(const TIntrusivePtr<TBlobStorageGroupInfo> &info, const TIntrusivePtr<TGroupQueues> &state,
104105
TEvBlobStorage::TEvPut *ev, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
105106
bool enableRequestMod3x3ForMinLatecy, TActorId recipient, ui64 cookie, NWilson::TTraceId traceId)
106-
: Deadline(ev->Deadline)
107-
, Info(info)
107+
: Info(info)
108108
, Blackboard(info, state, ev->HandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead)
109109
, IsDone(1)
110110
, WrittenBeyondBarrier(1)
@@ -116,7 +116,7 @@ class TPutImpl {
116116
{
117117
BlobMap.emplace(ev->Id, Blobs.size());
118118
Blobs.emplace_back(ev->Id, TRope(ev->Buffer), recipient, cookie, std::move(traceId), std::move(ev->Orbit),
119-
std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay));
119+
std::move(ev->ExtraBlockChecks), true, std::move(ev->ExecutionRelay), ev->Deadline);
120120

121121
auto& blob = Blobs.back();
122122
LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
@@ -127,8 +127,7 @@ class TPutImpl {
127127
TBatchedVec<TEvBlobStorage::TEvPut::TPtr> &events, const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon,
128128
NKikimrBlobStorage::EPutHandleClass putHandleClass, TEvBlobStorage::TEvPut::ETactic tactic,
129129
bool enableRequestMod3x3ForMinLatecy)
130-
: Deadline(TInstant::Zero())
131-
, Info(info)
130+
: Info(info)
132131
, Blackboard(info, state, putHandleClass, NKikimrBlobStorage::EGetHandleClass::AsyncRead)
133132
, IsDone(events.size())
134133
, WrittenBeyondBarrier(events.size())
@@ -146,8 +145,8 @@ class TPutImpl {
146145
Y_ABORT_UNLESS(msg.Tactic == tactic);
147146
BlobMap.emplace(msg.Id, Blobs.size());
148147
Blobs.emplace_back(msg.Id, TRope(msg.Buffer), ev->Sender, ev->Cookie, std::move(ev->TraceId),
149-
std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay));
150-
Deadline = Max(Deadline, msg.Deadline);
148+
std::move(msg.Orbit), std::move(msg.ExtraBlockChecks), false, std::move(msg.ExecutionRelay),
149+
msg.Deadline);
151150

152151
auto& blob = Blobs.back();
153152
LWPROBE(DSProxyBlobPutTactics, blob.BlobId.TabletID(), Info->GroupID, blob.BlobId.ToString(), Tactic,
@@ -222,7 +221,7 @@ class TPutImpl {
222221
if (std::next(it) == end) { // TEvVPut
223222
auto [orderNumber, ptr] = *it++;
224223
auto ev = std::make_unique<TEvBlobStorage::TEvVPut>(ptr->Id, ptr->Buffer, Info->GetVDiskId(orderNumber),
225-
false, nullptr, Deadline, Blackboard.PutHandleClass);
224+
false, nullptr, Blobs[ptr->BlobIdx].Deadline, Blackboard.PutHandleClass);
226225

227226
auto& record = ev->Record;
228227
for (const auto& [tabletId, generation] : Blobs[ptr->BlobIdx].ExtraBlockChecks) {
@@ -235,7 +234,12 @@ class TPutImpl {
235234
HandoffPartsSent += ptr->IsHandoff;
236235
++VPutRequests;
237236
} else { // TEvVMultiPut
238-
auto ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(Info->GetVDiskId(it->first), Deadline,
237+
TInstant deadline;
238+
for (auto temp = it; temp != end; ++temp) {
239+
auto [orderNumber, ptr] = *temp;
240+
deadline = Max(deadline, Blobs[ptr->BlobIdx].Deadline);
241+
}
242+
auto ev = std::make_unique<TEvBlobStorage::TEvVMultiPut>(Info->GetVDiskId(it->first), deadline,
239243
Blackboard.PutHandleClass, false);
240244
while (it != end) {
241245
auto [orderNumber, ptr] = *it++;

0 commit comments

Comments
 (0)