Skip to content

Commit 882617a

Browse files
authored
Merge 53903ba into 8e44a97
2 parents 8e44a97 + 53903ba commit 882617a

15 files changed

+156
-23
lines changed

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,8 +354,7 @@ struct TBlobStorageGroupRangeParameters {
354354
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
355355
.LogComponent = NKikimrServices::BS_PROXY_RANGE,
356356
.Name = "DSProxy.Range",
357-
.Activity = NKikimrServices::TActivity::BS_GROUP_RANGE
358-
,
357+
.Activity = NKikimrServices::TActivity::BS_GROUP_RANGE,
359358
};
360359
};
361360
IActor* CreateBlobStorageGroupRangeRequest(TBlobStorageGroupRangeParameters params);
@@ -371,6 +370,7 @@ struct TBlobStorageGroupPutParameters {
371370
TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;
372371
bool EnableRequestMod3x3ForMinLatency;
373372
TAccelerationParams AccelerationParams;
373+
TDuration LongRequestThreshold;
374374
};
375375
IActor* CreateBlobStorageGroupPutRequest(TBlobStorageGroupPutParameters params);
376376

@@ -389,6 +389,7 @@ struct TBlobStorageGroupMultiPutParameters {
389389
TEvBlobStorage::TEvPut::ETactic Tactic;
390390
bool EnableRequestMod3x3ForMinLatency;
391391
TAccelerationParams AccelerationParams;
392+
TDuration LongRequestThreshold;
392393

393394
static ui32 CalculateRestartCounter(TBatchedVec<TEvBlobStorage::TEvPut::TPtr>& events) {
394395
ui32 maxRestarts = 0;
@@ -409,6 +410,7 @@ struct TBlobStorageGroupGetParameters {
409410
};
410411
TNodeLayoutInfoPtr NodeLayout;
411412
TAccelerationParams AccelerationParams;
413+
TDuration LongRequestThreshold;
412414
};
413415
IActor* CreateBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters params);
414416

@@ -511,6 +513,8 @@ IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyN
511513

512514
struct TBlobStorageProxyParameters {
513515
bool UseActorSystemTimeInBSQueue = false;
516+
TDuration RequestReportingThrottlerDelay = TDuration::Seconds(60);
517+
TDuration LongRequestThreshold = TDuration::Seconds(50);
514518

515519
const TControlWrapper& EnablePutBatching;
516520
const TControlWrapper& EnableVPatch;

ydb/core/blobstorage/dsproxy/dsproxy_get.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "dsproxy.h"
22
#include "dsproxy_mon.h"
33
#include "root_cause.h"
4+
#include <ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h>
45
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
56
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
67
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -58,6 +59,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
5859
bool IsPutAccelerateScheduled = false;
5960

6061
TAccelerationParams AccelerationParams;
62+
TDuration LongRequestThreshold;
6163

6264
void Handle(TEvAccelerateGet::TPtr &ev) {
6365
IsGetAccelerateScheduled = false;
@@ -380,6 +382,14 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
380382
evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
381383
success);
382384
A_LOG_LOG_S(true, success ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPG68", "Result# " << evResult->Print(false));
385+
386+
bool allowToReport = AllowToReport(GetImpl.GetHandleClass());
387+
if (TActivationContext::Now() - StartTime >= LongRequestThreshold) {
388+
if (allowToReport) {
389+
R_LOG_WARN_S("BPG71", "TEvGet Request was being processed for more than " << LongRequestThreshold
390+
<< ", serialized RootCause# " << RootCauseTrack.ToString());
391+
}
392+
}
383393
return SendResponseAndDie(std::unique_ptr<TEvBlobStorage::TEvGetResult>(evResult.Release()));
384394
}
385395

@@ -409,6 +419,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
409419
, GroupSize(Info->Type.BlobSubgroupSize())
410420
, ReportedBytes(0)
411421
, AccelerationParams(params.AccelerationParams)
422+
, LongRequestThreshold(params.LongRequestThreshold)
412423
{
413424
ReportBytes(sizeof(*this));
414425
MaxSaneRequests = params.Common.Event->QuerySize * Info->Type.TotalPartCount() *

ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ namespace NKikimr {
1919
, EnableVPatch(params.EnableVPatch)
2020
, SlowDiskThreshold(params.SlowDiskThreshold)
2121
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
22+
, LongRequestThreshold(params.LongRequestThreshold)
2223
{}
2324

2425
TBlobStorageGroupProxy::TBlobStorageGroupProxy(ui32 groupId, bool isEjected,TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
@@ -32,6 +33,7 @@ namespace NKikimr {
3233
, EnableVPatch(params.EnableVPatch)
3334
, SlowDiskThreshold(params.SlowDiskThreshold)
3435
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
36+
, LongRequestThreshold(params.LongRequestThreshold)
3537
{}
3638

3739
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon) {

ydb/core/blobstorage/dsproxy/dsproxy_impl.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
#include "defs.h"
44
#include "dsproxy.h"
55

6+
#include <ydb/core/blobstorage/base/utility.h>
7+
68
namespace NKikimr {
79

810
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -122,6 +124,8 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
122124
TMemorizableControlWrapper SlowDiskThreshold;
123125
TMemorizableControlWrapper PredictedDelayMultiplier;
124126

127+
TDuration LongRequestThreshold;
128+
125129
TAccelerationParams GetAccelerationParams();
126130

127131
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

ydb/core/blobstorage/dsproxy/dsproxy_put.cpp

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "dsproxy_mon.h"
33
#include "root_cause.h"
44
#include "dsproxy_put_impl.h"
5+
#include <ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h>
56
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
67

78
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
@@ -83,6 +84,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
8384

8485
TBlobStorageGroupInfo::TGroupVDisks ExpiredVDiskSet;
8586

87+
TDuration LongRequestThreshold;
88+
8689
void SanityCheck() {
8790
if (RequestsSent <= MaxSaneRequests) {
8891
return;
@@ -277,11 +280,11 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
277280
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
278281
NKikimrBlobStorage::EPutHandleClass_Name(PutImpl.GetPutHandleClass()),
279282
NKikimrProto::EReplyStatus_Name(status));
280-
//if (RootCauseTrack.IsOn) {
281-
// RootCauseTrack.OnReply(cookie.GetCauseIdx(),
282-
// GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
283-
// GetVDiskTimeMs(record.GetTimestamps()));
284-
//}
283+
if (RootCauseTrack.IsOn) {
284+
RootCauseTrack.OnReply(record.GetCookie(),
285+
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
286+
GetVDiskTimeMs(record.GetTimestamps()));
287+
}
285288

286289
if (status == NKikimrProto::BLOCKED || status == NKikimrProto::DEADLINE) {
287290
TString error = TStringBuilder() << "Got VPutResult status# " << status << " from VDiskId# " << vdiskId;
@@ -359,14 +362,14 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
359362
}
360363

361364
// Handle put results
362-
//bool isCauseRegistered = !RootCauseTrack.IsOn;
365+
bool isCauseRegistered = !RootCauseTrack.IsOn;
363366
TPutImpl::TPutResultVec putResults;
364367
for (auto &item : record.GetItems()) {
365-
//if (!isCauseRegistered) {
366-
// isCauseRegistered = RootCauseTrack.OnReply(cookie.GetCauseIdx(),
367-
// GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
368-
// GetVDiskTimeMs(record.GetTimestamps()));
369-
//}
368+
if (!isCauseRegistered) {
369+
isCauseRegistered = RootCauseTrack.OnReply(record.GetCookie(),
370+
GetTotalTimeMs(record.GetTimestamps()) - GetVDiskTimeMs(record.GetTimestamps()),
371+
GetVDiskTimeMs(record.GetTimestamps()));
372+
}
370373

371374
Y_ABORT_UNLESS(item.HasStatus());
372375
Y_ABORT_UNLESS(item.HasBlobID());
@@ -472,6 +475,16 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
472475
PutImpl.Blobs[blobIdx].Recipient, PutImpl.Blobs[blobIdx].Cookie, false);
473476
PutImpl.Blobs[blobIdx].Replied = true;
474477
}
478+
479+
if (TActivationContext::Monotonic() - StartTime >= LongRequestThreshold) {
480+
bool allowToReport = AllowToReport(HandleClass);
481+
R_LOG_WARN_S("DEBUG", TActivationContext::Monotonic() - StartTime << " " << LongRequestThreshold << " " << allowToReport << " "
482+
<< NKikimrBlobStorage::EPutHandleClass_Name(PutImpl.GetPutHandleClass()));
483+
if (allowToReport) {
484+
R_LOG_WARN_S("BPP71", "TEvPut Request was being processed for more than " << LongRequestThreshold
485+
<< ", serialized RootCause# " << RootCauseTrack.ToString());
486+
}
487+
}
475488
}
476489

477490
TString BlobIdSequenceToString() const {
@@ -535,6 +548,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
535548
, AccelerationParams(params.AccelerationParams)
536549
, IncarnationRecords(Info->GetTotalVDisksNum())
537550
, ExpiredVDiskSet(&Info->GetTopology())
551+
, LongRequestThreshold(params.LongRequestThreshold)
538552
{
539553
if (params.Common.Event->Orbit.HasShuttles()) {
540554
RootCauseTrack.IsOn = true;
@@ -560,6 +574,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
560574
, AccelerationParams(params.AccelerationParams)
561575
, IncarnationRecords(Info->GetTotalVDisksNum())
562576
, ExpiredVDiskSet(&Info->GetTopology())
577+
, LongRequestThreshold(params.LongRequestThreshold)
563578
{
564579
Y_DEBUG_ABORT_UNLESS(params.Events.size() <= MaxBatchedPutRequests);
565580
for (auto &ev : params.Events) {
@@ -665,12 +680,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
665680
void UpdatePengingVDiskResponseCount(const TDeque<TPutImpl::TPutEvent>& putEvents) {
666681
for (auto& event : putEvents) {
667682
std::visit([&](auto& event) {
668-
//Y_ABORT_UNLESS(event->Record.HasCookie());
669-
//TCookie cookie(event->Record.GetCookie());
670-
//if (RootCauseTrack.IsOn) {
671-
// cookie.SetCauseIdx(RootCauseTrack.RegisterCause());
672-
// event->Record.SetCookie(cookie);
673-
//}
683+
ui64 causeIdx = RootCauseTrack.RegisterCause();
684+
if (event->Record.HasCookie() && RootCauseTrack.IsOn) {
685+
event->Record.SetCookie(causeIdx);
686+
}
674687
const ui32 orderNumber = Info->GetOrderNumber(VDiskIDFromVDiskID(event->Record.GetVDiskID()));
675688
Y_ABORT_UNLESS(orderNumber < WaitingVDiskResponseCount.size());
676689
WaitingVDiskCount += !WaitingVDiskResponseCount[orderNumber]++;

ydb/core/blobstorage/dsproxy/dsproxy_request.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ namespace NKikimr {
112112
},
113113
.NodeLayout = TNodeLayoutInfoPtr(NodeLayoutInfo),
114114
.AccelerationParams = GetAccelerationParams(),
115+
.LongRequestThreshold = LongRequestThreshold,
115116
}),
116117
ev->Get()->Deadline
117118
);
@@ -225,6 +226,7 @@ namespace NKikimr {
225226
.Stats = PerDiskStats,
226227
.EnableRequestMod3x3ForMinLatency = enableRequestMod3x3ForMinLatency,
227228
.AccelerationParams = GetAccelerationParams(),
229+
.LongRequestThreshold = LongRequestThreshold,
228230
}),
229231
ev->Get()->Deadline
230232
);
@@ -499,6 +501,7 @@ namespace NKikimr {
499501
.Stats = PerDiskStats,
500502
.EnableRequestMod3x3ForMinLatency = enableRequestMod3x3ForMinLatency,
501503
.AccelerationParams = GetAccelerationParams(),
504+
.LongRequestThreshold = LongRequestThreshold,
502505
}),
503506
ev->Get()->Deadline
504507
);
@@ -521,6 +524,7 @@ namespace NKikimr {
521524
.Tactic = tactic,
522525
.EnableRequestMod3x3ForMinLatency = enableRequestMod3x3ForMinLatency,
523526
.AccelerationParams = GetAccelerationParams(),
527+
.LongRequestThreshold = LongRequestThreshold,
524528
}),
525529
TInstant::Max()
526530
);
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#include "dsproxy_request_reporting.h"
2+
3+
namespace NKikimr {
4+
5+
static std::array<std::atomic<bool>, 7> ReportPermissions;
6+
7+
bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass) {
8+
return ReportPermissions[(ui32)handleClass - 1].exchange(false);
9+
}
10+
11+
bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass) {
12+
return ReportPermissions[(ui32)handleClass - 1 + 3].exchange(false);
13+
}
14+
15+
class TRequestReportingThrottler : public TActorBootstrapped<TRequestReportingThrottler> {
16+
public:
17+
TRequestReportingThrottler(TDuration updatePermissionsDelay)
18+
: UpdatePermissionsDelay(updatePermissionsDelay)
19+
{}
20+
21+
void Bootstrap() {
22+
Become(&TThis::StateFunc);
23+
HandleWakeup();
24+
}
25+
26+
STRICT_STFUNC(StateFunc,
27+
cFunc(TEvents::TEvWakeup::EventType, HandleWakeup);
28+
)
29+
30+
private:
31+
void HandleWakeup() {
32+
for (std::atomic<bool>& permission : ReportPermissions) {
33+
permission.store(true);
34+
}
35+
Schedule(UpdatePermissionsDelay, new TEvents::TEvWakeup);
36+
}
37+
38+
private:
39+
TDuration UpdatePermissionsDelay;
40+
};
41+
42+
IActor* CreateRequestReportingThrottler(TDuration updatePermissionsDelay) {
43+
return new TRequestReportingThrottler(updatePermissionsDelay);
44+
}
45+
46+
} // namespace NKikimr
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#pragma once
2+
3+
#include "defs.h"
4+
#include <array>
5+
6+
namespace NKikimr {
7+
8+
bool AllowToReport(NKikimrBlobStorage::EPutHandleClass handleClass);
9+
bool AllowToReport(NKikimrBlobStorage::EGetHandleClass handleClass);
10+
11+
IActor* CreateRequestReportingThrottler(TDuration updatePermissionsDelay);
12+
13+
} // namespace NKikimr

ydb/core/blobstorage/dsproxy/root_cause.h

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ struct TRootCause {
1616
ui64 TransferCycles;
1717
ui64 VDiskReplyCycles;
1818
bool IsAccelerate;
19+
TInstant Timestamp;
1920

2021
TRootCauseItem(ui64 causeIdx, ui64 startCycles, bool isAccelerate)
2122
: CauseIdx(causeIdx)
2223
, StartCycles(startCycles)
2324
, TransferCycles(startCycles)
2425
, VDiskReplyCycles(startCycles)
25-
, IsAccelerate(isAccelerate) {
26+
, IsAccelerate(isAccelerate)
27+
, Timestamp(TActivationContext::Now()) {
2628
}
2729
};
2830
static constexpr ui64 InvalidCauseIdx = 255;
@@ -59,8 +61,22 @@ struct TRootCause {
5961
#endif //LWTRACE_DISABLE
6062
}
6163

64+
TString ToString() {
65+
TStringStream str("TRootCause {");
66+
str << " CurrentCauseIdx# " << CurrentCauseIdx;
67+
str << " Items# [";
68+
for (const TRootCauseItem& item : Items) {
69+
str << " { Timestamp# " << item.Timestamp
70+
<< " IsAccelerate# " << item.IsAccelerate
71+
<< " CauseIdx# " << item.CauseIdx
72+
<< " }";
73+
}
74+
str << " ] }";
75+
return str.Str();
76+
}
77+
6278
ui64 RegisterCause() {
63-
if (IsOn && Items.size() < InvalidCauseIdx - 1) {
79+
if (Items.size() < InvalidCauseIdx - 1) {
6480
Items.emplace_back(CurrentCauseIdx, GetCycleCountFast(), false);
6581
return Items.size() - 1;
6682
} else {
@@ -69,7 +85,7 @@ struct TRootCause {
6985
}
7086

7187
ui64 RegisterAccelerate() {
72-
if (IsOn && Items.size() < InvalidCauseIdx - 1) {
88+
if (Items.size() < InvalidCauseIdx - 1) {
7389
Items.emplace_back(CurrentCauseIdx, GetCycleCountFast(), true);
7490
return Items.size() - 1;
7591
} else {

ydb/core/blobstorage/dsproxy/ut/dsproxy_env_mock_ut.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,6 @@ struct TDSProxyEnv {
202202
}
203203
};
204204

205-
206205
inline bool ScheduledFilterFunc(TTestActorRuntimeBase& runtime, TAutoPtr<IEventHandle>& event,
207206
TDuration delay, TInstant& deadline) {
208207
if (runtime.IsScheduleForActorEnabled(event->GetRecipientRewrite())) {

0 commit comments

Comments
 (0)