Skip to content

Commit 2ca9ad2

Browse files
committed
Support load cpu threshold
1 parent 1d57390 commit 2ca9ad2

21 files changed

+900
-255
lines changed
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
#include "cpu_quota_manager.h"
2+
3+
#include <util/string/builder.h>
4+
5+
6+
namespace NFq {
7+
8+
//// TCpuQuotaManager::TCounters
9+
10+
TCpuQuotaManager::TCounters::TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent)
11+
: SubComponent(subComponent)
12+
{
13+
Register();
14+
}
15+
16+
void TCpuQuotaManager::TCounters::Register() {
17+
RegisterCommonMetrics(CpuLoadRequest);
18+
InstantLoadPercentage = SubComponent->GetCounter("InstantLoadPercentage", false);
19+
AverageLoadPercentage = SubComponent->GetCounter("AverageLoadPercentage", false);
20+
QuotedLoadPercentage = SubComponent->GetCounter("QuotedLoadPercentage", false);
21+
}
22+
23+
void TCpuQuotaManager::TCounters::RegisterCommonMetrics(TCommonMetrics& metrics) const {
24+
metrics.Ok = SubComponent->GetCounter("Ok", true);
25+
metrics.Error = SubComponent->GetCounter("Error", true);
26+
}
27+
28+
//// TCpuQuotaManager::TCpuQuotaResponse
29+
30+
TCpuQuotaManager::TCpuQuotaResponse::TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status, NYql::TIssues issues)
31+
: CurrentLoad(currentLoad)
32+
, Status(status)
33+
, Issues(std::move(issues))
34+
{}
35+
36+
//// TCpuQuotaManager
37+
38+
TCpuQuotaManager::TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent)
39+
: Counters(subComponent)
40+
, MonitoringRequestDelay(monitoringRequestDelay)
41+
, AverageLoadInterval(averageLoadInterval)
42+
, IdleTimeout(idleTimeout)
43+
, DefaultQueryLoad(defaultQueryLoad)
44+
, Strict(strict)
45+
, CpuNumber(cpuNumber)
46+
{}
47+
48+
double TCpuQuotaManager::GetInstantLoad() const {
49+
return InstantLoad;
50+
}
51+
52+
double TCpuQuotaManager::GetAverageLoad() const {
53+
return AverageLoad;
54+
}
55+
56+
TDuration TCpuQuotaManager::GetMonitoringRequestDelay() const {
57+
return GetMonitoringRequestTime() - TInstant::Now();
58+
}
59+
60+
TInstant TCpuQuotaManager::GetMonitoringRequestTime() const {
61+
TDuration delay = MonitoringRequestDelay;
62+
if (IdleTimeout && TInstant::Now() - LastRequestCpuQuota > IdleTimeout) {
63+
delay = AverageLoadInterval / 2;
64+
}
65+
66+
return LastUpdateCpuLoad ? LastUpdateCpuLoad + delay : TInstant::Now();
67+
}
68+
69+
void TCpuQuotaManager::UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success) {
70+
auto now = TInstant::Now();
71+
LastUpdateCpuLoad = now;
72+
73+
if (!success) {
74+
Counters.CpuLoadRequest.Error->Inc();
75+
CheckLoadIsOutdated();
76+
return;
77+
}
78+
79+
auto delta = now - LastCpuLoad;
80+
LastCpuLoad = now;
81+
82+
if (cpuNumber) {
83+
CpuNumber = cpuNumber;
84+
}
85+
86+
InstantLoad = instantLoad;
87+
// exponential moving average
88+
if (!Ready || delta >= AverageLoadInterval) {
89+
AverageLoad = InstantLoad;
90+
QuotedLoad = InstantLoad;
91+
} else {
92+
auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
93+
AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
94+
QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
95+
}
96+
Ready = true;
97+
Counters.CpuLoadRequest.Ok->Inc();
98+
Counters.InstantLoadPercentage->Set(static_cast<ui64>(InstantLoad * 100));
99+
Counters.AverageLoadPercentage->Set(static_cast<ui64>(AverageLoad * 100));
100+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
101+
}
102+
103+
bool TCpuQuotaManager::CheckLoadIsOutdated() {
104+
if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
105+
Ready = false;
106+
QuotedLoad = 0.0;
107+
Counters.QuotedLoadPercentage->Set(0);
108+
}
109+
return Ready;
110+
}
111+
112+
bool TCpuQuotaManager::HasCpuQuota(double maxClusterLoad) {
113+
LastRequestCpuQuota = TInstant::Now();
114+
return maxClusterLoad == 0.0 || ((Ready || !Strict) && QuotedLoad < maxClusterLoad);
115+
}
116+
117+
TCpuQuotaManager::TCpuQuotaResponse TCpuQuotaManager::RequestCpuQuota(double quota, double maxClusterLoad) {
118+
if (quota < 0.0 || quota > 1.0) {
119+
return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder() << "Incorrect quota value (exceeds 1.0 or less than 0.0) " << quota)});
120+
}
121+
quota = quota ? quota : DefaultQueryLoad;
122+
123+
CheckLoadIsOutdated();
124+
if (!HasCpuQuota(maxClusterLoad)) {
125+
return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder()
126+
<< "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
127+
<< "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
128+
)});
129+
}
130+
131+
QuotedLoad += quota;
132+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
133+
return TCpuQuotaResponse(QuotedLoad * 100);
134+
}
135+
136+
void TCpuQuotaManager::AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed) {
137+
if (!CpuNumber) {
138+
return;
139+
}
140+
141+
if (duration && duration < AverageLoadInterval / 2 && quota <= 1.0) {
142+
quota = quota ? quota : DefaultQueryLoad;
143+
auto load = (cpuSecondsConsumed * 1000.0 / duration.MilliSeconds()) / CpuNumber;
144+
if (quota > load) {
145+
auto adjustment = (quota - load) / 2;
146+
if (QuotedLoad > adjustment) {
147+
QuotedLoad -= adjustment;
148+
} else {
149+
QuotedLoad = 0.0;
150+
}
151+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
152+
}
153+
}
154+
}
155+
156+
} // namespace NFq
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
#pragma once
2+
3+
#include <library/cpp/monlib/dynamic_counters/counters.h>
4+
5+
#include <ydb/library/yql/public/issue/yql_issue.h>
6+
7+
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
8+
9+
10+
namespace NFq {
11+
12+
class TCpuQuotaManager {
13+
struct TCounters {
14+
const ::NMonitoring::TDynamicCounterPtr SubComponent;
15+
struct TCommonMetrics {
16+
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
17+
::NMonitoring::TDynamicCounters::TCounterPtr Error;
18+
};
19+
20+
TCommonMetrics CpuLoadRequest;
21+
::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
22+
::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
23+
::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
24+
25+
explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent);
26+
27+
private:
28+
void Register();
29+
void RegisterCommonMetrics(TCommonMetrics& metrics) const;
30+
};
31+
32+
public:
33+
struct TCpuQuotaResponse {
34+
explicit TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {});
35+
36+
const int32_t CurrentLoad;
37+
const NYdb::EStatus Status;
38+
const NYql::TIssues Issues;
39+
};
40+
41+
public:
42+
TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui64 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent);
43+
44+
double GetInstantLoad() const;
45+
double GetAverageLoad() const;
46+
TDuration GetMonitoringRequestDelay() const;
47+
TInstant GetMonitoringRequestTime() const;
48+
49+
void UpdateCpuLoad(double instantLoad, ui64 cpuNumber, bool success);
50+
bool CheckLoadIsOutdated();
51+
52+
bool HasCpuQuota(double maxClusterLoad);
53+
TCpuQuotaResponse RequestCpuQuota(double quota, double maxClusterLoad);
54+
void AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed);
55+
56+
private:
57+
TCounters Counters;
58+
59+
const TDuration MonitoringRequestDelay;
60+
const TDuration AverageLoadInterval;
61+
const TDuration IdleTimeout;
62+
const double DefaultQueryLoad;
63+
const bool Strict;
64+
ui64 CpuNumber = 0;
65+
66+
TInstant LastCpuLoad;
67+
TInstant LastUpdateCpuLoad;
68+
TInstant LastRequestCpuQuota;
69+
70+
double InstantLoad = 0.0;
71+
double AverageLoad = 0.0;
72+
double QuotedLoad = 0.0;
73+
bool Ready = false;
74+
};
75+
76+
} // namespace NFq

ydb/core/fq/libs/compute/common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
LIBRARY()
22

33
SRCS(
4+
cpu_quota_manager.cpp
45
pinger.cpp
56
run_actor_params.cpp
67
utils.cpp

0 commit comments

Comments
 (0)