
Commit 13d1d78

empty priorities adjustment

1 parent 4ae940d, commit 13d1d78

File tree

4 files changed: +100 -79 lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp
ydb/core/kqp/runtime/kqp_compute_scheduler.cpp
ydb/core/kqp/runtime/kqp_compute_scheduler.h
ydb/core/kqp/runtime/kqp_scheduler_ut.cpp

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 29 additions & 4 deletions
@@ -182,7 +182,9 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

 private:
     STATEFN(WorkState) {
-        Scheduler.AdvanceTime(TlsActivationContext->Monotonic());
+        Y_DEFER {
+            Scheduler.AdvanceTime(TlsActivationContext->Monotonic());
+        };
         switch (ev->GetTypeRewrite()) {
             hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
             hFunc(TEvFinishKqpTask, HandleWork); // used only for unit tests
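This moves the scheduler clock update from the start of event dispatch to the end: Y_DEFER (a scope-exit guard from the Arcadia util library, util/generic/scope.h) runs its block when WorkState returns, so AdvanceTime now observes whatever the handler changed, e.g. entities enrolled while starting tasks. A minimal sketch of the scope-exit semantics, with an illustrative body:

```cpp
#include <util/generic/scope.h>
#include <util/stream/output.h>

void Handle() {
    Y_DEFER {
        // Runs when Handle() returns, on every exit path,
        // so it sees the effects of the handler body.
        Cout << "runs second" << Endl;
    };
    Cout << "runs first (the handler body)" << Endl;
}
```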
@@ -435,6 +437,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

         TComputeStagesWithScan computesByStage;

+        auto now = TlsActivationContext->Monotonic();
+
         // start compute actors
         for (int i = 0; i < msg.GetTasks().size(); ++i) {
             auto& dqTask = *msg.MutableTasks(i);
@@ -495,13 +499,19 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
             }

             TComputeActorSchedulingOptions schedulingOptions {
+                .Now = now,
                 .NodeService = SelfId(),
                 .Scheduler = &Scheduler,
                 .Group = msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::SCAN ? "olap" : "",
                 .Weight = 1,
-                .NoThrottle = false//msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA,
+                .NoThrottle = false,
             };

+            if (msg.GetRuntimeSettings().GetExecType() == NYql::NDqProto::TComputeRuntimeSettings::DATA) {
+                schedulingOptions.NoThrottle = true;
+                schedulingOptions.Scheduler = nullptr;
+            }
+
             IActor* computeActor;
             if (tableKind == ETableKind::Datashard || tableKind == ETableKind::Olap) {
                 auto& info = computesByStage.UpsertTaskWithScan(dqTask, meta, !AppData()->FeatureFlags.GetEnableSeparationComputeActorsFromRead());
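The previously commented-out condition on NoThrottle becomes an explicit branch: DATA (OLTP) tasks now opt out of compute scheduling entirely, while SCAN tasks land in the "olap" group. Setting Scheduler to nullptr matters because TSchedulableComputeActorBase only enrolls when a scheduler is present (see the `if (Scheduler)` guard in kqp_compute_scheduler.h below). A condensed, self-contained sketch of the resulting policy; the stand-in types and the MakeOptions helper are illustrative, not from the patch:

```cpp
#include <string>

struct TMonotonic {};
struct TComputeScheduler;
struct TOptions { // stand-in for TComputeActorSchedulingOptions
    TMonotonic Now;
    TComputeScheduler* Scheduler = nullptr;
    std::string Group;
    double Weight = 1;
    bool NoThrottle = false;
};

TOptions MakeOptions(bool isScan, TComputeScheduler* scheduler, TMonotonic now) {
    TOptions opts{.Now = now, .Scheduler = scheduler, .Group = isScan ? "olap" : ""};
    if (!isScan) {                 // DATA / OLTP path
        opts.NoThrottle = true;    // never delayed by the scheduler
        opts.Scheduler = nullptr;  // never enrolled into a pool
    }
    return opts;
}
```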
@@ -657,10 +667,25 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {

     void SetPriorities(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) {
         std::function<TComputeScheduler::TDistributionRule(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)> convert
-            = [](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)
+            = [&](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf)
         {
+            if (conf.HasName()) {
+                return TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare(), .Name = conf.GetName()};
+            } else if (conf.HasSubPoolsConfiguration()) {
+                auto res = TComputeScheduler::TDistributionRule{.Share = conf.GetMaxCpuShare()};
+                for (auto& subConf : conf.GetSubPoolsConfiguration().GetSubPools()) {
+                    res.SubRules.push_back(convert(subConf));
+                }
+                return res;
+            } else {
+                Y_ENSURE(false, "unknown case");
+            }
         };
-        Scheduler.SetPriorities(convert(conf), 1, TlsActivationContext->Monotonic());
+        auto converted = convert(conf);
+
+        auto threads = TlsActivationContext->ActorSystem()->GetPoolThreadsCount(SelfId().PoolID());
+        Y_ENSURE(threads.has_value());
+        Scheduler.SetPriorities(converted.empty() ? TComputeScheduler::TDistributionRule{.Share = 1, .Name = "olap"} : converted, threads.value(), TlsActivationContext->Monotonic());
     }

     void SetIteratorReadsRetrySettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) {
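Three things change here. The conversion lambda now actually walks the pool configuration: a named leaf becomes a named rule carrying its MaxCpuShare, a node with sub-pools recurses, and anything else trips Y_ENSURE. The core budget comes from the real thread count of the node service's actor-system pool rather than a hard-coded 1. And an empty configuration (the commit's namesake case) falls back to a single full-share "olap" rule instead of an empty rule tree. Because a C++ lambda cannot refer to itself, the recursion goes through a std::function captured by reference; a standalone sketch of that pattern, with stand-in types (TPool, TRule, and their fields are illustrative, not from the patch):

```cpp
#include <functional>
#include <string>
#include <vector>
#include <iostream>

struct TPool { std::string Name; double Share = 0; std::vector<TPool> SubPools; };
struct TRule { double Share = 0; std::string Name; std::vector<TRule> SubRules; };

int main() {
    // The std::function is declared first and captured by reference,
    // so the lambda body can call itself by name.
    std::function<TRule(const TPool&)> convert = [&](const TPool& pool) {
        if (!pool.Name.empty()) {
            return TRule{.Share = pool.Share, .Name = pool.Name}; // leaf pool
        }
        TRule res{.Share = pool.Share};
        for (const auto& sub : pool.SubPools) {
            res.SubRules.push_back(convert(sub));                 // recurse
        }
        return res;
    };

    TPool root{.Share = 1, .SubPools = {{.Name = "olap", .Share = 0.5}, {.Name = "oltp", .Share = 0.5}}};
    std::cout << convert(root).SubRules.size() << "\n"; // prints 2
}
```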

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 40 additions & 52 deletions
@@ -9,7 +9,7 @@ namespace {
         return TDuration::MicroSeconds(t);
     }

-    static constexpr double MinPriority = 1e-8;
+    static constexpr double MinEntitiesWeight = 1e-8;

 }

@@ -126,26 +126,45 @@ struct TComputeScheduler::TImpl {

     struct TRule {
         size_t Parent;
-        double Weight;
+        double Weight = 0;

+        double Share;
         TMaybe<size_t> RecordId = {};
         double SubRulesSum = 0;
         bool Empty = true;
     };
-    size_t RootRule;
     std::vector<TRule> Rules;

     double SumCores;

-    void AssignWeights(TMonotonic now) {
+    void AssignWeights() {
+        ssize_t rootRule = static_cast<ssize_t>(Rules.size()) - 1;
         for (size_t i = 0; i < Rules.size(); ++i) {
+            Rules[i].SubRulesSum = 0;
+            Rules[i].Empty = true;
+        }
+        for (ssize_t i = 0; i < static_cast<ssize_t>(Rules.size()); ++i) {
             if (Rules[i].RecordId) {
-                if (Records[*Rules[i].RecordId]->Next()->EntitiesWeight < MinPriority) { // mincores
-                }
+                Rules[i].Empty = Records[*Rules[i].RecordId]->Next()->EntitiesWeight < MinEntitiesWeight;
+            }
+            if (i != rootRule && !Rules[i].Empty) {
+                Rules[Rules[i].Parent].Empty = false;
+                Rules[Rules[i].Parent].SubRulesSum += Rules[i].Weight;
+            }
+        }
+        for (ssize_t i = static_cast<ssize_t>(Rules.size()) - 1; i >= 0; --i) {
+            if (i == static_cast<ssize_t>(Rules.size()) - 1) {
+                Rules[i].Weight = SumCores * Rules[i].Share;
+            } else if (!Rules[i].Empty) {
+                Rules[i].Weight = Rules[Rules[i].Parent].Weight * Rules[i].Share / Rules[i].SubRulesSum;
             } else {
+                Rules[i].Weight = 0;
+            }
+            if (Rules[i].RecordId) {
+                Records[*Rules[i].RecordId]->Next()->Weight = Rules[i].Weight;
             }
         }
-    }
+    }
 };

 TComputeScheduler::TComputeScheduler() {
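AssignWeights is the heart of the "empty priorities" fix. Rules are laid out children-before-parents with the root last, so one forward pass can mark each leaf Empty when its pool has (almost) no enrolled weight and fold liveness and sibling sums up into parents, and one backward pass can push weights down: the root gets SumCores times its Share, each live rule gets its parent's weight scaled by its own share relative to its live siblings, and empty rules get 0. As a worked example: with SumCores = 4 and two sub-pools of equal share, each active pool is entitled to 4 · 1/2 = 2 cores; if one pool has no entities it drops out of the sibling sum and the other inherits the full 4 cores instead of leaving half the budget idle.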
@@ -178,7 +197,7 @@ void TComputeScheduler::SetPriorities(TDistributionRule rule, double cores, TMonotonic now) {
     }
     for (auto& [k, v] : Impl->PoolId) {
         if (!seenNames.contains(k)) {
-            auto* group = Impl->Records[Impl->PoolId[v]].get();
+            auto* group = Impl->Records[Impl->PoolId[k]].get();
             group->Next()->Weight = 0;
             group->Next()->Disabled = true;
             group->Publish();
@@ -191,63 +210,27 @@ void TComputeScheduler::SetPriorities(TDistributionRule rule, double cores, TMonotonic now) {
         size_t result;
         if (rule.SubRules.empty()) {
             result = rules.size();
-            rules.push_back(TImpl::TRule{.Weight = rule.Share, .RecordId=Impl->PoolId[rule.Name]});
+            rules.push_back(TImpl::TRule{.Share = rule.Share, .RecordId=Impl->PoolId[rule.Name]});
         } else {
             TVector<size_t> toAssign;
             for (auto& subRule : rule.SubRules) {
                 toAssign.push_back(makeRules(subRule));
             }
             size_t result = rules.size();
-            rules.push_back(TImpl::TRule{.Weight = rule.Share});
+            rules.push_back(TImpl::TRule{.Share = rule.Share});
             for (auto i : toAssign) {
                 rules[i].Parent = result;
             }
             return result;
         }
         return result;
     };
-    Impl->RootRule = makeRules(rule);
     Impl->Rules.swap(rules);

-    //if (ptr) {
-    //    v->Next()->Weight = ((*ptr) * cores) / sum;
-    //    v->Next()->Disabled = false;
-    //} else {
-    //    v->Next()->Weight = 0;
-    //    v->Next()->Disabled = true;
-    //}
-    //v->Publish();
-    //}
-
-    Impl->AssignWeights(now);
+    Impl->AssignWeights();
     for (auto& record : Impl->Records) {
         record->Publish();
     }
-
-    //double sum = 0;
-    //for (auto [_, v] : priorities) {
-    //    sum += v;
-    //}
-    //for (auto& [k, v] : Impl->Groups) {
-    //    auto ptr = priorities.FindPtr(k);
-    //    if (ptr) {
-    //        v->Next()->Weight = ((*ptr) * cores) / sum;
-    //        v->Next()->Disabled = false;
-    //    } else {
-    //        v->Next()->Weight = 0;
-    //        v->Next()->Disabled = true;
-    //    }
-    //    v->Publish();
-    //}
-    //for (auto& [k, v] : priorities) {
-    //    if (!Impl->Groups.contains(k)) {
-    //        auto group = ;
-    //        group->Next()->LastNowRecalc = TMonotonic::Now();
-    //        group->Next()->Weight = (v * cores) / sum;
-    //        group->Publish();
-    //        Impl->Groups[k] = std::move(group);
-    //    }
-    //}
 }

@@ -257,15 +240,20 @@ double TComputeScheduler::GroupNow(TSchedulerEntity& self, TMonotonic now) {
 }


-TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight) {
-    Y_ENSURE(Impl->Groups.contains(groupName), "unknown scheduler group");
-    auto* groupEntry = Impl->Groups[groupName].get();
+TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weight, TMonotonic now) {
+    Y_ENSURE(Impl->PoolId.contains(groupName), "unknown scheduler group");
+    auto* groupEntry = Impl->Records[Impl->PoolId.at(groupName)].get();
     auto group = groupEntry->Current();
     auto result = std::make_unique<TSchedulerEntity>();
     result->Group = groupEntry;
     result->Weight = weight;
     result->Vstart = group.get()->Now;
-    groupEntry->Next()->EntitiesWeight += weight;
+    if (groupEntry->Next()->EntitiesWeight < MinEntitiesWeight) {
+        groupEntry->Next()->EntitiesWeight += weight;
+        AdvanceTime(now);
+    } else {
+        groupEntry->Next()->EntitiesWeight += weight;
+    }
     return TSchedulerEntityHandle(result.release());
 }
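Enroll now takes the current timestamp so it can handle the empty-to-non-empty transition: when the first entity joins a previously empty group, AdvanceTime(now) runs immediately, bringing the group's clock bookkeeping up to date before the newcomer starts accruing entitlement. Otherwise the stale interval since the last update could be divided by the now non-zero EntitiesWeight and credited to the new entity as fictitious entitlement. When the group already has entities, only the weight is bumped and the regular clock cadence applies.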

@@ -274,7 +262,7 @@ void TComputeScheduler::AdvanceTime(TMonotonic now) {
     for (auto& v : Impl->Records) {
         {
             auto group = v.get()->Current();
-            if (!group.get()->Disabled && group.get()->EntitiesWeight > MinPriority) {
+            if (!group.get()->Disabled && group.get()->EntitiesWeight > MinEntitiesWeight) {
                 v.get()->Next()->Now += FromDuration(now - group.get()->LastNowRecalc) * group.get()->Weight / group.get()->EntitiesWeight;
             }
             v.get()->Next()->LastNowRecalc = now;
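The rename from MinPriority to MinEntitiesWeight matches what the threshold actually guards: the division by EntitiesWeight in the clock update. Per wall-clock interval, each group's virtual clock advances by (now − LastNowRecalc) · Weight / EntitiesWeight, i.e. the CPU entitlement available to one unit of enrolled weight. For example, a group granted Weight = 0.5 cores with EntitiesWeight = 2 advances by 2.5 ms of entitlement over a 10 ms interval; an entity that has consumed more CPU than its share since its Vstart is ahead of schedule and is presumably the candidate for throttling, bounded by MaxDelay in the header below.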

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 8 additions & 3 deletions
@@ -44,6 +44,10 @@ class TComputeScheduler {
         double Share;
         TString Name;
         TVector<TDistributionRule> SubRules;
+
+        bool empty() {
+            return SubRules.empty() && Name.empty();
+        }
     };

 public:
@@ -52,7 +56,7 @@ class TComputeScheduler {

     void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);

-    TSchedulerEntityHandle Enroll(TString group, double weight);
+    TSchedulerEntityHandle Enroll(TString group, double weight, TMonotonic now);

     void AdvanceTime(TMonotonic now);
@@ -70,6 +74,7 @@ class TComputeScheduler {
 };

 struct TComputeActorSchedulingOptions {
+    TMonotonic Now;
     NActors::TActorId NodeService;
     TComputeScheduler* Scheduler = nullptr;
     TString Group = "";
@@ -98,7 +103,7 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase<TDerived> {
 private:
     using TBase = NYql::NDq::TDqSyncComputeActorBase<TDerived>;

-    static constexpr TDuration MaxDelay = TDuration::Seconds(1);
+    static constexpr TDuration MaxDelay = TDuration::MilliSeconds(60);

 public:
     template<typename... TArgs>
@@ -108,7 +113,7 @@ class TSchedulableComputeActorBase : public NYql::NDq::TDqSyncComputeActorBase<TDerived> {
         , NoThrottle(options.NoThrottle)
     {
         if (Scheduler) {
-            SelfHandle = Scheduler->Enroll(options.Group, options.Weight);
+            SelfHandle = Scheduler->Enroll(options.Group, options.Weight, options.Now);
         }
     }
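Two header-level changes round this out. The options struct now carries the enrollment timestamp, so an actor is enrolled at the moment the node service captured (see the `.Now = now` wiring in kqp_node_service.cpp above) rather than whenever its constructor happens to run. And MaxDelay drops from 1 s to 60 ms, capping how long a throttled compute actor can be parked before it is rescheduled; a shorter cap trades some extra wakeups for much lower worst-case latency added to a throttled query.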

ydb/core/kqp/runtime/kqp_scheduler_ut.cpp

Lines changed: 23 additions & 20 deletions
@@ -14,8 +14,7 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {
         TDuration Cuanta;
     };

-    TVector<TDuration> RunSimulation(TComputeScheduler& scheduler, TVector<TProcess> processes, TDuration time, size_t executionUnits) {
-        auto start = TMonotonic::Now();
+    TVector<TDuration> RunSimulation(TComputeScheduler& scheduler, TVector<TProcess> processes, TDuration time, size_t executionUnits, TMonotonic start) {
         TMonotonic now = start;
         TMonotonic deadline = now + time;
         scheduler.AdvanceTime(now);
@@ -38,7 +37,7 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {
         TVector<double> groupnow(processes.size());

         for (size_t i = 0; i < processes.size(); ++i) {
-            handles[i] = scheduler.Enroll(processes[i].Group, processes[i].Weight);
+            handles[i] = scheduler.Enroll(processes[i].Group, processes[i].Weight, now);
             runQueue.push_back(i);
         }
@@ -106,14 +105,16 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {

     Y_UNIT_TEST(SingleCoreSimple) {
         NKikimr::NKqp::TComputeScheduler scheduler;
-        THashMap<TString, double> priorities;
-        priorities["first"] = 1;
-        priorities["second"] = 1;
-        scheduler.SetPriorities(priorities, 1);
+        auto start = TMonotonic::Now();
+
+        TComputeScheduler::TDistributionRule rule;
+        rule.SubRules.push_back({.Share = 1, .Name = "first"});
+        rule.SubRules.push_back({.Share = 1, .Name = "second"});
+        scheduler.SetPriorities(rule, 1, start);

         TDuration all = TDuration::Seconds(10);

-        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 1, TDuration::MilliSeconds(10)}}, all, 1);
+        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 1, TDuration::MilliSeconds(10)}}, all, 1, start);

         for (auto t : result) {
             AssertEq(t, all/4, TDuration::MilliSeconds(20));
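The tests migrate from the flat THashMap priorities API to the TDistributionRule tree and pin the simulation to an explicit start timestamp, making runs deterministic with respect to enrollment time. The expected numbers follow from the shares: "first" and "second" split the single core evenly, so pool "first" holds half a core, and its two equal-weight processes should each accumulate roughly all/4 = 2.5 s of runtime over the 10-second simulation, within the 20 ms tolerance.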
@@ -122,15 +123,16 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {

     Y_UNIT_TEST(SingleCoreThird) {
         NKikimr::NKqp::TComputeScheduler scheduler;
-        THashMap<TString, double> priorities;
-        priorities["first"] = 1;
-        priorities["second"] = 1;
-        scheduler.SetPriorities(priorities, 1);
+        auto start = TMonotonic::Now();

-        TDuration all = TDuration::Seconds(10);
+        TComputeScheduler::TDistributionRule rule;
+        rule.SubRules.push_back({.Share = 1, .Name = "first"});
+        rule.SubRules.push_back({.Share = 1, .Name = "second"});
+        scheduler.SetPriorities(rule, 1, start);

+        TDuration all = TDuration::Seconds(10);

-        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 2, TDuration::MilliSeconds(10)}}, all, 1);
+        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 2, TDuration::MilliSeconds(10)}}, all, 1, start);
         all = all/2;

         Cerr << result[0].MicroSeconds() << " " << result[1].MicroSeconds() << Endl;
@@ -140,15 +142,16 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {

     Y_UNIT_TEST(SingleCoreForth) {
         NKikimr::NKqp::TComputeScheduler scheduler;
-        THashMap<TString, double> priorities;
-        priorities["first"] = 1;
-        priorities["second"] = 1;
-        scheduler.SetPriorities(priorities, 1);
+        TComputeScheduler::TDistributionRule rule;
+        auto start = TMonotonic::Now();
+        rule.SubRules.push_back({.Share = 1, .Name = "first"});
+        rule.SubRules.push_back({.Share = 1, .Name = "second"});

-        TDuration all = TDuration::Seconds(10);
+        scheduler.SetPriorities(rule, 1, start);

+        TDuration all = TDuration::Seconds(10);

-        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 3, TDuration::MilliSeconds(10)}}, all, 1);
+        auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 3, TDuration::MilliSeconds(10)}}, all, 1, start);
         all = all/2;

         Cerr << result[0].MicroSeconds() << " " << result[1].MicroSeconds() << Endl;
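In the weighted variants, after `all = all/2` scales expectations down to pool "first"'s 5-second half-core budget, the two processes should split it in proportion to their weights: 1:2 gives roughly a third and two thirds (SingleCoreThird), and 1:3 gives roughly a quarter and three quarters (SingleCoreForth, i.e. "fourth").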
