@@ -14,13 +14,15 @@ namespace NKikimr {
1414 class TEmergencyQueue {
1515 struct TItem {
1616 std::unique_ptr<IEventHandle> Ev;
17+ ui64 Size = 0 ;
1718 NWilson::TSpan Span;
1819
1920 TItem () = default ;
2021
2122 template <typename T>
22- TItem (TAutoPtr<TEventHandle<T>> ev)
23+ TItem (TAutoPtr<TEventHandle<T>> ev, ui64 size )
2324 : Ev(ev.Release())
25+ , Size(size)
2426 , Span(TWilson::VDiskInternals, std::move(Ev->TraceId), " VDisk.Skeleton.EmergencyQueue" )
2527 {
2628 Ev->TraceId = Span.GetTraceId ();
@@ -65,96 +67,209 @@ namespace NKikimr {
6567
6668 void Push (TEvBlobStorage::TEvVMovedPatch::TPtr ev) {
6769 ++Mon.EmergencyMovedPatchQueueItems ();
68- Mon.EmergencyMovedPatchQueueBytes () += ev->Get ()->Record .ByteSize ();
69- Queue.Push (TItem (ev));
70+ auto size = ev->GetSize ();
71+ Mon.EmergencyMovedPatchQueueBytes () += size;
72+ Queue.Push (TItem (ev, size));
7073 }
7174
7275 void Push (TEvBlobStorage::TEvVPatchStart::TPtr ev) {
7376 ++Mon.EmergencyPatchStartQueueItems ();
74- Mon.EmergencyPatchStartQueueBytes () += ev->Get ()->Record .ByteSize ();
75- Queue.Push (TItem (ev));
77+ auto size = ev->GetSize ();
78+ Mon.EmergencyPatchStartQueueBytes () += size;
79+ Queue.Push (TItem (ev, size));
7680 }
7781
7882 void Push (TEvBlobStorage::TEvVPut::TPtr ev) {
7983 ++Mon.EmergencyPutQueueItems ();
80- Mon.EmergencyPutQueueBytes () += ev->Get ()->Record .ByteSize ();
81- Queue.Push (TItem (ev));
84+ auto size = ev->GetSize ();
85+ Mon.EmergencyPutQueueBytes () += size;
86+ Queue.Push (TItem (ev, size));
8287 }
8388
8489 void Push (TEvBlobStorage::TEvVMultiPut::TPtr ev) {
8590 ++Mon.EmergencyMultiPutQueueItems ();
86- Mon.EmergencyMultiPutQueueBytes () += ev->Get ()->Record .ByteSize ();
87- Queue.Push (TItem (ev));
91+ auto size = ev->GetSize ();
92+ Mon.EmergencyMultiPutQueueBytes () += size;
93+ Queue.Push (TItem (ev, size));
8894 }
8995
9096 void Push (TEvLocalSyncData::TPtr ev) {
9197 ++Mon.EmergencyLocalSyncDataQueueItems ();
92- Mon.EmergencyLocalSyncDataQueueBytes () += ev->Get ()->ByteSize ();
93- Queue.Push (TItem (ev));
98+ auto size = ev->Get ()->ByteSize ();
99+ Mon.EmergencyLocalSyncDataQueueBytes () += size;
100+ Queue.Push (TItem (ev, size));
94101 }
95102
96103 void Push (TEvAnubisOsirisPut::TPtr ev) {
97104 ++Mon.EmergencyAnubisOsirisPutQueueItems ();
98- Mon.EmergencyAnubisOsirisPutQueueBytes () += ev->Get ()->ByteSize ();
99- Queue.Push (TItem (ev));
105+ auto size = ev->Get ()->ByteSize ();
106+ Mon.EmergencyAnubisOsirisPutQueueBytes () += size;
107+ Queue.Push (TItem (ev, size));
100108 }
101109
102110 bool Empty () {
103111 return !Queue.Head ();
104112 }
105113
106- void Process (const TActorContext &ctx) {
114+ ui64 GetHeadEventSize () {
115+ Y_ABORT_UNLESS (Queue.Head ());
116+ return Queue.Head ()->Size ;
117+ }
118+
119+ ui64 Process (const TActorContext &ctx) {
107120 auto item = Queue.Head ();
108121 Y_ABORT_UNLESS (item);
122+ auto size = item->Size ;
109123 TAutoPtr<IEventHandle> ev = item->Ev .release ();
110124 item->Span .EndOk ();
111125 Queue.Pop ();
112126 switch (ev->GetTypeRewrite ()) {
113127 case TEvBlobStorage::EvVMovedPatch: {
114128 auto *evMovedPatch = reinterpret_cast <TEvBlobStorage::TEvVMovedPatch::TPtr*>(&ev);
115129 --Mon.EmergencyMovedPatchQueueItems ();
116- Mon.EmergencyMovedPatchQueueBytes () -= (*evMovedPatch)-> Get ()-> Record . ByteSize () ;
130+ Mon.EmergencyMovedPatchQueueBytes () -= size ;
117131 VMovedPatchHandler (ctx, *evMovedPatch);
118132 break ;
119133 }
120134 case TEvBlobStorage::EvVPatchStart: {
121135 auto *evPatchStart = reinterpret_cast <TEvBlobStorage::TEvVPatchStart::TPtr*>(&ev);
122136 --Mon.EmergencyPatchStartQueueItems ();
123- Mon.EmergencyPatchStartQueueBytes () -= (*evPatchStart)-> Get ()-> Record . ByteSize () ;
137+ Mon.EmergencyPatchStartQueueBytes () -= size ;
124138 VPatchStartHandler (ctx, *evPatchStart);
125139 break ;
126140 }
127141 case TEvBlobStorage::EvVPut: {
128142 auto *evPut = reinterpret_cast <TEvBlobStorage::TEvVPut::TPtr*>(&ev);
129143 --Mon.EmergencyPutQueueItems ();
130- Mon.EmergencyPutQueueBytes () -= (*evPut)-> Get ()-> Record . ByteSize () ;
144+ Mon.EmergencyPutQueueBytes () -= size ;
131145 VPutHandler (ctx, *evPut);
132146 break ;
133147 }
134148 case TEvBlobStorage::EvVMultiPut: {
135149 auto *evMultiPut = reinterpret_cast <TEvBlobStorage::TEvVMultiPut::TPtr*>(&ev);
136150 --Mon.EmergencyMultiPutQueueItems ();
137- Mon.EmergencyMultiPutQueueBytes () -= (*evMultiPut)-> Get ()-> Record . ByteSize () ;
151+ Mon.EmergencyMultiPutQueueBytes () -= size ;
138152 VMultiPutHandler (ctx, *evMultiPut);
139153 break ;
140154 }
141155 case TEvBlobStorage::EvLocalSyncData: {
142156 auto *evLocalSyncData = reinterpret_cast <TEvLocalSyncData::TPtr*>(&ev);
143157 --Mon.EmergencyLocalSyncDataQueueItems ();
144- Mon.EmergencyLocalSyncDataQueueBytes () -= (*evLocalSyncData)-> Get ()-> ByteSize () ;
158+ Mon.EmergencyLocalSyncDataQueueBytes () -= size ;
145159 LocalSyncDataHandler (ctx,*evLocalSyncData);
146160 break ;
147161 }
148162 case TEvBlobStorage::EvAnubisOsirisPut: {
149163 auto *evAnubisOsirisPut = reinterpret_cast <TEvAnubisOsirisPut::TPtr*>(&ev);
150164 --Mon.EmergencyAnubisOsirisPutQueueItems ();
151- Mon.EmergencyAnubisOsirisPutQueueBytes () -= (*evAnubisOsirisPut)-> Get ()-> ByteSize () ;
165+ Mon.EmergencyAnubisOsirisPutQueueBytes () -= size ;
152166 AnubisOsirisPutHandler (ctx, *evAnubisOsirisPut);
153167 break ;
154168 }
155169 default :
156170 Y_ABORT (" unexpected event type in emergency queue(%" PRIu64 " )" , (ui64)ev->GetTypeRewrite ());
157171 }
172+ return size;
173+ }
174+ };
175+
176+ // /////////////////////////////////////////////////////////////////////////////////////////////////
177+ // TThrottlingController
178+ // /////////////////////////////////////////////////////////////////////////////////////////////////
179+
180+ class TThrottlingController {
181+ private:
182+ struct TControls {
183+ TControlWrapper MinSstCount;
184+ TControlWrapper MaxSstCount;
185+ TControlWrapper DeviceSpeed;
186+
187+ TControls ()
188+ : MinSstCount(16 , 1 , 200 )
189+ , MaxSstCount(64 , 1 , 200 )
190+ , DeviceSpeed(50 << 20 , 1 << 20 , 1 << 30 )
191+ {}
192+
193+ void Register (TIntrusivePtr<TControlBoard> icb) {
194+ icb->RegisterSharedControl (MinSstCount, " VDiskControls.ThrottlingMinSstCount" );
195+ icb->RegisterSharedControl (MaxSstCount, " VDiskControls.ThrottlingMaxSstCount" );
196+ icb->RegisterSharedControl (DeviceSpeed, " VDiskControls.ThrottlingDeviceSpeed" );
197+ }
198+ };
199+ TControls Controls;
200+
201+ ui64 CurrentSstCount = 0 ;
202+
203+ TInstant CurrentTime;
204+ ui64 AvailableBytes = 0 ;
205+
206+ ui64 GetCurrentSpeedLimit () const {
207+ ui64 minSstCount = (ui64)Controls.MinSstCount ;
208+ ui64 maxSstCount = (ui64)Controls.MaxSstCount ;
209+ ui64 deviceSpeed = (ui64)Controls.DeviceSpeed ;
210+
211+ if (maxSstCount <= minSstCount) {
212+ return deviceSpeed;
213+ }
214+ if (CurrentSstCount <= minSstCount) {
215+ return deviceSpeed;
216+ }
217+ if (CurrentSstCount >= maxSstCount) {
218+ return 0 ;
219+ }
220+ return (double )(maxSstCount - CurrentSstCount) * deviceSpeed / (maxSstCount - minSstCount);
221+ }
222+
223+ public:
224+ void RegisterIcbControls (TIntrusivePtr<TControlBoard> icb) {
225+ Controls.Register (icb);
226+ }
227+
228+ bool IsActive () const {
229+ ui64 minSstCount = (ui64)Controls.MinSstCount ;
230+ return CurrentSstCount > minSstCount;
231+ }
232+
233+ TDuration BytesToDuration (ui64 bytes) const {
234+ auto limit = GetCurrentSpeedLimit ();
235+ if (limit == 0 ) {
236+ return TDuration::Seconds (1 );
237+ }
238+ return TDuration::Seconds ((double )bytes / limit);
239+ }
240+
241+ ui64 GetAvailableBytes () const {
242+ return AvailableBytes;
243+ }
244+
245+ ui64 Consume (ui64 bytes) {
246+ AvailableBytes -= bytes;
247+ return AvailableBytes;
248+ }
249+
250+ void UpdateState (TInstant now, ui64 sstCount) {
251+ bool prevActive = IsActive ();
252+
253+ CurrentSstCount = sstCount;
254+
255+ if (!IsActive ()) {
256+ CurrentTime = {};
257+ AvailableBytes = 0 ;
258+ } else if (!prevActive) {
259+ CurrentTime = now;
260+ AvailableBytes = 0 ;
261+ }
262+ }
263+
264+ void UpdateTime (TInstant now) {
265+ if (now <= CurrentTime) {
266+ return ;
267+ }
268+ auto us = (now - CurrentTime).MicroSeconds ();
269+ AvailableBytes += GetCurrentSpeedLimit () * us / 1000000 ; // overflow ?
270+ ui64 deviceSpeed = (ui64)Controls.DeviceSpeed ;
271+ AvailableBytes = Min (AvailableBytes, deviceSpeed);
272+ CurrentTime = now;
158273 }
159274 };
160275
@@ -177,6 +292,7 @@ namespace NKikimr {
177292 , EmergencyQueue(new TEmergencyQueue(Mon, std::move(vMovedPatch), std::move(vPatchStart), std::move(vput),
178293 std::move (vMultiPut), std::move(loc), std::move(aoput)))
179294 , DynamicPDiskWeightsManager(std::make_shared<TDynamicPDiskWeightsManager>(vctx, pdiskCtx))
295+ , ThrottlingController(new TThrottlingController)
180296 {}
181297
182298 TOverloadHandler::~TOverloadHandler () {}
@@ -209,12 +325,59 @@ namespace NKikimr {
209325 bool TOverloadHandler::ProcessPostponedEvents (const TActorContext &ctx, int batchSize, bool actualizeLevels) {
210326 ActualizeWeights (ctx, AllEHullDbTypes, actualizeLevels);
211327
328+ if (DynamicPDiskWeightsManager->StopPuts ()) {
329+ return false ;
330+ }
331+
332+ if (AppData ()->FeatureFlags .GetEnableVDiskThrottling ()) {
333+ auto snapshot = Hull->GetSnapshot (); // THullDsSnap
334+ auto & logoBlobsSnap = snapshot.LogoBlobsSnap ; // TLogoBlobsSnapshot
335+ auto & sliceSnap = logoBlobsSnap.SliceSnap ; // TLevelSliceSnapshot
336+ auto sstCount = sliceSnap.GetLevel0SstsNum ();
337+
338+ auto now = ctx.Now ();
339+ ThrottlingController->UpdateState (now, sstCount);
340+
341+ if (ThrottlingController->IsActive ()) {
342+ ThrottlingController->UpdateTime (now);
343+
344+ int count = batchSize;
345+ auto bytes = ThrottlingController->GetAvailableBytes ();
346+
347+ while (count > 0 &&
348+ !EmergencyQueue->Empty () &&
349+ bytes >= EmergencyQueue->GetHeadEventSize ())
350+ {
351+ auto size = EmergencyQueue->Process (ctx);
352+ bytes = ThrottlingController->Consume (size);
353+ --count;
354+ }
355+
356+ if (EmergencyQueue->Empty ()) {
357+ return false ;
358+ }
359+
360+ if (bytes >= EmergencyQueue->GetHeadEventSize ()) {
361+ return true ;
362+ }
363+
364+ if (!KickInFlight) {
365+ auto left = EmergencyQueue->GetHeadEventSize () - bytes;
366+ auto duration = ThrottlingController->BytesToDuration (left);
367+ duration += TDuration::MilliSeconds (1 );
368+ ctx.Schedule (duration, new TEvKickEmergencyPutQueue);
369+ KickInFlight = true ;
370+ }
371+
372+ return false ;
373+ }
374+ }
375+
212376 // process batchSize events maximum and once
213377 int count = batchSize;
214378 while (count > 0 && !DynamicPDiskWeightsManager->StopPuts () && !EmergencyQueue->Empty ()) {
215379 // process single event from the emergency queue
216380 EmergencyQueue->Process (ctx);
217-
218381 --count;
219382 }
220383
@@ -245,9 +408,20 @@ namespace NKikimr {
245408 DynamicPDiskWeightsManager->RenderHtml (str);
246409 }
247410
411+ void TOverloadHandler::OnKickEmergencyPutQueue () {
412+ KickInFlight = false ;
413+ }
414+
415+ void TOverloadHandler::RegisterIcbControls (TIntrusivePtr<TControlBoard> icb) {
416+ ThrottlingController->RegisterIcbControls (icb);
417+ }
418+
248419 template <class TEv >
249420 inline bool TOverloadHandler::PostponeEvent (TAutoPtr<TEventHandle<TEv>> &ev) {
250- if (DynamicPDiskWeightsManager->StopPuts () || !EmergencyQueue->Empty ()) {
421+ if (DynamicPDiskWeightsManager->StopPuts () ||
422+ ThrottlingController->IsActive () ||
423+ !EmergencyQueue->Empty ())
424+ {
251425 EmergencyQueue->Push (ev);
252426 return true ;
253427 } else {
0 commit comments