Skip to content

Commit 97995cf

Browse files
committed
implement groups compacting
1 parent c75b079 commit 97995cf

File tree

2 files changed

+86
-49
lines changed

2 files changed

+86
-49
lines changed

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 85 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@ namespace {
1717
return TDuration::MicroSeconds(t);
1818
}
1919

20-
static constexpr double MinEntitiesWeight = 1e-8;
21-
2220
static constexpr TDuration AvgBatch = TDuration::MicroSeconds(100);
2321
}
2422

@@ -110,7 +108,7 @@ class TSchedulerEntity {
110108
double Weight = 0;
111109
TMonotonic LastNowRecalc;
112110
bool Disabled = false;
113-
double EntitiesWeight = 0;
111+
i64 EntitiesWeight = 0;
114112
double MaxDeviation = 0;
115113
double MaxLimitDeviation = 0;
116114

@@ -128,11 +126,37 @@ class TSchedulerEntity {
128126

129127
double Share;
130128

129+
::NMonitoring::TDynamicCounters::TCounterPtr Vtime;
130+
::NMonitoring::TDynamicCounters::TCounterPtr EntitiesWeight;
131+
::NMonitoring::TDynamicCounters::TCounterPtr Limit;
132+
::NMonitoring::TDynamicCounters::TCounterPtr Weight;
133+
134+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerClock;
135+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerLimitUs;
136+
::NMonitoring::TDynamicCounters::TCounterPtr SchedulerTrackedUs;
137+
138+
TString Name;
139+
140+
void InitCounters(const TIntrusivePtr<TKqpCounters>& counters) {
141+
if (Vtime || !Name) {
142+
return;
143+
}
144+
145+
auto group = counters->GetKqpCounters()->GetSubgroup("NodeScheduler/Group", Name);
146+
Vtime = group->GetCounter("VTime", true);
147+
EntitiesWeight = group->GetCounter("Entities", false);
148+
Limit = group->GetCounter("Limit", true);
149+
Weight = group->GetCounter("Weight", false);
150+
SchedulerClock = group->GetCounter("Clock", false);
151+
SchedulerTrackedUs = group->GetCounter("Tracked", true);
152+
SchedulerLimitUs = group->GetCounter("AbsoluteLimit", true);
153+
}
154+
131155
TMultithreadPublisher<TGroupMutableStats> MutableStats;
132156
};
133157

134158
TGroupRecord* Group;
135-
double Weight;
159+
i64 Weight;
136160
double Vruntime = 0;
137161
double Vstart;
138162

@@ -197,15 +221,6 @@ class TSchedulerEntity {
197221
};
198222

199223
struct TComputeScheduler::TImpl {
200-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> VtimeCounters;
201-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> EntitiesWeightCounters;
202-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> LimitCounters;
203-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> WeightCounters;
204-
205-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerClock;
206-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerLimitUs;
207-
TVector<::NMonitoring::TDynamicCounters::TCounterPtr> SchedulerTrackedUs;
208-
209224
THashMap<TString, size_t> PoolId;
210225
std::vector<std::unique_ptr<TSchedulerEntity::TGroupRecord>> Records;
211226

@@ -217,18 +232,57 @@ struct TComputeScheduler::TImpl {
217232

218233
TDuration MaxDelay = TDuration::Seconds(10);
219234

235+
void AssignWeight(TSchedulerEntity::TGroupRecord* record) {
236+
record->MutableStats.Next()->Weight = SumCores * record->Share;
237+
}
238+
220239
void AssignWeights() {
221240
for (auto& record : Records) {
222-
record->MutableStats.Next()->Weight = SumCores * record->Share;
241+
AssignWeight(record.get());
223242
}
224243
}
225244

226245
void CreateGroup(TString groupName, double maxShare) {
227246
PoolId[groupName] = Records.size();
228247
auto group = std::make_unique<TSchedulerEntity::TGroupRecord>();
229248
group->Share = maxShare;
249+
group->Name = groupName;
250+
AssignWeight(group.get());
230251
Records.push_back(std::move(group));
231252
}
253+
254+
void CompactGroups() {
255+
std::vector<i64> remap;
256+
std::vector<std::unique_ptr<TSchedulerEntity::TGroupRecord>> records;
257+
258+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> vtimeCounters;
259+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> entitiesWeightCounters;
260+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> limitCounters;
261+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> weightCounters;
262+
263+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> schedulerClock;
264+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> schedulerLimitUs;
265+
std::vector<::NMonitoring::TDynamicCounters::TCounterPtr> schedulerTrackedUs;
266+
267+
for (size_t i = 0; i < Records.size(); ++i) {
268+
auto record = Records[i]->MutableStats.Current();
269+
if (record.get()->Disabled || record.get()->EntitiesWeight == 0) {
270+
// to delete
271+
remap.push_back(-1);
272+
} else {
273+
records.emplace_back(std::move(Records[i]));
274+
remap.push_back(i);
275+
}
276+
}
277+
278+
Records.swap(records);
279+
280+
for (auto& [k, v] : PoolId) {
281+
if (remap[v] >= 0) {
282+
v = remap[v];
283+
}
284+
}
285+
}
232286
};
233287

234288
TComputeScheduler::TComputeScheduler() {
@@ -237,7 +291,7 @@ TComputeScheduler::TComputeScheduler() {
237291

238292
TComputeScheduler::~TComputeScheduler() = default;
239293

240-
TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight, TMonotonic now) {
294+
TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, i64 weight, TMonotonic now) {
241295
Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler group");
242296
auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get();
243297
groupEntry->MutableStats.Next()->EntitiesWeight += weight;
@@ -253,30 +307,12 @@ TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weigh
253307
}
254308

255309
void TComputeScheduler::AdvanceTime(TMonotonic now) {
256-
if (Impl->Counters) {
257-
if (Impl->VtimeCounters.size() < Impl->Records.size()) {
258-
Impl->VtimeCounters.resize(Impl->Records.size());
259-
Impl->EntitiesWeightCounters.resize(Impl->Records.size());
260-
Impl->LimitCounters.resize(Impl->Records.size());
261-
Impl->WeightCounters.resize(Impl->Records.size());
262-
Impl->SchedulerClock.resize(Impl->Records.size());
263-
Impl->SchedulerLimitUs.resize(Impl->Records.size());
264-
Impl->SchedulerTrackedUs.resize(Impl->Records.size());
265-
266-
for (auto& [k, i] : Impl->PoolId) {
267-
auto group = Impl->Counters->GetKqpCounters()->GetSubgroup("NodeScheduler/Group", k);
268-
Impl->VtimeCounters[i] = group->GetCounter("VTime", true);
269-
Impl->EntitiesWeightCounters[i] = group->GetCounter("Entities", false);
270-
Impl->LimitCounters[i] = group->GetCounter("Limit", true);
271-
Impl->WeightCounters[i] = group->GetCounter("Weight", false);
272-
Impl->SchedulerClock[i] = group->GetCounter("Clock", false);
273-
Impl->SchedulerTrackedUs[i] = group->GetCounter("Tracked", true);
274-
Impl->SchedulerLimitUs[i] = group->GetCounter("AbsoluteLimit", true);
275-
}
276-
}
277-
}
278310
for (size_t i = 0; i < Impl->Records.size(); ++i) {
279-
auto& v = Impl->Records[i]->MutableStats;
311+
auto* record = Impl->Records[i].get();
312+
if (Impl->Counters) {
313+
record->InitCounters(Impl->Counters);
314+
}
315+
auto& v = record->MutableStats;
280316
{
281317
auto group = v.Current();
282318
if (group.get()->LastNowRecalc > now) {
@@ -292,23 +328,24 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
292328
tracked - FromDuration(Impl->ForgetInteval) * group.get()->Weight,
293329
Min<ssize_t>(group.get()->Limit(now) - group.get()->MaxLimitDeviation, tracked));
294330

295-
if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
331+
if (!group.get()->Disabled && group.get()->EntitiesWeight > 0) {
296332
delta = FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
297333
v.Next()->MaxDeviation = (FromDuration(Impl->SmoothPeriod) * v.Next()->Weight) / v.Next()->EntitiesWeight;
298334
}
299335

300-
if (Impl->VtimeCounters.size() > i && Impl->VtimeCounters[i]) {
301-
Impl->SchedulerLimitUs[i]->Set(group.get()->Limit(now));
302-
Impl->SchedulerTrackedUs[i]->Set(Impl->Records[i]->TrackedMicroSeconds.load());
303-
Impl->SchedulerClock[i]->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds());
304-
Impl->VtimeCounters[i]->Add(delta);
305-
Impl->EntitiesWeightCounters[i]->Set(v.Next()->EntitiesWeight);
306-
Impl->LimitCounters[i]->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight);
307-
Impl->WeightCounters[i]->Set(group.get()->Weight);
336+
if (Impl->Records[i]->Vtime) {
337+
record->SchedulerLimitUs->Set(group.get()->Limit(now));
338+
record->SchedulerTrackedUs->Set(Impl->Records[i]->TrackedMicroSeconds.load());
339+
record->SchedulerClock->Add(now.MicroSeconds() - group.get()->LastNowRecalc.MicroSeconds());
340+
record->Vtime->Add(delta);
341+
record->EntitiesWeight->Set(v.Next()->EntitiesWeight);
342+
record->Limit->Add(FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight);
343+
record->Weight->Set(group.get()->Weight);
308344
}
309345
}
310346
v.Publish();
311347
}
348+
Impl->CompactGroups();
312349
}
313350

314351
void TComputeScheduler::Deregister(TSchedulerEntity& self, TMonotonic now) {
@@ -363,7 +400,7 @@ bool TComputeScheduler::Disabled(TString group) {
363400

364401
bool TComputeScheduler::Disable(TString group, TMonotonic now) {
365402
auto ptr = Impl->PoolId.FindPtr(group);
366-
if (Impl->Records[*ptr]->MutableStats.Current().get()->Weight > MinEntitiesWeight) {
403+
if (Impl->Records[*ptr]->MutableStats.Current().get()->Weight > 0) {
367404
return false;
368405
}
369406
Impl->Records[*ptr]->MutableStats.Next()->Disabled = true;

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TComputeScheduler {
6969
void SetForgetInterval(TDuration);
7070
::NMonitoring::TDynamicCounters::TCounterPtr GetGroupUsageCounter(TString group) const;
7171

72-
TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);
72+
TSchedulerEntityHandle Enroll(TString group, i64 weight, TMonotonic now);
7373

7474
void AdvanceTime(TMonotonic now);
7575

0 commit comments

Comments
 (0)