Skip to content

Commit d606f15

Browse files
authored
Merge 620351c into 56c68e1
2 parents 56c68e1 + 620351c commit d606f15

31 files changed

+1200
-385
lines changed

ydb/core/fq/libs/compute/ydb/control_plane/database_monitoring.cpp

+58-125
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
22
#include <ydb/core/fq/libs/control_plane_storage/util.h>
33

4+
#include <ydb/core/kqp/workload_service/common/cpu_quota_manager.h>
5+
46
#include <ydb/library/services/services.pb.h>
57

68
#include <ydb/library/security/ydb_credentials_provider_factory.h>
@@ -24,17 +26,9 @@ namespace NFq {
2426
class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComputeDatabaseMonitoringActor> {
2527
struct TCounters {
2628
::NMonitoring::TDynamicCounterPtr Counters;
27-
struct TCommonMetrics {
28-
::NMonitoring::TDynamicCounters::TCounterPtr Ok;
29-
::NMonitoring::TDynamicCounters::TCounterPtr Error;
30-
::NMonitoring::THistogramPtr LatencyMs;
31-
};
32-
33-
TCommonMetrics CpuLoadRequest;
34-
::NMonitoring::TDynamicCounters::TCounterPtr InstantLoadPercentage;
35-
::NMonitoring::TDynamicCounters::TCounterPtr AverageLoadPercentage;
36-
::NMonitoring::TDynamicCounters::TCounterPtr QuotedLoadPercentage;
37-
::NMonitoring::TDynamicCounters::TCounterPtr AvailableLoadPercentage;
29+
::NMonitoring::TDynamicCounterPtr SubComponent;
30+
31+
::NMonitoring::THistogramPtr CpuLoadRequestLatencyMs;
3832
::NMonitoring::TDynamicCounters::TCounterPtr TargetLoadPercentage;
3933
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueSize;
4034
::NMonitoring::TDynamicCounters::TCounterPtr PendingQueueOverload;
@@ -48,21 +42,11 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
4842
private:
4943
void Register() {
5044
::NMonitoring::TDynamicCounterPtr component = Counters->GetSubgroup("component", "ComputeDatabaseMonitoring");
51-
auto subComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
52-
RegisterCommonMetrics(CpuLoadRequest, subComponent);
53-
InstantLoadPercentage = subComponent->GetCounter("InstantLoadPercentage", false);
54-
AverageLoadPercentage = subComponent->GetCounter("AverageLoadPercentage", false);
55-
QuotedLoadPercentage = subComponent->GetCounter("QuotedLoadPercentage", false);
56-
AvailableLoadPercentage = subComponent->GetCounter("AvailableLoadPercentage", false);
57-
TargetLoadPercentage = subComponent->GetCounter("TargetLoadPercentage", false);
58-
PendingQueueSize = subComponent->GetCounter("PendingQueueSize", false);
59-
PendingQueueOverload = subComponent->GetCounter("PendingQueueOverload", true);
60-
}
61-
62-
void RegisterCommonMetrics(TCommonMetrics& metrics, ::NMonitoring::TDynamicCounterPtr subComponent) {
63-
metrics.Ok = subComponent->GetCounter("Ok", true);
64-
metrics.Error = subComponent->GetCounter("Error", true);
65-
metrics.LatencyMs = subComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
45+
SubComponent = component->GetSubgroup("subcomponent", "CpuLoadRequest");
46+
CpuLoadRequestLatencyMs = SubComponent->GetHistogram("LatencyMs", GetLatencyHistogramBuckets());
47+
TargetLoadPercentage = SubComponent->GetCounter("TargetLoadPercentage", false);
48+
PendingQueueSize = SubComponent->GetCounter("PendingQueueSize", false);
49+
PendingQueueOverload = SubComponent->GetCounter("PendingQueueOverload", true);
6650
}
6751

6852
static ::NMonitoring::IHistogramCollectorPtr GetLatencyHistogramBuckets() {
@@ -75,15 +59,19 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
7559
TComputeDatabaseMonitoringActor(const TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters)
7660
: MonitoringClientActorId(monitoringClientActorId)
7761
, Counters(counters)
78-
, MonitoringRequestDelay(GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)))
79-
, AverageLoadInterval(std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)))
8062
, MaxClusterLoad(std::min<ui32>(config.GetMaxClusterLoadPercentage(), 100) / 100.0)
81-
, DefaultQueryLoad(config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1)
8263
, PendingQueueSize(config.GetPendingQueueSize())
8364
, Strict(config.GetStrict())
84-
, CpuNumber(config.GetCpuNumber())
65+
, CpuQuotaManager(
66+
GetDuration(config.GetMonitoringRequestDelay(), TDuration::Seconds(1)),
67+
std::max<TDuration>(GetDuration(config.GetAverageLoadInterval(), TDuration::Seconds(10)), TDuration::Seconds(1)),
68+
TDuration::Zero(),
69+
config.GetDefaultQueryLoadPercentage() ? std::min<ui32>(config.GetDefaultQueryLoadPercentage(), 100) / 100.0 : 0.1,
70+
config.GetStrict(),
71+
config.GetCpuNumber(),
72+
Counters.SubComponent
73+
)
8574
{
86-
*Counters.AvailableLoadPercentage = 100;
8775
*Counters.TargetLoadPercentage = static_cast<ui64>(MaxClusterLoad * 100);
8876
}
8977

@@ -105,54 +93,29 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
10593
)
10694

10795
void Handle(TEvYdbCompute::TEvCpuLoadRequest::TPtr& ev) {
108-
auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(InstantLoad, AverageLoad);
109-
if (!Ready) {
96+
auto response = std::make_unique<TEvYdbCompute::TEvCpuLoadResponse>(CpuQuotaManager.GetInstantLoad(), CpuQuotaManager.GetAverageLoad());
97+
if (!CpuQuotaManager.CheckLoadIsOutdated()) {
11098
response->Issues.AddIssue("CPU Load is unavailable");
11199
}
112100
Send(ev->Sender, response.release(), 0, ev->Cookie);
113101
}
114102

115103
void Handle(TEvYdbCompute::TEvCpuLoadResponse::TPtr& ev) {
116104
const auto& response = *ev.Get()->Get();
117-
118-
auto now = TInstant::Now();
119-
if (!response.Issues) {
120-
auto delta = now - LastCpuLoad;
121-
LastCpuLoad = now;
122-
123-
if (response.CpuNumber) {
124-
CpuNumber = response.CpuNumber;
125-
}
126-
127-
InstantLoad = response.InstantLoad;
128-
// exponential moving average
129-
if (!Ready || delta >= AverageLoadInterval) {
130-
AverageLoad = InstantLoad;
131-
QuotedLoad = InstantLoad;
132-
} else {
133-
auto ratio = static_cast<double>(delta.GetValue()) / AverageLoadInterval.GetValue();
134-
AverageLoad = (1 - ratio) * AverageLoad + ratio * InstantLoad;
135-
QuotedLoad = (1 - ratio) * QuotedLoad + ratio * InstantLoad;
136-
}
137-
Ready = true;
138-
Counters.CpuLoadRequest.Ok->Inc();
139-
*Counters.InstantLoadPercentage = static_cast<ui64>(InstantLoad * 100);
140-
*Counters.AverageLoadPercentage = static_cast<ui64>(AverageLoad * 100);
141-
CheckPendingQueue();
142-
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
143-
} else {
105+
if (response.Issues) {
144106
LOG_E("CPU Load Request FAILED: " << response.Issues.ToOneLineString());
145-
Counters.CpuLoadRequest.Error->Inc();
146-
CheckLoadIsOutdated();
147107
}
148-
Counters.CpuLoadRequest.LatencyMs->Collect((now - StartCpuLoad).MilliSeconds());
108+
Counters.CpuLoadRequestLatencyMs->Collect((TInstant::Now() - StartCpuLoad).MilliSeconds());
109+
110+
CpuQuotaManager.UpdateCpuLoad(response.InstantLoad, response.CpuNumber, !response.Issues);
111+
CheckPendingQueue();
149112

150113
// TODO: make load pulling reactive
151114
// 1. Long period (i.e. AverageLoadInterval/2) when idle (no requests)
152115
// 2. Active pulling when busy
153116

154-
if (MonitoringRequestDelay) {
155-
Schedule(MonitoringRequestDelay, new NActors::TEvents::TEvWakeup());
117+
if (auto delay = CpuQuotaManager.GetMonitoringRequestDelay()) {
118+
Schedule(delay, new NActors::TEvents::TEvWakeup());
156119
} else {
157120
SendCpuLoadRequest();
158121
}
@@ -164,48 +127,24 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
164127
if (request.Quota > 1.0) {
165128
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Incorrect quota value (exceeds 1.0) " << request.Quota}}), 0, ev->Cookie);
166129
} else {
167-
if (!request.Quota) {
168-
request.Quota = DefaultQueryLoad;
169-
}
170-
CheckLoadIsOutdated();
171-
if (MaxClusterLoad > 0.0 && ((!Ready && Strict) || QuotedLoad >= MaxClusterLoad)) {
172-
if (PendingQueue.size() >= PendingQueueSize) {
173-
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{
174-
NYql::TIssue{TStringBuilder{}
175-
<< "Cluster is overloaded, current quoted load " << static_cast<ui64>(QuotedLoad * 100)
176-
<< "%, average load " << static_cast<ui64>(AverageLoad * 100) << "%"
177-
}}), 0, ev->Cookie);
130+
auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
131+
CheckPendingQueue();
132+
if (response.Status == NYdb::EStatus::OVERLOADED && PendingQueue.size() < PendingQueueSize) {
133+
PendingQueue.push(ev);
134+
Counters.PendingQueueSize->Inc();
135+
} else {
136+
if (response.Status == NYdb::EStatus::OVERLOADED) {
178137
Counters.PendingQueueOverload->Inc();
179-
} else {
180-
PendingQueue.push(ev);
181-
Counters.PendingQueueSize->Inc();
182138
}
183-
} else {
184-
QuotedLoad += request.Quota;
185-
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
186-
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
139+
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
187140
}
188141
}
189142
}
190143

191144
void Handle(TEvYdbCompute::TEvCpuQuotaAdjust::TPtr& ev) {
192-
if (CpuNumber) {
193-
auto& request = *ev.Get()->Get();
194-
if (request.Duration && request.Duration < AverageLoadInterval / 2 && request.Quota <= 1.0) {
195-
auto load = (request.CpuSecondsConsumed * 1000 / request.Duration.MilliSeconds()) / CpuNumber;
196-
auto quota = request.Quota ? request.Quota : DefaultQueryLoad;
197-
if (quota > load) {
198-
auto adjustment = (quota - load) / 2;
199-
if (QuotedLoad > adjustment) {
200-
QuotedLoad -= adjustment;
201-
} else {
202-
QuotedLoad = 0.0;
203-
}
204-
CheckPendingQueue();
205-
*Counters.QuotedLoadPercentage = static_cast<ui64>(QuotedLoad * 100);
206-
}
207-
}
208-
}
145+
auto& request = *ev.Get()->Get();
146+
CpuQuotaManager.AdjustCpuQuota(request.Quota, request.Duration, request.CpuSecondsConsumed);
147+
CheckPendingQueue();
209148
}
210149

211150
void SendCpuLoadRequest() {
@@ -215,57 +154,51 @@ class TComputeDatabaseMonitoringActor : public NActors::TActorBootstrapped<TComp
215154

216155
void CheckLoadIsOutdated() {
217156
// TODO: support timeout to decline quota after request pending time is over, not load info
218-
if (TInstant::Now() - LastCpuLoad > AverageLoadInterval) {
219-
Ready = false;
220-
QuotedLoad = 0.0;
221-
if (Strict) {
222-
while (PendingQueue.size()) {
223-
auto& ev = PendingQueue.front();
224-
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
225-
PendingQueue.pop();
226-
Counters.PendingQueueSize->Dec();
227-
}
157+
if (Strict && !CpuQuotaManager.CheckLoadIsOutdated()) {
158+
while (PendingQueue.size()) {
159+
auto& ev = PendingQueue.front();
160+
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::OVERLOADED, NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Cluster load info is not available"}}), 0, ev->Cookie);
161+
PendingQueue.pop();
162+
Counters.PendingQueueSize->Dec();
228163
}
229164
}
230165
}
231166

232167
void CheckPendingQueue() {
168+
CheckLoadIsOutdated();
169+
233170
auto now = TInstant::Now();
234-
while (QuotedLoad < MaxClusterLoad && PendingQueue.size()) {
171+
while (PendingQueue.size()) {
235172
auto& ev = PendingQueue.front();
236173
auto& request = *ev.Get()->Get();
237174
if (request.Deadline && now >= request.Deadline) {
238175
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(-1, NYdb::EStatus::CANCELLED, NYql::TIssues{
239176
NYql::TIssue{TStringBuilder{} << "Deadline reached " << request.Deadline}}), 0, ev->Cookie);
240177
} else {
241-
QuotedLoad += request.Quota;
242-
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(QuotedLoad * 100), 0, ev->Cookie);
178+
auto response = CpuQuotaManager.RequestCpuQuota(request.Quota, MaxClusterLoad);
179+
if (response.Status == NYdb::EStatus::OVERLOADED) {
180+
break;
181+
}
182+
183+
Send(ev->Sender, new TEvYdbCompute::TEvCpuQuotaResponse(response.CurrentLoad, response.Status, response.Issues), 0, ev->Cookie);
243184
}
185+
244186
PendingQueue.pop();
245187
Counters.PendingQueueSize->Dec();
246188
}
247189
}
248190

249191
private:
250-
TInstant StartCpuLoad;
251-
TInstant LastCpuLoad;
252192
TActorId MonitoringClientActorId;
253193
TCounters Counters;
254-
255-
double InstantLoad = 0.0;
256-
double AverageLoad = 0.0;
257-
double QuotedLoad = 0.0;
258-
bool Ready = false;
259-
260-
const TDuration MonitoringRequestDelay;
261-
const TDuration AverageLoadInterval;
262194
const double MaxClusterLoad;
263-
const double DefaultQueryLoad;
264195
const ui32 PendingQueueSize;
265196
const bool Strict;
266-
ui32 CpuNumber = 0;
267197

198+
NKikimr::NKqp::NWorkload::TCpuQuotaManager CpuQuotaManager;
268199
TQueue<TEvYdbCompute::TEvCpuQuotaRequest::TPtr> PendingQueue;
200+
201+
TInstant StartCpuLoad;
269202
};
270203

271204
std::unique_ptr<NActors::IActor> CreateDatabaseMonitoringActor(const NActors::TActorId& monitoringClientActorId, NFq::NConfig::TLoadControlConfig config, const ::NMonitoring::TDynamicCounterPtr& counters) {

ydb/core/fq/libs/compute/ydb/control_plane/ya.make

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ PEERDIR(
1818
ydb/core/fq/libs/compute/ydb/synchronization_service
1919
ydb/core/fq/libs/control_plane_storage/proto
2020
ydb/core/fq/libs/quota_manager/proto
21+
ydb/core/kqp/workload_service/common
2122
ydb/core/protos
2223
ydb/library/db_pool/protos
2324
ydb/library/yql/public/issue

ydb/core/kqp/common/events/workload_service.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -41,15 +41,19 @@ struct TEvContinueRequest : public NActors::TEventLocal<TEvContinueRequest, TKqp
4141
};
4242

4343
struct TEvCleanupRequest : public NActors::TEventLocal<TEvCleanupRequest, TKqpWorkloadServiceEvents::EvCleanupRequest> {
44-
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId)
44+
TEvCleanupRequest(const TString& database, const TString& sessionId, const TString& poolId, TDuration duration, TDuration cpuConsumed)
4545
: Database(database)
4646
, SessionId(sessionId)
4747
, PoolId(poolId)
48+
, Duration(duration)
49+
, CpuConsumed(cpuConsumed)
4850
{}
4951

5052
const TString Database;
5153
const TString SessionId;
5254
const TString PoolId;
55+
const TDuration Duration;
56+
const TDuration CpuConsumed;
5357
};
5458

5559
struct TEvCleanupResponse : public NActors::TEventLocal<TEvCleanupResponse, TKqpWorkloadServiceEvents::EvCleanupResponse> {

0 commit comments

Comments
 (0)