Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ constexpr bool DefaultEnableVPatch = false;

constexpr float DefaultSlowDiskThreshold = 2;
constexpr float DefaultPredictedDelayMultiplier = 1;
constexpr TDuration DefaultLongRequestThreshold = TDuration::Seconds(50);

constexpr bool WithMovingPatchRequestToStaticNode = true;

Expand Down Expand Up @@ -205,7 +206,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
TIntrusivePtr<TBlobStorageGroupProxyMon> Mon;
TActorId Source = TActorId{};
ui64 Cookie = 0;
TInstant Now;
TMonotonic Now;
TIntrusivePtr<TStoragePoolCounters>& StoragePoolCounters;
ui32 RestartCounter;
NWilson::TTraceId TraceId = {};
Expand Down Expand Up @@ -234,10 +235,10 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
, ParentSpan(TWilson::BlobStorage, std::move(params.Common.TraceId), params.TypeSpecific.Name)
, RestartCounter(params.Common.RestartCounter)
, CostModel(GroupQueues->CostModel)
, RequestStartTime(params.Common.Now)
, Source(params.Common.Source)
, Cookie(params.Common.Cookie)
, LatencyQueueKind(params.Common.LatencyQueueKind)
, RequestStartTime(params.Common.Now)
, RacingDomains(&Info->GetTopology())
, ExecutionRelay(std::move(params.Common.ExecutionRelay))
{
Expand Down Expand Up @@ -323,6 +324,7 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
bool Dead = false;
const ui32 RestartCounter = 0;
std::shared_ptr<const TCostModel> CostModel;
const TMonotonic RequestStartTime;

private:
const TActorId Source;
Expand All @@ -331,7 +333,6 @@ class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActo
ui32 RequestsInFlight = 0;
std::unique_ptr<IEventBase> Response;
const TMaybe<TGroupStat::EKind> LatencyQueueKind;
const TInstant RequestStartTime;
THPTimer Timer;
std::deque<std::unique_ptr<IEventHandle>> PostponedQ;
TBlobStorageGroupInfo::TGroupFailDomains RacingDomains; // a set of domains we've received RACE from
Expand All @@ -354,8 +355,7 @@ struct TBlobStorageGroupRangeParameters {
TBlobStorageGroupRequestActor::TTypeSpecificParameters TypeSpecific = {
.LogComponent = NKikimrServices::BS_PROXY_RANGE,
.Name = "DSProxy.Range",
.Activity = NKikimrServices::TActivity::BS_GROUP_RANGE
,
.Activity = NKikimrServices::TActivity::BS_GROUP_RANGE,
};
};
IActor* CreateBlobStorageGroupRangeRequest(TBlobStorageGroupRangeParameters params);
Expand All @@ -371,6 +371,7 @@ struct TBlobStorageGroupPutParameters {
TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;
bool EnableRequestMod3x3ForMinLatency;
TAccelerationParams AccelerationParams;
TDuration LongRequestThreshold;
};
IActor* CreateBlobStorageGroupPutRequest(TBlobStorageGroupPutParameters params);

Expand All @@ -389,6 +390,7 @@ struct TBlobStorageGroupMultiPutParameters {
TEvBlobStorage::TEvPut::ETactic Tactic;
bool EnableRequestMod3x3ForMinLatency;
TAccelerationParams AccelerationParams;
TDuration LongRequestThreshold;

static ui32 CalculateRestartCounter(TBatchedVec<TEvBlobStorage::TEvPut::TPtr>& events) {
ui32 maxRestarts = 0;
Expand All @@ -409,6 +411,7 @@ struct TBlobStorageGroupGetParameters {
};
TNodeLayoutInfoPtr NodeLayout;
TAccelerationParams AccelerationParams;
TDuration LongRequestThreshold;
};
IActor* CreateBlobStorageGroupGetRequest(TBlobStorageGroupGetParameters params);

Expand Down Expand Up @@ -516,6 +519,7 @@ struct TBlobStorageProxyParameters {
const TControlWrapper& EnableVPatch;
const TControlWrapper& SlowDiskThreshold;
const TControlWrapper& PredictedDelayMultiplier;
const TControlWrapper& LongRequestThresholdMs = TControlWrapper(DefaultLongRequestThreshold.MilliSeconds(), 1, 1'000'000);
};

IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "defs.h"

#include "dsproxy.h"
#include "request_history.h"

#include <ydb/core/blobstorage/base/batched_vec.h>
#include <ydb/core/blobstorage/groupinfo/blobstorage_groupinfo.h>
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
const ui32 Generation;
const TInstant Deadline;
const ui64 IssuerGuid;
TInstant StartTime;
bool SeenAlready = false;

TGroupQuorumTracker QuorumTracker;
Expand Down Expand Up @@ -99,7 +98,7 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
std::unique_ptr<TEvBlobStorage::TEvBlockResult> result(new TEvBlobStorage::TEvBlockResult(status));
result->ErrorReason = ErrorReason;
DSP_LOG_LOG_S(PriorityForStatusResult(status), "DSPB04", "Result# " << result->Print(false));
Mon->CountBlockResponseTime(TActivationContext::Now() - StartTime);
Mon->CountBlockResponseTime(TActivationContext::Monotonic() - RequestStartTime);
return SendResponseAndDie(std::move(result));
}

Expand Down Expand Up @@ -138,7 +137,6 @@ class TBlobStorageGroupBlockRequest : public TBlobStorageGroupRequestActor {
, Generation(params.Common.Event->Generation)
, Deadline(params.Common.Event->Deadline)
, IssuerGuid(params.Common.Event->IssuerGuid)
, StartTime(params.Common.Now)
, QuorumTracker(Info.Get())
{}

Expand Down
2 changes: 0 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_collect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
const bool Decommission;

TGroupQuorumTracker QuorumTracker;
TInstant StartTime;

ui32 RequestsSent = 0;
ui32 ResponsesReceived = 0;
Expand Down Expand Up @@ -155,7 +154,6 @@ class TBlobStorageGroupCollectGarbageRequest : public TBlobStorageGroupRequestAc
, Collect(params.Common.Event->Collect)
, Decommission(params.Common.Event->Decommission)
, QuorumTracker(Info.Get())
, StartTime(params.Common.Now)
{}

void Bootstrap() override {
Expand Down
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
const bool ReadBody;
const bool DiscoverBlockedGeneration;
const TInstant Deadline;
const TInstant StartTime;

TGroupResponseTracker GroupResponseTracker;
std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> PendingResult;
Expand All @@ -295,7 +294,7 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
template<typename TPtr>
void SendResult(TPtr& result) {
Y_ABORT_UNLESS(result);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
Mon->CountDiscoverResponseTime(duration);
const bool success = result->Status == NKikimrProto::OK;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
Expand Down Expand Up @@ -876,7 +875,6 @@ class TBlobStorageGroupDiscoverRequest : public TBlobStorageGroupRequestActor {
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
, Deadline(params.Common.Event->Deadline)
, StartTime(params.Common.Now)
, GroupResponseTracker(Info)
, IsGetBlockDone(!DiscoverBlockedGeneration)
, ForceBlockedGeneration(params.Common.Event->ForceBlockedGeneration)
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3dc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ class TDiscoverWorker {
class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupRequestActor {
const ui64 TabletId;
const ui32 MinGeneration;
const TInstant StartTime;
const TInstant Deadline;
const bool ReadBody;
const bool DiscoverBlockedGeneration;
Expand Down Expand Up @@ -461,7 +460,6 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, StartTime(params.Common.Now)
, Deadline(params.Common.Event->Deadline)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
Expand Down Expand Up @@ -651,7 +649,7 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques
DSP_LOG_DEBUG_S("DSPDM03", "Response# " << response->ToString());

Y_ABORT_UNLESS(!Responded);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
TabletId, Info->GroupID.GetRawId(), TLogoBlobID::MaxChannel, "", true);
SendResponseAndDie(std::move(response));
Expand All @@ -664,7 +662,7 @@ class TBlobStorageGroupMirror3dcDiscoverRequest : public TBlobStorageGroupReques

Y_ABORT_UNLESS(!Responded);
Y_ABORT_UNLESS(status != NKikimrProto::OK);
const TDuration duration = TActivationContext::Now() - StartTime;
const TDuration duration = TActivationContext::Monotonic() - RequestStartTime;
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvDiscover, 0, duration.SecondsFloat() * 1000.0,
TabletId, Info->GroupID.GetRawId(), TLogoBlobID::MaxChannel, "", false);
std::unique_ptr<TEvBlobStorage::TEvDiscoverResult> response(new TEvBlobStorage::TEvDiscoverResult(
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_discover_m3of4.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ namespace NKikimr {
class TBlobStorageGroupMirror3of4DiscoverRequest : public TBlobStorageGroupRequestActor {
const ui64 TabletId;
const ui32 MinGeneration;
const TInstant StartTime;
const TInstant Deadline;
const bool ReadBody;
const bool DiscoverBlockedGeneration;
Expand All @@ -31,7 +30,6 @@ class TBlobStorageGroupMirror3of4DiscoverRequest : public TBlobStorageGroupReque
: TBlobStorageGroupRequestActor(params)
, TabletId(params.Common.Event->TabletId)
, MinGeneration(params.Common.Event->MinGeneration)
, StartTime(params.Common.Now)
, Deadline(params.Common.Event->Deadline)
, ReadBody(params.Common.Event->ReadBody)
, DiscoverBlockedGeneration(params.Common.Event->DiscoverBlockedGeneration)
Expand Down
45 changes: 32 additions & 13 deletions ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "dsproxy.h"
#include "dsproxy_mon.h"
#include "root_cause.h"
#include <ydb/core/blobstorage/dsproxy/dsproxy_request_reporting.h>
#include <ydb/core/blobstorage/vdisk/common/vdisk_events.h>
#include <ydb/core/blobstorage/lwtrace_probes/blobstorage_probes.h>
#include <ydb/core/util/stlog.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
#include <library/cpp/digest/crc32c/crc32c.h>
#include <util/generic/set.h>
Expand Down Expand Up @@ -36,8 +38,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
TRootCause RootCauseTrack;
NLWTrace::TOrbit Orbit;
const TInstant Deadline;
TInstant StartTime;
TInstant StartTimePut;
TMonotonic StartTimePut;
ui32 RequestsSent = 0;
ui32 ResponsesReceived = 0;
ui32 GroupSize;
Expand All @@ -58,6 +59,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
bool IsPutAccelerateScheduled = false;

TAccelerationParams AccelerationParams;
TDuration LongRequestThreshold;

void Handle(TEvAccelerateGet::TPtr &ev) {
IsGetAccelerateScheduled = false;
Expand All @@ -77,6 +79,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}
GetsAccelerated++;

GetImpl.History.AddAcceleration(false);
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts);
Expand All @@ -92,6 +95,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}
PutsAccelerated++;

GetImpl.History.AddAcceleration(true);
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts);
Expand All @@ -111,7 +115,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
if (vPuts.size()) {
if (!IsPutStarted) {
IsPutStarted = true;
StartTimePut = TActivationContext::Now();
StartTimePut = TActivationContext::Monotonic();
}
}
for (size_t i = 0; i < vGets.size(); ++i) {
Expand Down Expand Up @@ -148,6 +152,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
const ui64 cookie = ev->Record.GetCookie();
SendToQueue(std::move(ev), cookie);
}
GetImpl.History.AddAllWaiting();
}

ui32 CountDisksWithActiveRequests() {
Expand Down Expand Up @@ -326,8 +331,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx) / 1000;
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
TInstant now = TActivationContext::Now();
TInstant nextAcceleration = StartTime + timeToAccelerate;
TMonotonic now = TActivationContext::Monotonic();
TMonotonic nextAcceleration = RequestStartTime + timeToAccelerate;
if (nextAcceleration > now) {
ui64 causeIdx = RootCauseTrack.RegisterAccelerate();
Schedule(nextAcceleration - now, new TEvAccelerateGet(causeIdx));
Expand All @@ -345,8 +350,8 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx) / 1000;
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
TInstant now = TActivationContext::Now();
TInstant nextAcceleration = StartTime + timeToAccelerate;
TMonotonic now = TActivationContext::Monotonic();
TMonotonic nextAcceleration = RequestStartTime + timeToAccelerate;
if (nextAcceleration > now) {
ui64 causeIdx = RootCauseTrack.RegisterAccelerate();
Schedule(nextAcceleration - now, new TEvAcceleratePut(causeIdx));
Expand All @@ -359,9 +364,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}

void SendReplyAndDie(TAutoPtr<TEvBlobStorage::TEvGetResult> &evResult) {
const TInstant now = TActivationContext::Now();
const TDuration duration = (now > StartTime) ? (now - StartTime) : TDuration::MilliSeconds(0);
Mon->CountGetResponseTime(Info->GetDeviceType(), GetImpl.GetHandleClass(), evResult->PayloadSizeBytes(), duration);
const TMonotonic now = TActivationContext::Monotonic();
const TDuration duration = now - RequestStartTime;
NKikimrBlobStorage::EGetHandleClass handleClass = GetImpl.GetHandleClass();
Mon->CountGetResponseTime(Info->GetDeviceType(), handleClass, evResult->PayloadSizeBytes(), duration);
*Mon->ActiveGetCapacity -= ReportedBytes;
ReportedBytes = 0;
bool success = evResult->Status == NKikimrProto::OK;
Expand All @@ -377,9 +383,22 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
LWTRACK(DSProxyGetReply, Orbit);
evResult->Orbit = std::move(Orbit);
LWPROBE(DSProxyRequestDuration, TEvBlobStorage::EvGet, requestSize, duration.SecondsFloat() * 1000.0, tabletId,
evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(GetImpl.GetHandleClass()),
evResult->GroupId, channel, NKikimrBlobStorage::EGetHandleClass_Name(handleClass),
success);
DSP_LOG_LOG_S(success ? NLog::PRI_INFO : NLog::PRI_NOTICE, "BPG68", "Result# " << evResult->Print(false));

if (TActivationContext::Monotonic() - RequestStartTime >= LongRequestThreshold) {
if (AllowToReport(GetImpl.GetHandleClass())) {
STLOG(PRI_WARN, BS_PROXY_GET, BPG71, "Long TEvGet request detected", \
(LongRequestThreshold, LongRequestThreshold), \
(GroupId, Info->GroupID), \
(SubrequestsCount, evResult->ResponseSz), \
(RequestTotalSize, requestSize), \
(HandleClass, NKikimrBlobStorage::EGetHandleClass_Name(handleClass)), \
(RestartCounter, RestartCounter), \
(History, GetImpl.PrintHistory()));
}
}
return SendResponseAndDie(std::unique_ptr<TEvBlobStorage::TEvGetResult>(evResult.Release()));
}

Expand All @@ -404,11 +423,11 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
params.AccelerationParams, LogCtx.RequestPrefix)
, Orbit(std::move(params.Common.Event->Orbit))
, Deadline(params.Common.Event->Deadline)
, StartTime(params.Common.Now)
, StartTimePut(StartTime)
, StartTimePut(RequestStartTime)
, GroupSize(Info->Type.BlobSubgroupSize())
, ReportedBytes(0)
, AccelerationParams(params.AccelerationParams)
, LongRequestThreshold(params.LongRequestThreshold)
{
ReportBytes(sizeof(*this));
MaxSaneRequests = params.Common.Event->QuerySize * Info->Type.TotalPartCount() *
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,9 +294,14 @@ void TGetImpl::PrepareRequests(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBl

for (auto& vget : gets) {
if (vget) {
DSP_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# "
<< Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()))
<< " vget# " << vget->ToString());
ui32 orderNumber = Info->GetTopology().GetOrderNumber(VDiskIDFromVDiskID(vget->Record.GetVDiskID()));
DSP_LOG_DEBUG_SX(logCtx, "BPG14", "Send get to orderNumber# " << orderNumber << " vget# " << vget->ToString());
if (vget->Record.ExtremeQueriesSize() > 0) {
TLogoBlobID blobId = LogoBlobIDFromLogoBlobID(vget->Record.GetExtremeQueries(0).GetId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is only the first extreme query's blob ID recorded in the history here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think recording only the first query is enough for now to track the request's execution path.

History.AddVGetToWaitingList(blobId.PartId(), vget->Record.ExtremeQueriesSize(), orderNumber);
} else {
History.AddVGetToWaitingList(THistory::InvalidPartId, 0, orderNumber);
}
outVGets.push_back(std::move(vget));
++RequestIndex;
}
Expand All @@ -313,6 +318,7 @@ void TGetImpl::PrepareVPuts(TLogContext &logCtx, TDeque<std::unique_ptr<TEvBlobS
auto vput = std::make_unique<TEvBlobStorage::TEvVPut>(put.Id, put.Buffer, vdiskId, true, nullptr, Deadline,
Blackboard.PutHandleClass);
DSP_LOG_DEBUG_SX(logCtx, "BPG15", "Send put to orderNumber# " << put.OrderNumber << " vput# " << vput->ToString());
History.AddVPutToWaitingList(put.Id.PartId(), 1, put.OrderNumber);
outVPuts.push_back(std::move(vput));
++VPutRequests;
}
Expand Down Expand Up @@ -389,6 +395,7 @@ void TGetImpl::OnVPutResult(TLogContext &logCtx, TEvBlobStorage::TEvVPutResult &
Y_ABORT("Unexpected status# %s", NKikimrProto::EReplyStatus_Name(status).data());
}
Step(logCtx, outVGets, outVPuts, outGetResult);
History.AddVPutResult(orderNumber, status, record.GetErrorReason());
}

}//NKikimr
Loading