Skip to content

Commit 3cde06c

Browse files
committed
Add unit tests for acceleration parameters, fix bugs
1 parent d275a68 commit 3cde06c

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

53 files changed

+892
-374
lines changed

ydb/core/base/blobstorage.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -488,6 +488,8 @@ struct TEvBlobStorage {
488488
EvInplacePatch,
489489
EvAssimilate,
490490

491+
EvGetQueuesInfo, // for debugging purposes
492+
491493
//
492494
EvPutResult = EvPut + 512, /// 268 632 576
493495
EvGetResult,
@@ -502,6 +504,8 @@ struct TEvBlobStorage {
502504
EvInplacePatchResult,
503505
EvAssimilateResult,
504506

507+
EvQueuesInfo, // for debugging purposes
508+
505509
// proxy <-> vdisk interface
506510
EvVPut = EvPut + 2 * 512, /// 268 633 088
507511
EvVGet,

ydb/core/blobstorage/backpressure/common.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,39 @@
1515
#define QLOG_DEBUG_S(marker, arg) QLOG_LOG_S(marker, NActors::NLog::PRI_DEBUG , arg)
1616

1717
LWTRACE_USING(BLOBSTORAGE_PROVIDER);
18+
19+
namespace NKikimr::NBsQueue {
20+
21+
// Special timer for debugging purposes: it reads time through
// NActors::TActivationContext::Monotonic() rather than a hardware timer, so it works with the
// virtual time of TTestActorSystem.
// Provides Passed() returning double, which lets TBSQueueTimer hold it in the same
// std::variant as THPTimer and call Passed() on either alternative.
// NOTE(review): presumably both construction and Passed() must happen where an activation
// context is available -- confirm against TActivationContext::Monotonic().
22+
struct TActivationContextTimer {
23+
// Records the current monotonic time as the starting point of the measurement.
TActivationContextTimer()
24+
: CreationTimestamp(NActors::TActivationContext::Monotonic())
25+
{}
26+
27+
// Returns the time elapsed between construction and now, converted with SecondsFloat().
double Passed() const {
28+
return (NActors::TActivationContext::Monotonic() - CreationTimestamp).SecondsFloat();
29+
}
30+
31+
// Monotonic timestamp captured by the constructor; never modified by this struct afterwards.
TMonotonic CreationTimestamp;
32+
};
33+
34+
// Elapsed-time timer whose clock source is selected at construction: either THPTimer or
// TActivationContextTimer. Both alternatives are queried through the same Passed() call.
struct TBSQueueTimer {
35+
// useActorSystemTime == true  -> measure with TActivationContextTimer;
// useActorSystemTime == false -> measure with THPTimer.
// NOTE(review): this single-argument constructor is not marked explicit, so a bool can be
// implicitly converted to TBSQueueTimer.
TBSQueueTimer(bool useActorSystemTime)
36+
{
37+
// Timer has no member initializer, so before this body runs the variant has already been
// default-constructed holding its first alternative (THPTimer); emplace then destroys that
// value and constructs a fresh timer of the selected kind in its place.
if (useActorSystemTime) {
38+
Timer.emplace<TActivationContextTimer>();
39+
} else {
40+
Timer.emplace<THPTimer>();
41+
}
42+
}
43+
44+
// Holds whichever timer implementation was chosen in the constructor.
std::variant<THPTimer, TActivationContextTimer> Timer;
45+
46+
// Forwards to Passed() of the currently held alternative and returns its result as double.
double Passed() const {
47+
return std::visit([](const auto& timer) -> double {
48+
return timer.Passed();
49+
}, Timer);
50+
}
51+
};
52+
53+
} // namespace NKikimr::NBsQueue

ydb/core/blobstorage/backpressure/event.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ IEventBase *TEventHolder::MakeErrorReply(NKikimrProto::EReplyStatus status, cons
2727

2828
void TEventHolder::SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId,
2929
ui64 sequenceId, bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
30-
const THPTimer& processingTimer) {
30+
const TBSQueueTimer& processingTimer) {
3131
// check that we are not discarded yet
3232
Y_ABORT_UNLESS(Type != 0);
3333

ydb/core/blobstorage/backpressure/event.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ class TEventHolder {
142142

143143
void SendToVDisk(const TActorContext& ctx, const TActorId& remoteVDisk, ui64 queueCookie, ui64 msgId, ui64 sequenceId,
144144
bool sendMeCostSettings, NWilson::TTraceId traceId, const NBackpressure::TQueueClientId& clientId,
145-
const THPTimer& processingTimer);
145+
const TBSQueueTimer& processingTimer);
146146

147147
void Discard();
148148
};

ydb/core/blobstorage/backpressure/queue.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace NKikimr::NBsQueue {
44

55
TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
66
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
7-
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility)
7+
const TBlobStorageGroupType& gType, NMonitoring::TCountableBase::EVisibility visibility, bool useActorSystemTime)
88
: Queues(bspctx)
99
, WindowSize(0)
1010
, InFlightCost(0)
@@ -16,6 +16,7 @@ TBlobStorageQueue::TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamic
1616
, ClientId(clientId)
1717
, BytesWaiting(0)
1818
, InterconnectChannel(interconnectChannel)
19+
, UseActorSystemTime(useActorSystemTime)
1920
// use parent group visibility
2021
, QueueWaitingItems(counters->GetCounter("QueueWaitingItems", false, visibility))
2122
, QueueWaitingBytes(counters->GetCounter("QueueWaitingBytes", false, visibility))

ydb/core/blobstorage/backpressure/queue.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,16 @@ class TBlobStorageQueue {
5151
const ui64 QueueCookie;
5252
ui64 Cost;
5353
bool DirtyCost;
54-
THPTimer ProcessingTimer;
54+
TBSQueueTimer ProcessingTimer;
55+
5556
TTrackableList<TItem>::iterator Iterator;
5657

5758
template<typename TEvent>
5859
TItem(TAutoPtr<TEventHandle<TEvent>>& event, TInstant deadline,
5960
const ::NMonitoring::TDynamicCounters::TCounterPtr& serItems,
6061
const ::NMonitoring::TDynamicCounters::TCounterPtr& serBytes,
6162
const TBSProxyContextPtr& bspctx, ui32 interconnectChannel,
62-
bool local)
63+
bool local, bool useActorSystemTime)
6364
: Queue(EItemQueue::NotSet)
6465
, CostEssence(*event->Get())
6566
, Span(TWilson::VDiskTopLevel, std::move(event->TraceId), "Backpressure.InFlight")
@@ -70,6 +71,7 @@ class TBlobStorageQueue {
7071
, QueueCookie(RandomNumber<ui64>())
7172
, Cost(0)
7273
, DirtyCost(true)
74+
, ProcessingTimer(useActorSystemTime)
7375
{
7476
if (Span) {
7577
Span
@@ -129,6 +131,8 @@ class TBlobStorageQueue {
129131

130132
const ui32 InterconnectChannel;
131133

134+
const bool UseActorSystemTime;
135+
132136
public:
133137
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingItems;
134138
::NMonitoring::TDynamicCounters::TCounterPtr QueueWaitingBytes;
@@ -156,7 +160,8 @@ class TBlobStorageQueue {
156160
TBlobStorageQueue(const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters, TString& logPrefix,
157161
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, ui32 interconnectChannel,
158162
const TBlobStorageGroupType &gType,
159-
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public);
163+
NMonitoring::TCountableBase::EVisibility visibility = NMonitoring::TCountableBase::EVisibility::Public,
164+
bool useActorSystemTime = false);
160165

161166
~TBlobStorageQueue();
162167

@@ -213,7 +218,8 @@ class TBlobStorageQueue {
213218
TItemList::iterator newIt;
214219
if (Queues.Unused.empty()) {
215220
newIt = Queues.Waiting.emplace(Queues.Waiting.end(), event, deadline,
216-
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local);
221+
QueueSerializedItems, QueueSerializedBytes, BSProxyCtx, InterconnectChannel, local,
222+
UseActorSystemTime);
217223
++*QueueSize;
218224
} else {
219225
newIt = Queues.Unused.begin();
@@ -222,7 +228,7 @@ class TBlobStorageQueue {
222228
TItem& item = *newIt;
223229
item.~TItem();
224230
new(&item) TItem(event, deadline, QueueSerializedItems, QueueSerializedBytes, BSProxyCtx,
225-
InterconnectChannel, local);
231+
InterconnectChannel, local, UseActorSystemTime);
226232
}
227233

228234
newIt->Iterator = newIt;

ydb/core/blobstorage/backpressure/queue_backpressure_client.cpp

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,13 @@ class TVDiskBackpressureClientActor : public TActorBootstrapped<TVDiskBackpressu
7777
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
7878
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
7979
ui32 interconnectChannel, bool /*local*/, TDuration watchdogTimeout,
80-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility)
80+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
81+
bool useActorSystemTime)
8182
: BSProxyCtx(bspctx)
8283
, QueueName(queueName)
8384
, Counters(counters->GetSubgroup("queue", queueName))
8485
, Queue(Counters, LogPrefix, bspctx, clientId, interconnectChannel,
85-
(info ? info->Type : TErasureType::ErasureNone), visibility)
86+
(info ? info->Type : TErasureType::ErasureNone), visibility, useActorSystemTime)
8687
, VDiskIdShort(vdiskId)
8788
, QueueId(queueId)
8889
, QueueWatchdogTimeout(watchdogTimeout)
@@ -975,9 +976,10 @@ IActor* CreateVDiskBackpressureClient(const TIntrusivePtr<TBlobStorageGroupInfo>
975976
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
976977
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
977978
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
978-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility) {
979+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
980+
bool useActorSystemTime) {
979981
return new NBsQueue::TVDiskBackpressureClientActor(info, vdiskId, queueId, counters, bspctx, clientId, queueName,
980-
interconnectChannel, local, watchdogTimeout, flowRecord, visibility);
982+
interconnectChannel, local, watchdogTimeout, flowRecord, visibility, useActorSystemTime);
981983
}
982984

983985
} // NKikimr

ydb/core/blobstorage/backpressure/queue_backpressure_client.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ namespace NKikimr {
5050
NKikimrBlobStorage::EVDiskQueueId queueId,const TIntrusivePtr<::NMonitoring::TDynamicCounters>& counters,
5151
const TBSProxyContextPtr& bspctx, const NBackpressure::TQueueClientId& clientId, const TString& queueName,
5252
ui32 interconnectChannel, bool local, TDuration watchdogTimeout,
53-
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility);
53+
TIntrusivePtr<NBackpressure::TFlowRecord> &flowRecord, NMonitoring::TCountableBase::EVisibility visibility,
54+
bool useActorSystemTime = false);
5455

5556
} // NKikimr

ydb/core/blobstorage/dsproxy/dsproxy.h

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ const ui32 MaskSizeBits = 32;
5454

5555
constexpr bool DefaultEnablePutBatching = true;
5656
constexpr bool DefaultEnableVPatch = false;
57-
constexpr float DefaultSlowDiskThreshold = 2000;
57+
58+
constexpr float DefaultSlowDiskThreshold = 2;
59+
constexpr float DefaultPredictedDelayMultiplier = 1;
5860

5961
constexpr bool WithMovingPatchRequestToStaticNode = true;
6062

@@ -169,6 +171,11 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
169171
}
170172
}
171173

174+
// Bundle of acceleration tunables. It is taken by const reference by the Put/Get request
// factories declared in this header and by TBlackboard::RunStrategies / RunStrategy, where it
// replaces the former single `float slowDiskThreshold` argument.
// The in-class defaults equal DefaultSlowDiskThreshold (2) and DefaultPredictedDelayMultiplier (1).
struct TAccelerationParams {
175+
// NOTE(review): how strategies compare against this threshold is not visible here -- confirm
// the exact semantics (and units, if any) in the strategy implementations.
double SlowDiskThreshold = 2;
176+
// NOTE(review): presumably the factor supplied as `multiplier` to GetWorstPredictedDelaysNs,
// which scales each predicted delay before sorting -- confirm at the call sites.
double PredictedDelayMultiplier = 1;
177+
};
178+
172179
template<typename TDerived>
173180
class TBlobStorageGroupRequestActor : public TActor<TDerived> {
174181
public:
@@ -649,7 +656,7 @@ IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupIn
649656
ui64 cookie, NWilson::TTraceId traceId, bool timeStatsEnabled,
650657
TDiskResponsivenessTracker::TPerDiskStatsPtr stats,
651658
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
652-
bool enableRequestMod3x3ForMinLatecy, float slowDiskThreshold);
659+
bool enableRequestMod3x3ForMinLatecy, const TAccelerationParams& accelerationParams);
653660

654661
IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
655662
const TIntrusivePtr<TGroupQueues> &state,
@@ -659,14 +666,14 @@ IActor* CreateBlobStorageGroupPutRequest(const TIntrusivePtr<TBlobStorageGroupIn
659666
TDiskResponsivenessTracker::TPerDiskStatsPtr stats,
660667
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
661668
NKikimrBlobStorage::EPutHandleClass handleClass, TEvBlobStorage::TEvPut::ETactic tactic,
662-
bool enableRequestMod3x3ForMinLatecy, float slowDiskThreshold);
669+
bool enableRequestMod3x3ForMinLatecy, const TAccelerationParams& accelerationParams);
663670

664671
IActor* CreateBlobStorageGroupGetRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
665672
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
666673
const TIntrusivePtr<TBlobStorageGroupProxyMon> &mon, TEvBlobStorage::TEvGet *ev,
667674
ui64 cookie, NWilson::TTraceId traceId, TNodeLayoutInfoPtr&& nodeLayout,
668675
TMaybe<TGroupStat::EKind> latencyQueueKind, TInstant now, TIntrusivePtr<TStoragePoolCounters> &storagePoolCounters,
669-
float slowDiskThreshold);
676+
const TAccelerationParams& accelerationParams);
670677

671678
IActor* CreateBlobStorageGroupPatchRequest(const TIntrusivePtr<TBlobStorageGroupInfo> &info,
672679
const TIntrusivePtr<TGroupQueues> &state, const TActorId &source,
@@ -729,12 +736,14 @@ IActor* CreateBlobStorageGroupAssimilateRequest(const TIntrusivePtr<TBlobStorage
729736
IActor* CreateBlobStorageGroupEjectedProxy(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon);
730737

731738
IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
732-
bool forceWaitAllDrives, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
739+
bool forceWaitAllDrives, bool useActorSystemTimeInBSQueue, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
733740
TIntrusivePtr<TStoragePoolCounters>&& storagePoolCounters, const TControlWrapper &enablePutBatching,
734-
const TControlWrapper &enableVPatch, const TControlWrapper &slowDiskThreshold);
741+
const TControlWrapper &enableVPatch, const TControlWrapper &slowDiskThreshold,
742+
const TControlWrapper &predictedDelayMultiplier);
735743

736-
IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, TIntrusivePtr<TDsProxyNodeMon> &nodeMon,
737-
const TControlWrapper &enablePutBatching, const TControlWrapper &enableVPatch,
738-
const TControlWrapper &slowDiskThreshold);
744+
IActor* CreateBlobStorageGroupProxyUnconfigured(ui32 groupId, bool useActorSystemInBSQueue,
745+
TIntrusivePtr<TDsProxyNodeMon> &nodeMon, const TControlWrapper &enablePutBatching,
746+
const TControlWrapper &enableVPatch, const TControlWrapper &slowDiskThreshold,
747+
const TControlWrapper &predictedDelayMultiplier);
739748

740749
}//NKikimr

ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -174,16 +174,22 @@ void TBlobState::AddNotYetResponse(const TBlobStorageGroupInfo &info, const TLog
174174
ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
175175
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const {
176176
Y_UNUSED(info);
177-
return groupQueues.GetPredictedDelayNsByOrderNumber(Disks[diskIdxInSubring].OrderNumber, queueId);
177+
auto res = groupQueues.GetPredictedDelayNsByOrderNumber(Disks[diskIdxInSubring].OrderNumber, queueId);
178+
return res;
178179
}
179180

180181
void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
181-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
182+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
183+
double multiplier) const {
182184
outNWorst->resize(Disks.size());
183185
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
184-
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
186+
(*outNWorst)[diskIdx] = {
187+
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
188+
diskIdx
189+
};
185190
}
186-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
191+
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
192+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
187193
}
188194

189195
bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
@@ -361,7 +367,7 @@ void TBlackboard::AddErrorResponse(const TLogoBlobID &id, ui32 orderNumber) {
361367
}
362368

363369
EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec<IStrategy*, 1>& s,
364-
float slowDiskThreshold, TBatchedVec<TFinishedBlob> *finished,
370+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
365371
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
366372
for (auto it = BlobStates.begin(); it != BlobStates.end(); ) {
367373
auto& blob = it->second;
@@ -374,7 +380,7 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
374380
NKikimrProto::EReplyStatus status = NKikimrProto::OK;
375381
TString errorReason;
376382
for (IStrategy *strategy : s) {
377-
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests, slowDiskThreshold)) {
383+
switch (auto res = strategy->Process(logCtx, blob, *Info, *this, GroupDiskRequests, accelerationParams)) {
378384
case EStrategyOutcome::IN_PROGRESS:
379385
status = NKikimrProto::UNKNOWN;
380386
break;
@@ -415,9 +421,10 @@ EStrategyOutcome TBlackboard::RunStrategies(TLogContext &logCtx, const TStackVec
415421
return BlobStates.empty() ? EStrategyOutcome::DONE : EStrategyOutcome::IN_PROGRESS;
416422
}
417423

418-
EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s, float slowDiskThreshold,
419-
TBatchedVec<TFinishedBlob> *finished, const TBlobStorageGroupInfo::TGroupVDisks *expired) {
420-
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, slowDiskThreshold, finished, expired);
424+
EStrategyOutcome TBlackboard::RunStrategy(TLogContext &logCtx, const IStrategy& s,
425+
const TAccelerationParams& accelerationParams, TBatchedVec<TFinishedBlob> *finished,
426+
const TBlobStorageGroupInfo::TGroupVDisks *expired) {
427+
return RunStrategies(logCtx, {const_cast<IStrategy*>(&s)}, accelerationParams, finished, expired);
421428
}
422429

423430
TBlobState& TBlackboard::GetState(const TLogoBlobID &id) {
@@ -459,13 +466,17 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
459466
}
460467

461468
void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
462-
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
469+
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
470+
double multiplier) const {
463471
ui32 totalVDisks = info.GetTotalVDisksNum();
464472
outNWorst->resize(totalVDisks);
465473
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
466-
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
474+
(*outNWorst)[orderNumber] = {
475+
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
476+
orderNumber
477+
};
467478
}
468-
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
479+
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
469480
}
470481

471482
void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {

0 commit comments

Comments
 (0)