11#include < ydb/core/fq/libs/compute/ydb/events/events.h>
22#include < ydb/core/fq/libs/control_plane_storage/util.h>
33
4+ #include < ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
5+
46#include < ydb/library/services/services.pb.h>
57
68#include < ydb/library/security/ydb_credentials_provider_factory.h>
@@ -24,17 +26,9 @@ namespace NFq {
2426class TComputeDatabaseMonitoringActor : public NActors ::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
2527 struct TCounters {
2628 ::NMonitoring::TDynamicCounterPtr Counters;
27- struct TCommonMetrics {
28- ::NMonitoring::TDynamicCounters::TCounterPtr Ok;
29- ::NMonitoring::TDynamicCounters::TCounterPtr Error;
30- ::NMonitoring::THistogramPtr LatencyMs;
31- };
32-
33- TCommonMetrics CpuLoadRequest;
34- ::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
35- ::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
36- ::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
37- ::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
29+ ::NMonitoring::TDynamicCounterPtr SubComponent;
30+
31+ ::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
3832 ::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
3933 ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
4034 ::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
@@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
4842 private:
4943 void Register () {
5044 ::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup (" component" , " ComputeDatabaseMonitoring" );
51- auto subComponent = component->GetSubgroup (" subcomponent" , " CpuLoadRequest" );
52- RegisterCommonMetrics (CpuLoadRequest, subComponent);
53- InstantLoadPercentage = subComponent->GetCounter (" InstantLoadPercentage" , false );
54- AverageLoadPercentage = subComponent->GetCounter (" AverageLoadPercentage" , false );
55- QuotedLoadPercentage = subComponent->GetCounter (" QuotedLoadPercentage" , false );
56- AvailableLoadPercentage = subComponent->GetCounter (" AvailableLoadPercentage" , false );
57- TargetLoadPercentage = subComponent->GetCounter (" TargetLoadPercentage" , false );
58- PendingQueueSize = subComponent->GetCounter (" PendingQueueSize" , false );
59- PendingQueueOverload = subComponent->GetCounter (" PendingQueueOverload" , true );
60- }
61-
62- void RegisterCommonMetrics (TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
63- metrics.Ok = subComponent->GetCounter (" Ok" , true );
64- metrics.Error = subComponent->GetCounter (" Error" , true );
65- metrics.LatencyMs = subComponent->GetHistogram (" LatencyMs" , GetLatencyHistogramBuckets ());
45+ SubComponent = component->GetSubgroup (" subcomponent" , " CpuLoadRequest" );
46+ CpuLoadRequestLatencyMs = SubComponent->GetHistogram (" LatencyMs" , GetLatencyHistogramBuckets ());
47+ TargetLoadPercentage = SubComponent->GetCounter (" TargetLoadPercentage" , false );
48+ PendingQueueSize = SubComponent->GetCounter (" PendingQueueSize" , false );
49+ PendingQueueOverload = SubComponent->GetCounter (" PendingQueueOverload" , true );
6650 }
6751
6852 static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets () {
@@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
7559 TComputeDatabaseMonitoringActor (const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters)
7660 : MonitoringClientActorId(monitoringClientActorId)
7761 , Counters(counters)
78- , MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1 )))
79- , AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10 )), TDuration::Seconds(1 )))
8062 , MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100 ) / 100.0 )
81- , DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100 ) / 100.0 : 0.1 )
8263 , PendingQueueSize(config.GetPendingQueueSize())
8364 , Strict(config.GetStrict())
84- , CpuNumber(config.GetCpuNumber())
65+ , CpuQuotaManager(
66+ GetDuration (config.GetMonitoringRequestDelay(), TDuration::Seconds(1 )),
67+ std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10 )), TDuration::Seconds(1 )),
68+ TDuration::Zero(),
69+ config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
70+ config.GetStrict(),
71+ config.GetCpuNumber(),
72+ Counters.SubComponent
73+ )
8574 {
86- *Counters.AvailableLoadPercentage = 100 ;
8775 *Counters.TargetLoadPercentage = static_cast <ui64>(MaxClusterLoad * 100 );
8876 }
8977
@@ -105,54 +93,29 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
10593 )
10694
10795 void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
108- auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad );
109- if (!Ready ) {
96+ auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(CpuQuotaManager. GetInstantLoad (), CpuQuotaManager. GetAverageLoad () );
97+ if (!CpuQuotaManager. CheckLoadIsOutdated () ) {
11098 response->Issues .AddIssue (" CPU Load is unavailable" );
11199 }
112100 Send (ev->Sender , response.release (), 0 , ev->Cookie );
113101 }
114102
115103 void Handle (TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
116104 const auto & response = *ev.Get ()->Get ();
117-
118- auto now = TInstant::Now ();
119- if (!response.Issues ) {
120- auto delta = now - LastCpuLoad;
121- LastCpuLoad = now;
122-
123- if (response.CpuNumber ) {
124- CpuNumber = response.CpuNumber ;
125- }
126-
127- InstantLoad = response.InstantLoad ;
128- // exponential moving average
129- if (!Ready || delta >= AverageLoadInterval) {
130- AverageLoad = InstantLoad;
131- QuotedLoad = InstantLoad;
132- } else {
133- auto ratio = static_cast <double >(delta.GetValue ()) / AverageLoadInterval.GetValue ();
134- AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
135- QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
136- }
137- Ready = true ;
138- Counters.CpuLoadRequest .Ok ->Inc ();
139- *Counters.InstantLoadPercentage = static_cast <ui64>(InstantLoad * 100 );
140- *Counters.AverageLoadPercentage = static_cast <ui64>(AverageLoad * 100 );
141- CheckPendingQueue ();
142- *Counters.QuotedLoadPercentage = static_cast <ui64>(QuotedLoad * 100 );
143- } else {
105+ if (response.Issues ) {
144106 LOG_E (" CPU Load Request FAILED: " << response.Issues .ToOneLineString ());
145- Counters.CpuLoadRequest .Error ->Inc ();
146- CheckLoadIsOutdated ();
147107 }
148- Counters.CpuLoadRequest .LatencyMs ->Collect ((now - StartCpuLoad).MilliSeconds ());
108+ Counters.CpuLoadRequestLatencyMs ->Collect ((TInstant::Now () - StartCpuLoad).MilliSeconds ());
109+
110+ CpuQuotaManager.UpdateCpuLoad (response.InstantLoad , response.CpuNumber , !response.Issues );
111+ CheckPendingQueue ();
149112
150113 // TODO: make load pulling reactive
151114 // 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
152115 // 2. Active pulling when busy
153116
154- if (MonitoringRequestDelay ) {
155- Schedule (MonitoringRequestDelay , new NActors::TEvents::TEvWakeup ());
117+ if (auto delay = CpuQuotaManager. GetMonitoringRequestDelay () ) {
118+ Schedule (delay , new NActors::TEvents::TEvWakeup ());
156119 } else {
157120 SendCpuLoadRequest ();
158121 }
@@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
164127 if (request.Quota > 1.0 ) {
165128 Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (-1 , NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << " Incorrect quota value (exceeds 1.0) " << request.Quota }}), 0 , ev->Cookie );
166129 } else {
167- if (!request.Quota ) {
168- request.Quota = DefaultQueryLoad;
169- }
170- CheckLoadIsOutdated ();
171- if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
172- if (PendingQueue.size () >= PendingQueueSize) {
173- Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (-1 , NYdb::EStatus::OVERLOADED, NYql::TIssues{
174- NYql::TIssue{TStringBuilder{}
175- << " Cluster is overloaded, current quoted load " << static_cast <ui64>(QuotedLoad * 100 )
176- << " %, average load " << static_cast <ui64>(AverageLoad * 100 ) << " %"
177- }}), 0 , ev->Cookie );
130+ auto response = CpuQuotaManager.RequestCpuQuota (request.Quota , MaxClusterLoad);
131+ CheckPendingQueue ();
132+ if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size () < PendingQueueSize) {
133+ PendingQueue.push (ev);
134+ Counters.PendingQueueSize ->Inc ();
135+ } else {
136+ if (response.Status == NYdb::EStatus::OVERLOADED) {
178137 Counters.PendingQueueOverload ->Inc ();
179- } else {
180- PendingQueue.push (ev);
181- Counters.PendingQueueSize ->Inc ();
182138 }
183- } else {
184- QuotedLoad += request.Quota ;
185- *Counters.QuotedLoadPercentage = static_cast <ui64>(QuotedLoad * 100 );
186- Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (QuotedLoad * 100 ), 0 , ev->Cookie );
139+ Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (response.CurrentLoad , response.Status , response.Issues ), 0 , ev->Cookie );
187140 }
188141 }
189142 }
190143
191144 void Handle (TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
192- if (CpuNumber) {
193- auto & request = *ev.Get ()->Get ();
194- if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0 ) {
195- auto load = (request.CpuSecondsConsumed * 1000 / request.Duration .MilliSeconds ()) / CpuNumber;
196- auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
197- if (quota > load) {
198- auto adjustment = (quota - load) / 2 ;
199- if (QuotedLoad > adjustment) {
200- QuotedLoad -= adjustment;
201- } else {
202- QuotedLoad = 0.0 ;
203- }
204- CheckPendingQueue ();
205- *Counters.QuotedLoadPercentage = static_cast <ui64>(QuotedLoad * 100 );
206- }
207- }
208- }
145+ auto & request = *ev.Get ()->Get ();
146+ CpuQuotaManager.AdjustCpuQuota (request.Quota , request.Duration , request.CpuSecondsConsumed );
147+ CheckPendingQueue ();
209148 }
210149
211150 void SendCpuLoadRequest () {
@@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
215154
216155 void CheckLoadIsOutdated () {
217156 // TODO: support timeout to decline quota after request pending time is over, not load info
218- if (TInstant::Now () - LastCpuLoad > AverageLoadInterval) {
219- Ready = false ;
220- QuotedLoad = 0.0 ;
221- if (Strict) {
222- while (PendingQueue.size ()) {
223- auto & ev = PendingQueue.front ();
224- Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (-1 , NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << " Cluster load info is not available" }}), 0 , ev->Cookie );
225- PendingQueue.pop ();
226- Counters.PendingQueueSize ->Dec ();
227- }
157+ if (Strict && !CpuQuotaManager.CheckLoadIsOutdated ()) {
158+ while (PendingQueue.size ()) {
159+ auto & ev = PendingQueue.front ();
160+ Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (-1 , NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << " Cluster load info is not available" }}), 0 , ev->Cookie );
161+ PendingQueue.pop ();
162+ Counters.PendingQueueSize ->Dec ();
228163 }
229164 }
230165 }
231166
232167 void CheckPendingQueue () {
168+ CheckLoadIsOutdated ();
169+
233170 auto now = TInstant::Now ();
234- while (QuotedLoad < MaxClusterLoad && PendingQueue.size ()) {
171+ while (PendingQueue.size ()) {
235172 auto & ev = PendingQueue.front ();
236173 auto & request = *ev.Get ()->Get ();
237174 if (request.Deadline && now >= request.Deadline ) {
238175 Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (-1 , NYdb::EStatus::CANCELLED, NYql::TIssues{
239176 NYql::TIssue{TStringBuilder{} << " Deadline reached " << request.Deadline }}), 0 , ev->Cookie );
240177 } else {
241- QuotedLoad += request.Quota ;
242- Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (QuotedLoad * 100 ), 0 , ev->Cookie );
178+ auto response = CpuQuotaManager.RequestCpuQuota (request.Quota , MaxClusterLoad);
179+ if (response.Status == NYdb::EStatus::OVERLOADED) {
180+ break ;
181+ }
182+
183+ Send (ev->Sender , new TEvYdbCompute::TEvCpuQuotaResponse (response.CurrentLoad , response.Status , response.Issues ), 0 , ev->Cookie );
243184 }
185+
244186 PendingQueue.pop ();
245187 Counters.PendingQueueSize ->Dec ();
246188 }
247189 }
248190
249191private:
250- TInstant StartCpuLoad;
251- TInstant LastCpuLoad;
252192 TActorId MonitoringClientActorId;
253193 TCounters Counters;
254-
255- double InstantLoad = 0.0 ;
256- double AverageLoad = 0.0 ;
257- double QuotedLoad = 0.0 ;
258- bool Ready = false ;
259-
260- const TDuration MonitoringRequestDelay;
261- const TDuration AverageLoadInterval;
262194 const double MaxClusterLoad;
263- const double DefaultQueryLoad;
264195 const ui32 PendingQueueSize;
265196 const bool Strict;
266- ui32 CpuNumber = 0 ;
267197
198+ NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
268199 TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;
200+
201+ TInstant StartCpuLoad;
269202};
270203
271204std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor (const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {
0 commit comments