Skip to content

Commit b176c36

Browse files
committed
support per-query limits
1 parent eb96f1f commit b176c36

File tree

7 files changed

+129
-80
lines changed

7 files changed

+129
-80
lines changed

ydb/core/kqp/executer_actor/kqp_planner.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,10 @@ std::unique_ptr<TEvKqpNode::TEvStartKqpTasksRequest> TKqpPlanner::SerializeReque
234234
request.SetDatabase(Database);
235235
if (UserRequestContext->PoolConfig.has_value()) {
236236
request.SetMemoryPoolPercent(UserRequestContext->PoolConfig->QueryMemoryLimitPercentPerNode);
237-
request.SetMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
237+
request.SetPoolMaxCpuShare(UserRequestContext->PoolConfig->TotalCpuLimitPercentPerNode / 100.0);
238+
if (UserRequestContext->PoolConfig->QueryCpuLimitPercentPerNode >= 0) {
239+
request.SetQueryCpuShare(UserRequestContext->PoolConfig->QueryCpuLimitPercentPerNode / 100.0);
240+
}
238241
}
239242

240243
return result;

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,10 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
190190
ev->Get()->Record.GetSerializedGUCSettings() : "";
191191

192192
auto schedulerNow = TlsActivationContext->Monotonic();
193+
std::optional<ui64> querySchedulerGroup;
194+
if (msg.HasQueryCpuShare()) {
195+
querySchedulerGroup = Scheduler->MakeAnonymousGroup(schedulerNow, msg.GetQueryCpuShare());
196+
}
193197

194198
// start compute actors
195199
TMaybe<NYql::NDqProto::TRlPath> rlPath = Nothing();
@@ -217,7 +221,7 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
217221
};
218222

219223
if (SchedulerOptions.Scheduler->Disabled(group)) {
220-
auto share = msg.GetMaxCpuShare();
224+
auto share = msg.GetPoolMaxCpuShare();
221225
if (share > 0) {
222226
Scheduler->UpdateMaxShare(group, share, schedulerNow);
223227
Send(SchedulerActorId, new TEvSchedulerNewPool(msg.GetDatabase(), group, share));
@@ -228,6 +232,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
228232

229233
if (!schedulingTaskOptions.NoThrottle) {
230234
schedulingTaskOptions.Handle = SchedulerOptions.Scheduler->Enroll(schedulingTaskOptions.Group, schedulingTaskOptions.Weight, schedulingTaskOptions.Now);
235+
if (querySchedulerGroup) {
236+
Scheduler->AddToGroup(schedulerNow, *querySchedulerGroup, schedulingTaskOptions.Handle);
237+
}
231238
}
232239

233240
auto result = CaFactory_->CreateKqpComputeActor({

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 107 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class TSchedulerEntity {
105105
~TSchedulerEntity() {}
106106

107107
struct TGroupMutableStats {
108-
double Weight = 0;
108+
double Share = 0;
109109
TMonotonic LastNowRecalc;
110110
bool Disabled = false;
111111
i64 EntitiesWeight = 0;
@@ -115,7 +115,7 @@ class TSchedulerEntity {
115115
ssize_t TrackedBefore = 0;
116116

117117
double Limit(TMonotonic now) const {
118-
return FromDuration(now - LastNowRecalc) * Weight + MaxLimitDeviation + TrackedBefore;
118+
return FromDuration(now - LastNowRecalc) * Share + MaxLimitDeviation + TrackedBefore;
119119
}
120120
};
121121

@@ -153,7 +153,7 @@ class TSchedulerEntity {
153153
TMultithreadPublisher<TGroupMutableStats> MutableStats;
154154
};
155155

156-
TGroupRecord* Group;
156+
TStackVec<TGroupRecord*, 2, true, std::vector<TGroupRecord*>> Groups;
157157
i64 Weight;
158158
double Vruntime = 0;
159159
double Vstart;
@@ -174,8 +174,10 @@ class TSchedulerEntity {
174174
bool isThrottled = false;
175175

176176
void TrackTime(TDuration time, TMonotonic) {
177-
auto group = Group->MutableStats.Current();
178-
Group->TrackedMicroSeconds.fetch_add(time.MicroSeconds());
177+
for (auto group : Groups) {
178+
//auto current = group->MutableStats.Current();
179+
group->TrackedMicroSeconds.fetch_add(time.MicroSeconds());
180+
}
179181
}
180182

181183
void UpdateBatchTime(TDuration time) {
@@ -190,31 +192,48 @@ class TSchedulerEntity {
190192
}
191193
}
192194

193-
TMaybe<TDuration> GroupDelay(TMonotonic now) {
194-
auto group = Group->MutableStats.Current();
195-
auto limit = group.get()->Limit(now);
196-
auto tracked = Group->TrackedMicroSeconds.load();
195+
TMaybe<TDuration> GroupDelay(TMonotonic now, TGroupRecord* group) {
196+
auto current = group->MutableStats.Current();
197+
auto limit = current.get()->Limit(now);
198+
auto tracked = group->TrackedMicroSeconds.load();
197199
//double Coeff = pow(WakeupDelay, Wakeups);
198200
if (limit > tracked) {
199201
return {};
200202
} else {
201203
return Min(MaxDelay, ToDuration(/*Coeff * */(tracked - limit +
202-
Max<i64>(0, Group->DelayedSumBatches.load()) + BatchTime.MicroSeconds() +
203-
ActivationPenalty.MicroSeconds() * (Group->DelayedCount.load() + 1) +
204-
group.get()->MaxLimitDeviation) / group.get()->Weight));
204+
Max<i64>(0, group->DelayedSumBatches.load()) + BatchTime.MicroSeconds() +
205+
ActivationPenalty.MicroSeconds() * (group->DelayedCount.load() + 1) +
206+
current.get()->MaxLimitDeviation) / current.get()->Share));
207+
}
208+
}
209+
210+
TMaybe<TDuration> GroupDelay(TMonotonic now) {
211+
TMaybe<TDuration> result;
212+
for (auto group : Groups) {
213+
auto groupResult = GroupDelay(now, group);
214+
if (!result) {
215+
result = groupResult;
216+
} else if (groupResult && *result < *groupResult) {
217+
result = groupResult;
218+
}
205219
}
220+
return result;
206221
}
207222

208223
void MarkThrottled() {
209224
isThrottled = true;
210-
Group->DelayedSumBatches.fetch_add(BatchTime.MicroSeconds());
211-
Group->DelayedCount.fetch_add(1);
225+
for (auto group : Groups) {
226+
group->DelayedSumBatches.fetch_add(BatchTime.MicroSeconds());
227+
group->DelayedCount.fetch_add(1);
228+
}
212229
}
213230

214231
void MarkResumed() {
215232
isThrottled = false;
216-
Group->DelayedSumBatches.fetch_sub(BatchTime.MicroSeconds());
217-
Group->DelayedCount.fetch_sub(1);
233+
for (auto group : Groups) {
234+
group->DelayedSumBatches.fetch_sub(BatchTime.MicroSeconds());
235+
group->DelayedCount.fetch_sub(1);
236+
}
218237
}
219238
};
220239

@@ -230,14 +249,14 @@ struct TComputeScheduler::TImpl {
230249

231250
TDuration MaxDelay = TDuration::Seconds(10);
232251

233-
void AssignWeights() { }
234-
235-
void CreateGroup(TString groupName, double maxShare, NMonotonic::TMonotonic now) {
252+
void CreateGroup(double maxShare, NMonotonic::TMonotonic now, std::optional<TString> groupName = std::nullopt) {
236253
PoolId[groupName] = Records.size();
237254
auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
238255
group->MutableStats.Next()->LastNowRecalc = now;
239-
group->MutableStats.Next()->Weight = maxShare;
240-
group->Name = groupName;
256+
group->MutableStats.Next()->Share = maxShare;
257+
if (groupName) {
258+
group->Name = *groupName;
259+
}
241260
Records.push_back(std::move(group));
242261
}
243262

@@ -273,6 +292,8 @@ struct TComputeScheduler::TImpl {
273292
}
274293
}
275294
}
295+
296+
void AdvanceTime(TMonotonic now, TSchedulerEntity::TGroupRecord* record);
276297
};
277298

278299
TComputeScheduler::TComputeScheduler() {
@@ -281,69 +302,81 @@ TComputeScheduler::TComputeScheduler() {
281302

282303
TComputeScheduler::~TComputeScheduler() = default;
283304

305+
void TComputeScheduler::AddToGroup(TMonotonic now, ui64 id, TSchedulerEntityHandle& handle) {
306+
auto group = Impl->Records[id].get();
307+
(*handle).Groups.push_back(group);
308+
group->MutableStats.Next()->EntitiesWeight += (*handle).Weight;
309+
Impl->AdvanceTime(now, group);
310+
}
311+
284312
TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, i64 weight, TMonotonic now) {
285313
Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler group");
286-
auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get();
287-
groupEntry->MutableStats.Next()->EntitiesWeight += weight;
288-
Impl->AssignWeights();
289-
AdvanceTime(now);
314+
auto id = Impl->PoolId.at(groupName);
290315

291-
auto result = std::make_unique<TSchedulerEntity>();
292-
result->Group = groupEntry;
293-
result->Weight = weight;
294-
result->MaxDelay = Impl->MaxDelay;
316+
TSchedulerEntityHandle result{new TSchedulerEntity()};
317+
(*result).Weight = weight;
318+
(*result).MaxDelay = Impl->MaxDelay;
295319

296-
return TSchedulerEntityHandle(result.release());
320+
AddToGroup(now, id, result);
321+
return result;
297322
}
298323

299-
void TComputeScheduler::AdvanceTime(TMonotonic now) {
300-
for (size_t i = 0; i < Impl->Records.size(); ++i) {
301-
auto* record = Impl->Records[i].get();
302-
if (Impl->Counters) {
303-
record->InitCounters(Impl->Counters);
324+
void TComputeScheduler::TImpl::AdvanceTime(TMonotonic now, TSchedulerEntity::TGroupRecord* record) {
325+
if (Counters) {
326+
record->InitCounters(Counters);
327+
}
328+
auto& v = record->MutableStats;
329+
{
330+
auto group = v.Current();
331+
if (group.get()->LastNowRecalc > now) {
332+
return;
333+
}
334+
double delta = 0;
335+
336+
auto tracked = record->TrackedMicroSeconds.load();
337+
v.Next()->MaxLimitDeviation = SmoothPeriod.MicroSeconds() * v.Next()->Share;
338+
v.Next()->LastNowRecalc = now;
339+
v.Next()->TrackedBefore =
340+
Max<ssize_t>(
341+
tracked - FromDuration(ForgetInteval) * group.get()->Share,
342+
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
343+
344+
if (!group.get()->Disabled && group.get()->EntitiesWeight > 0) {
345+
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Share / group.get()->EntitiesWeight;
346+
v.Next()->MaxDeviation = (FromDuration(SmoothPeriod) * v.Next()->Share) / v.Next()->EntitiesWeight;
304347
}
305-
auto& v = record->MutableStats;
306-
{
307-
auto group = v.Current();
308-
if (group.get()->LastNowRecalc > now) {
309-
continue;
310-
}
311-
double delta = 0;
312-
313-
auto tracked = Impl->Records[i]->TrackedMicroSeconds.load();
314-
v.Next()->MaxLimitDeviation = Impl->SmoothPeriod.MicroSeconds() * v.Next()->Weight;
315-
v.Next()->LastNowRecalc = now;
316-
v.Next()->TrackedBefore =
317-
Max<ssize_t>(
318-
tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight,
319-
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
320-
321-
if (!group.get()->Disabled && group.get()->EntitiesWeight > 0) {
322-
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
323-
v.Next()->MaxDeviation = (FromDuration(Impl->SmoothPeriod) * v.Next()->Weight) / v.Next()->EntitiesWeight;
324-
}
325348

326-
if (Impl->Records[i]->Vtime) {
327-
record->SchedulerLimitUs->Set(group.get()->Limit(now));
328-
record->SchedulerTrackedUs->Set(Impl->Records[i]->TrackedMicroSeconds.load());
329-
record->SchedulerClock->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds());
330-
record->Vtime->Add(delta);
331-
record->EntitiesWeight->Set(v.Next()->EntitiesWeight);
332-
record->Limit->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight);
333-
record->Weight->Set(group.get()->Weight);
334-
}
349+
if (record->Vtime) {
350+
record->SchedulerLimitUs->Set(group.get()->Limit(now));
351+
record->SchedulerTrackedUs->Set(record->TrackedMicroSeconds.load());
352+
record->SchedulerClock->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds());
353+
record->Vtime->Add(delta);
354+
record->EntitiesWeight->Set(v.Next()->EntitiesWeight);
355+
record->Limit->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Share);
356+
record->Weight->Set(group.get()->Share);
335357
}
336-
v.Publish();
358+
}
359+
v.Publish();
360+
}
361+
362+
void TComputeScheduler::AdvanceTime(TMonotonic now) {
363+
for (size_t i = 0; i < Impl->Records.size(); ++i) {
364+
Impl->AdvanceTime(now, Impl->Records[i].get());
337365
}
338366
Impl->CompactGroups();
339367
}
340368

341-
void TComputeScheduler::Deregister(TSchedulerEntity& self, TMonotonic now) {
342-
auto* group = self.Group->MutableStats.Next();
343-
group->EntitiesWeight -= self.Weight;
369+
void TComputeScheduler::Deregister(TSchedulerEntityHandle& self, TMonotonic now) {
370+
for (auto group : (*self).Groups) {
371+
auto* next = group->MutableStats.Next();
372+
next->EntitiesWeight -= (*self).Weight;
373+
Impl->AdvanceTime(now, group);
374+
}
375+
}
344376

345-
Impl->AssignWeights();
346-
AdvanceTime(now);
377+
ui64 TComputeScheduler::MakeAnonymousGroup(TMonotonic now, double share) {
378+
Impl->CreateGroup(share, now);
379+
return Impl->Records.size() - 1;
347380
}
348381

349382
void TSchedulerEntityHandle::TrackTime(TDuration time, TMonotonic now) {
@@ -390,7 +423,7 @@ bool TComputeScheduler::Disabled(TString group) {
390423

391424
bool TComputeScheduler::Disable(TString group, TMonotonic now) {
392425
auto ptr = Impl->PoolId.FindPtr(group);
393-
if (Impl->Records[*ptr]->MutableStats.Current().get()->Weight > 0) {
426+
if (Impl->Records[*ptr]->MutableStats.Current().get()->Share > 0) {
394427
return false;
395428
}
396429
Impl->Records[*ptr]->MutableStats.Next()->Disabled = true;
@@ -401,10 +434,10 @@ bool TComputeScheduler::Disable(TString group, TMonotonic now) {
401434
void TComputeScheduler::UpdateMaxShare(TString group, double share, TMonotonic now) {
402435
auto ptr = Impl->PoolId.FindPtr(group);
403436
if (!ptr) {
404-
Impl->CreateGroup(group, share, now);
437+
Impl->CreateGroup(share, now, group);
405438
} else {
406439
auto& record = Impl->Records[*ptr];
407-
record->MutableStats.Next()->Weight = share;
440+
record->MutableStats.Next()->Share = share;
408441
}
409442
AdvanceTime(now);
410443
}
@@ -475,7 +508,7 @@ class TSchedulerActor : public TActorBootstrapped<TSchedulerActor> {
475508

476509
void Handle(TEvSchedulerDeregister::TPtr& ev) {
477510
if (ev->Get()->SchedulerEntity) {
478-
Opts.Scheduler->Deregister(*ev->Get()->SchedulerEntity, TlsActivationContext->Monotonic());
511+
Opts.Scheduler->Deregister(ev->Get()->SchedulerEntity, TlsActivationContext->Monotonic());
479512
}
480513
}
481514

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ class TComputeScheduler {
6161

6262
void ReportCounters(TIntrusivePtr<TKqpCounters>);
6363

64-
void UpdateMaxShare(TString, double, TMonotonic now);
64+
void UpdateMaxShare(TString name, double share, TMonotonic now);
65+
ui64 MakeAnonymousGroup(TMonotonic now, double share);
66+
void AddToGroup(TMonotonic now, ui64, TSchedulerEntityHandle&);
6567

6668
void SetMaxDeviation(TDuration);
6769
void SetForgetInterval(TDuration);
@@ -71,7 +73,7 @@ class TComputeScheduler {
7173

7274
void AdvanceTime(TMonotonic now);
7375

74-
void Deregister(TSchedulerEntity& self, TMonotonic now);
76+
void Deregister(TSchedulerEntityHandle& self, TMonotonic now);
7577

7678
bool Disabled(TString group);
7779
bool Disable(TString group, TMonotonic now);

ydb/core/protos/kqp.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,7 +552,9 @@ message TEvStartKqpTasksRequest {
552552
optional string SchedulerGroup = 9;
553553
optional double MemoryPoolPercent = 10 [default = 100];
554554
optional string Database = 11;
555-
optional double MaxCpuShare = 12;
555+
556+
optional double PoolMaxCpuShare = 12;
557+
optional double QueryCpuShare = 13;
556558
}
557559

558560
message TEvStartKqpTasksResponse {

ydb/core/resource_pools/resource_pool_settings.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ std::unordered_map<TString, TPoolSettings::TProperty> TPoolSettings::GetProperti
4949
{"queue_size", &QueueSize},
5050
{"query_memory_limit_percent_per_node", &QueryMemoryLimitPercentPerNode},
5151
{"database_load_cpu_threshold", &DatabaseLoadCpuThreshold},
52-
{"total_cpu_limit_percent_per_node", &TotalCpuLimitPercentPerNode}
52+
{"total_cpu_limit_percent_per_node", &TotalCpuLimitPercentPerNode},
53+
{"query_cpu_limit_percent_per_node", &QueryCpuLimitPercentPerNode},
5354
};
5455
if (!restricted) {
5556
properties.insert({"query_cancel_after_seconds", &QueryCancelAfter});

ydb/core/resource_pools/resource_pool_settings.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ struct TPoolSettings : public TSettingsBase {
3636
TPercent QueryMemoryLimitPercentPerNode = -1; // Percent from node memory capacity, -1 = disabled
3737
TPercent DatabaseLoadCpuThreshold = -1; // -1 = disabled
3838
TPercent TotalCpuLimitPercentPerNode = -1; // -1 = disabled
39+
TPercent QueryCpuLimitPercentPerNode = -1; // -1 = disabled;
3940
};
4041

4142
} // namespace NKikimr::NResourcePool

0 commit comments

Comments
 (0)