Skip to content

Commit 60b8704

Browse files
authored
Merge 5db5426 into 142b531
2 parents 142b531 + 5db5426 commit 60b8704

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+1027
-310
lines changed

ydb/core/base/blobstorage.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,8 @@ struct TEvBlobStorage {
492492
EvInplacePatch,
493493
EvAssimilate,
494494

495+
EvGetQueuesInfo, // for debugging purposes
496+
495497
//
496498
EvPutResult = EvPut + 512, /// 268 632 576
497499
EvGetResult,
@@ -506,6 +508,8 @@ struct TEvBlobStorage {
506508
EvInplacePatchResult,
507509
EvAssimilateResult,
508510

511+
EvQueuesInfo, // for debugging purposes
512+
509513
// proxy <-> vdisk interface
510514
EvVPut = EvPut + 2 * 512, /// 268 633 088
511515
EvVGet,
@@ -878,6 +882,7 @@ struct TEvBlobStorage {
878882
EvRunActor = EvPut + 15 * 512,
879883
EvVMockCtlRequest,
880884
EvVMockCtlResponse,
885+
EvDelayedMessageWrapper,
881886

882887
// incremental huge blob keeper
883888
EvIncrHugeInit = EvPut + 17 * 512,

ydb/core/blobstorage/backpressure/common.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,39 @@
1515
#define QLOG_DEBUG_S(marker, arg) QLOG_LOG_S(marker, NActors::NLog::PRI_DEBUG , arg)
1616

1717
LWTRACE_USING(BLOBSTORAGE_PROVIDER);
18+
19+
namespace NKikimr::NBsQueue {
20+
21+
// Special timer for debug purposes, which works with virtual time of TTestActorSystem
22+
struct TActivationContextTimer {
23+
TActivationContextTimer()
24+
: CreationTimestamp(NActors::TActivationContext::Monotonic())
25+
{}
26+
27+
double Passed() const {
28+
return (NActors::TActivationContext::Monotonic() - CreationTimestamp).SecondsFloat();
29+
}
30+
31+
TMonotonic CreationTimestamp;
32+
};
33+
34+
struct TBSQueueTimer {
35+
TBSQueueTimer(bool useActorSystemTime)
36+
{
37+
if (useActorSystemTime) {
38+
Timer.emplace<TActivationContextTimer>();
39+
} else {
40+
Timer.emplace<THPTimer>();
41+
}
42+
}
43+
44+
std::variant<THPTimer, TActivationContextTimer> Timer;
45+
46+
double Passed() const {
47+
return std::visit([](const auto& timer) -> double {
48+
return timer.Passed();
49+
}, Timer);
50+
}
51+
};
52+
53+
} // namespace NKikimr::NBsQueue

ydb/core/blobstorage/backpressure/event.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ IEventBase *TEventHolder::MakeErrorReply(NKikimrProto::EReplyStatus status, cons
2727

2828
void TEventHolder::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId,
2929
ui64 sequenceId, bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
30-
const THPTimer& processingTimer) {
30+
const TBSQueueTimer& processingTimer) {
3131
// check that we are not discarded yet
3232
Y_ABORT_UNLESS(Type != 0);
3333

ydb/core/blobstorage/backpressure/event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class TEventHolder {
142142

143143
void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId, ui64 sequenceId,
144144
bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
145-
const THPTimer& processingTimer);
145+
const TBSQueueTimer& processingTimer);
146146

147147
void Discard();
148148
};

ydb/core/blobstorage/backpressure/queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace NKikimr::NBsQueue {
44

55
TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
66
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
7-
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility)
7+
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility, bool useActorSystemTime)
88
: Queues(bspctx)
99
, WindowSize(0)
1010
, InFlightCost(0)
@@ -16,6 +16,7 @@ TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamic
1616
, ClientId(clientId)
1717
, BytesWaiting(0)
1818
, InterconnectChannel(interconnectChannel)
19+
, UseActorSystemTime(useActorSystemTime)
1920
// use parent group visibility
2021
, QueueWaitingItems(counters->GetCounter("QueueWaitingItems", false, visibility))
2122
, QueueWaitingBytes(counters->GetCounter("QueueWaitingBytes", false, visibility))

ydb/core/blobstorage/backpressure/queue.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,16 @@ class TBlobStorageQueue {
5151
const ui64 QueueCookie;
5252
ui64 Cost;
5353
bool DirtyCost;
54-
THPTimer ProcessingTimer;
54+
TBSQueueTimer ProcessingTimer;
55+
5556
TTrackableList<TItem>::iterator Iterator;
5657

5758
template<typename TEvent>
5859
TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
5960
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
6061
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
6162
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
62-
bool local)
63+
bool local, bool useActorSystemTime)
6364
: Queue(EItemQueue::NotSet)
6465
, CostEssence(*event->Get())
6566
, Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
@@ -70,6 +71,7 @@ class TBlobStorageQueue {
7071
, QueueCookie(RandomNumber<ui64>())
7172
, Cost(0)
7273
, DirtyCost(true)
74+
, ProcessingTimer(useActorSystemTime)
7375
{
7476
if (Span) {
7577
Span
@@ -129,6 +131,8 @@ class TBlobStorageQueue {
129131

130132
const ui32 InterconnectChannel;
131133

134+
const bool UseActorSystemTime;
135+
132136
public:
133137
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingItems;
134138
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingBytes;
@@ -156,7 +160,8 @@ class TBlobStorageQueue {
156160
TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
157161
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
158162
const TBlobStorageGroupType &gType,
159-
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public);
163+
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public,
164+
bool useActorSystemTime = false);
160165

161166
~TBlobStorageQueue();
162167

@@ -213,7 +218,8 @@ class TBlobStorageQueue {
213218
TItemList::iterator newIt;
214219
if (Queues.Unused.empty()) {
215220
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
216-
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
221+
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local,
222+
UseActorSystemTime);
217223
++*QueueSize;
218224
} else {
219225
newIt = Queues.Unused.begin();
@@ -222,7 +228,7 @@ class TBlobStorageQueue {
222228
TItem& item = *newIt;
223229
item.~TItem();
224230
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
225-
InterconnectChannel, local);
231+
InterconnectChannel, local, UseActorSystemTime);
226232
}
227233

228234
newIt->Iterator = newIt;

ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
7777
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
7878
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
7979
ui32 interconnectChannel, bool /*local*/, TDuration watchdogTimeout,
80-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility)
80+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
81+
bool useActorSystemTime)
8182
: BSProxyCtx(bspctx)
8283
, QueueName(queueName)
8384
, Counters(counters->GetSubgroup("queue", queueName))
8485
, Queue(Counters, LogPrefix, bspctx, clientId, interconnectChannel,
85-
(info ? info->Type : TErasureType::ErasureNone), visibility)
86+
(info ? info->Type : TErasureType::ErasureNone), visibility, useActorSystemTime)
8687
, VDiskIdShort(vdiskId)
8788
, QueueId(queueId)
8889
, QueueWatchdogTimeout(watchdogTimeout)
@@ -975,9 +976,10 @@ IActor* CreateVDiskBackpressureClient(const TIntrusivePtr<TBlobStorageGroupInfo>
975976
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
976977
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
977978
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
978-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility) {
979+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
980+
bool useActorSystemTime) {
979981
return new NBsQueue::TVDiskBackpressureClientActor(info, vdiskId, queueId, counters, bspctx, clientId, queueName,
980-
interconnectChannel, local, watchdogTimeout, flowRecord, visibility);
982+
interconnectChannel, local, watchdogTimeout, flowRecord, visibility, useActorSystemTime);
981983
}
982984

983985
} // NKikimr

ydb/core/blobstorage/backpressure/queue_backpressure_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace NKikimr {
5050
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
5151
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
5252
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
53-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility);
53+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
54+
bool useActorSystemTime = false);
5455

5556
} // NKikimr

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ const ui32 MaskSizeBits = 32;
5555
constexpr bool DefaultEnablePutBatching = true;
5656
constexpr bool DefaultEnableVPatch = false;
5757

58+
constexpr float DefaultSlowDiskThreshold = 2;
59+
constexpr float DefaultPredictedDelayMultiplier = 1;
60+
5861
constexpr bool WithMovingPatchRequestToStaticNode = true;
5962

6063
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -188,6 +191,11 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
188191
}
189192
}
190193

194+
struct TAccelerationParams {
195+
double SlowDiskThreshold = 2;
196+
double PredictedDelayMultiplier = 1;
197+
};
198+
191199
class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor> {
192200
public:
193201
template<typename TEv>
@@ -332,7 +340,7 @@ IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupIn
332340
ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled,
333341
TDiskResponsivenessTracker::TPerDiskStatsPtr stats,
334342
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
335-
bool enableRequestMod3x3ForMinLatecy);
343+
bool enableRequestMod3x3ForMinLatecy, const TAccelerationParams& accelerationParams);
336344

337345
IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
338346
const TIntrusivePtr<TGroupQueues> &state,
@@ -342,13 +350,14 @@ IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupIn
342350
TDiskResponsivenessTracker::TPerDiskStatsPtr stats,
343351
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
344352
NKikimrBlobStorage::EPutHandleClass handleClass, TEvBlobStorage::TEvPut::ETactic tactic,
345-
bool enableRequestMod3x3ForMinLatecy);
353+
bool enableRequestMod3x3ForMinLatecy, const TAccelerationParams& accelerationParams);
346354

347355
IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
348356
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
349357
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev,
350358
ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout,
351-
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters);
359+
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
360+
const TAccelerationParams& accelerationParams);
352361

353362
IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
354363
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
@@ -411,11 +420,14 @@ IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorage
411420
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);
412421

413422
IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
414-
bool forceWaitAllDrives, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
423+
bool forceWaitAllDrives, bool useActorSystemTimeInBSQueue, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
415424
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TControlWrapper &enablePutBatching,
416-
const TControlWrapper &enableVPatch);
425+
const TControlWrapper &enableVPatch, const TControlWrapper &slowDiskThreshold,
426+
const TControlWrapper &predictedDelayMultiplier);
417427

418-
IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
419-
const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch);
428+
IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, bool useActorSystemInBSQueue,
429+
TIntrusivePtr<TDsProxyNodeMon> &nodeMon, const TControlWrapper &enablePutBatching,
430+
const TControlWrapper &enableVPatch, const TControlWrapper &slowDiskThreshold,
431+
const TControlWrapper &predictedDelayMultiplier);
420432

421433
}//NKikimr

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,22 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
174174
ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
175175
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const {
176176
Y_UNUSED(info);
177-
return groupQueues.GetPredictedDelayNsByOrderNumber(Disks[diskIdxInSubring].OrderNumber, queueId);
177+
auto res = groupQueues.GetPredictedDelayNsByOrderNumber(Disks[diskIdxInSubring].OrderNumber, queueId);
178+
return res;
178179
}
179180

180181
void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
181-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
182+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
183+
double multiplier) const {
182184
outNWorst->resize(Disks.size());
183185
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
184-
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
186+
(*outNWorst)[diskIdx] = {
187+
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
188+
diskIdx
189+
};
185190
}
186-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
191+
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
192+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
187193
}
188194

189195
bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
@@ -361,7 +367,8 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
361367
}
362368

363369
EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
364-
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
370+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
371+
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
365372
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
366373
auto& blob = it->second;
367374
if (!std::exchange(blob.IsChanged, false)) {
@@ -373,7 +380,7 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
373380
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
374381
TString errorReason;
375382
for (IStrategy *strategy : s) {
376-
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests)) {
383+
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests, accelerationParams)) {
377384
case EStrategyOutcome::IN_PROGRESS:
378385
status = NKikimrProto::UNKNOWN;
379386
break;
@@ -415,8 +422,9 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
415422
}
416423

417424
EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
418-
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
419-
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, finished, expired);
425+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
426+
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
427+
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, accelerationParams, finished, expired);
420428
}
421429

422430
TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
@@ -458,13 +466,17 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
458466
}
459467

460468
void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
461-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
469+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
470+
double multiplier) const {
462471
ui32 totalVDisks = info.GetTotalVDisksNum();
463472
outNWorst->resize(totalVDisks);
464473
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
465-
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
474+
(*outNWorst)[orderNumber] = {
475+
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
476+
orderNumber
477+
};
466478
}
467-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
479+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
468480
}
469481

470482
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {

0 commit comments

Comments
 (0)