Skip to content

Commit 8e6739e

Browse files
authored
Merge d6e5831 into 1d57390
2 parents 1d57390 + d6e5831 commit 8e6739e

23 files changed

+752
-223
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
#include "cpu_quota_manager.h"
2+
3+
#include <util/string/builder.h>
4+
5+
6+
namespace NFq {
7+
8+
//// TCpuQuotaManager::TCounters
9+
10+
TCpuQuotaManager::TCounters::TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent)
11+
: SubComponent(subComponent)
12+
{
13+
Register();
14+
}
15+
16+
void TCpuQuotaManager::TCounters::Register() {
17+
RegisterCommonMetrics(CpuLoadRequest);
18+
InstantLoadPercentage = SubComponent->GetCounter("InstantLoadPercentage", false);
19+
AverageLoadPercentage = SubComponent->GetCounter("AverageLoadPercentage", false);
20+
QuotedLoadPercentage = SubComponent->GetCounter("QuotedLoadPercentage", false);
21+
}
22+
23+
void TCpuQuotaManager::TCounters::RegisterCommonMetrics(TCommonMetrics& metrics) const {
24+
metrics.Ok = SubComponent->GetCounter("Ok", true);
25+
metrics.Error = SubComponent->GetCounter("Error", true);
26+
}
27+
28+
//// TCpuQuotaManager::TCpuQuotaResponse
29+
30+
TCpuQuotaManager::TCpuQuotaResponse::TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status, NYql::TIssues issues)
31+
: CurrentLoad(currentLoad)
32+
, Status(status)
33+
, Issues(std::move(issues))
34+
{}
35+
36+
//// TCpuQuotaManager
37+
38+
TCpuQuotaManager::TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui32 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent)
39+
: Counters(subComponent)
40+
, MonitoringRequestDelay(monitoringRequestDelay)
41+
, AverageLoadInterval(averageLoadInterval)
42+
, IdleTimeout(idleTimeout)
43+
, DefaultQueryLoad(defaultQueryLoad)
44+
, Strict(strict)
45+
, CpuNumber(cpuNumber)
46+
{}
47+
48+
double TCpuQuotaManager::GetInstantLoad() const {
49+
return InstantLoad;
50+
}
51+
52+
double TCpuQuotaManager::GetAverageLoad() const {
53+
return AverageLoad;
54+
}
55+
56+
TDuration TCpuQuotaManager::GetMonitoringRequestDelay() const {
57+
TDuration delay = MonitoringRequestDelay;
58+
if (IdleTimeout && TInstant::Now() - LastRequestCpuQuota > IdleTimeout) {
59+
delay = AverageLoadInterval / 2;
60+
}
61+
62+
return LastUpdateCpuLoad ? (LastUpdateCpuLoad + delay) - TInstant::Now() : TDuration::Zero();
63+
}
64+
65+
void TCpuQuotaManager::UpdateCpuLoad(double instantLoad, ui32 cpuNumber, bool success) {
66+
auto now = TInstant::Now();
67+
LastUpdateCpuLoad = now;
68+
69+
if (!success) {
70+
Counters.CpuLoadRequest.Error->Inc();
71+
CheckLoadIsOutdated();
72+
return;
73+
}
74+
75+
auto delta = now - LastCpuLoad;
76+
LastCpuLoad = now;
77+
78+
if (cpuNumber) {
79+
CpuNumber = cpuNumber;
80+
}
81+
82+
InstantLoad = instantLoad;
83+
// exponential moving average
84+
if (!Ready || delta >= AverageLoadInterval) {
85+
AverageLoad = InstantLoad;
86+
QuotedLoad = InstantLoad;
87+
} else {
88+
auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
89+
AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
90+
QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
91+
}
92+
Ready = true;
93+
Counters.CpuLoadRequest.Ok->Inc();
94+
Counters.InstantLoadPercentage->Set(static_cast<ui64>(InstantLoad * 100));
95+
Counters.AverageLoadPercentage->Set(static_cast<ui64>(AverageLoad * 100));
96+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
97+
}
98+
99+
bool TCpuQuotaManager::CheckLoadIsOutdated() {
100+
if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
101+
Ready = false;
102+
QuotedLoad = 0.0;
103+
Counters.QuotedLoadPercentage->Set(0);
104+
}
105+
return Ready;
106+
}
107+
108+
bool TCpuQuotaManager::HasCpuQuota(double maxClusterLoad) {
109+
LastRequestCpuQuota = TInstant::Now();
110+
return maxClusterLoad == 0.0 || ((Ready || !Strict) && QuotedLoad < maxClusterLoad);
111+
}
112+
113+
TCpuQuotaManager::TCpuQuotaResponse TCpuQuotaManager::RequestCpuQuota(double quota, double maxClusterLoad) {
114+
if (quota < 0.0 || quota > 1.0) {
115+
return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder() << "Incorrect quota value (exceeds 1.0 or less than 0.0) " << quota)});
116+
}
117+
quota = quota ? quota : DefaultQueryLoad;
118+
119+
CheckLoadIsOutdated();
120+
if (!HasCpuQuota(maxClusterLoad)) {
121+
return TCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, {NYql::TIssue(TStringBuilder()
122+
<< "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
123+
<< "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
124+
)});
125+
}
126+
127+
QuotedLoad += quota;
128+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
129+
return TCpuQuotaResponse(QuotedLoad * 100);
130+
}
131+
132+
void TCpuQuotaManager::AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed) {
133+
if (!CpuNumber) {
134+
return;
135+
}
136+
137+
if (duration && duration < AverageLoadInterval / 2 && quota <= 1.0) {
138+
quota = quota ? quota : DefaultQueryLoad;
139+
auto load = (cpuSecondsConsumed * 1000.0 / duration.MilliSeconds()) / CpuNumber;
140+
if (quota > load) {
141+
auto adjustment = (quota - load) / 2;
142+
if (QuotedLoad > adjustment) {
143+
QuotedLoad -= adjustment;
144+
} else {
145+
QuotedLoad = 0.0;
146+
}
147+
Counters.QuotedLoadPercentage->Set(static_cast<ui64>(QuotedLoad * 100));
148+
}
149+
}
150+
}
151+
152+
} // namespace NFq
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
#pragma once
2+
3+
#include <library/cpp/monlib/dynamic_counters/counters.h>
4+
5+
#include <ydb/library/yql/public/issue/yql_issue.h>
6+
7+
#include <ydb/public/sdk/cpp/client/ydb_types/status_codes.h>
8+
9+
10+
namespace NFq {
11+
12+
class TCpuQuotaManager {
13+
struct TCounters {
14+
const ::NMonitoring::TDynamicCounterPtr SubComponent;
15+
struct TCommonMetrics {
16+
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
17+
::NMonitoring::TDynamicCounters::TCounterPtr Error;
18+
};
19+
20+
TCommonMetrics CpuLoadRequest;
21+
::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
22+
::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
23+
::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
24+
25+
explicit TCounters(const ::NMonitoring::TDynamicCounterPtr& subComponent);
26+
27+
private:
28+
void Register();
29+
void RegisterCommonMetrics(TCommonMetrics& metrics) const;
30+
};
31+
32+
public:
33+
struct TCpuQuotaResponse {
34+
explicit TCpuQuotaResponse(int32_t currentLoad, NYdb::EStatus status = NYdb::EStatus::SUCCESS, NYql::TIssues issues = {});
35+
36+
const int32_t CurrentLoad;
37+
const NYdb::EStatus Status;
38+
const NYql::TIssues Issues;
39+
};
40+
41+
public:
42+
TCpuQuotaManager(TDuration monitoringRequestDelay, TDuration averageLoadInterval, TDuration idleTimeout, double defaultQueryLoad, bool strict, ui32 cpuNumber, const ::NMonitoring::TDynamicCounterPtr& subComponent);
43+
44+
double GetInstantLoad() const;
45+
double GetAverageLoad() const;
46+
TDuration GetMonitoringRequestDelay() const;
47+
48+
void UpdateCpuLoad(double instantLoad, ui32 cpuNumber, bool success);
49+
bool CheckLoadIsOutdated();
50+
51+
bool HasCpuQuota(double maxClusterLoad);
52+
TCpuQuotaResponse RequestCpuQuota(double quota, double maxClusterLoad);
53+
void AdjustCpuQuota(double quota, TDuration duration, double cpuSecondsConsumed);
54+
55+
private:
56+
TCounters Counters;
57+
58+
const TDuration MonitoringRequestDelay;
59+
const TDuration AverageLoadInterval;
60+
const TDuration IdleTimeout;
61+
const double DefaultQueryLoad;
62+
const bool Strict;
63+
ui32 CpuNumber = 0;
64+
65+
TInstant LastCpuLoad;
66+
TInstant LastUpdateCpuLoad;
67+
TInstant LastRequestCpuQuota;
68+
69+
double InstantLoad = 0.0;
70+
double AverageLoad = 0.0;
71+
double QuotedLoad = 0.0;
72+
bool Ready = false;
73+
};
74+
75+
} // namespace NFq

ydb/core/fq/libs/compute/common/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
LIBRARY()
22

33
SRCS(
4+
cpu_quota_manager.cpp
45
pinger.cpp
56
run_actor_params.cpp
67
utils.cpp

0 commit comments

Comments
 (0)