Skip to content

Commit 4ae940d

Browse files
committed
stash
1 parent 381b290 commit 4ae940d

File tree

4 files changed

+125
-28
lines changed

4 files changed

+125
-28
lines changed

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -656,6 +656,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
656656
}
657657

658658
void SetPriorities(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration& conf) {
659+
std::function<TComputeScheduler::TDistributionRule(const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)> convert
660+
= [](const NKikimrConfig::TTableServiceConfig::TComputePoolConfiguration&)
661+
{
662+
};
663+
Scheduler.SetPriorities(convert(conf), 1, TlsActivationContext->Monotonic());
659664
}
660665

661666
void SetIteratorReadsRetrySettings(const NKikimrConfig::TTableServiceConfig::TIteratorReadsRetrySettings& settings) {

ydb/core/kqp/runtime/kqp_compute_scheduler.cpp

Lines changed: 115 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,31 @@ double TSchedulerEntityHandle::VRuntime() {
121121
}
122122

123123
struct TComputeScheduler::TImpl {
124-
THashMap<TString, std::unique_ptr<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>> Groups;
124+
THashMap<TString, size_t> PoolId;
125+
std::vector<std::unique_ptr<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>> Records;
126+
127+
struct TRule {
128+
size_t Parent;
129+
double Weight;
130+
131+
TMaybe<size_t> RecordId = {};
132+
double SubRulesSum = 0;
133+
bool Empty = true;
134+
};
135+
size_t RootRule;
136+
std::vector<TRule> Rules;
137+
138+
double SumCores;
139+
140+
void AssignWeights(TMonotonic now) {
141+
for (size_t i = 0; i < Rules.size(); ++i) {
142+
if (Rules[i].RecordId) {
143+
if (Records[*Rules[i].RecordId]->Next()->EntitiesWeight < MinPriority) { // mincores
144+
}
145+
} else {
146+
}
147+
}
148+
}
125149
};
126150

127151
TComputeScheduler::TComputeScheduler() {
@@ -130,31 +154,100 @@ TComputeScheduler::TComputeScheduler() {
130154

131155
TComputeScheduler::~TComputeScheduler() = default;
132156

133-
void TComputeScheduler::SetPriorities(THashMap<TString, double> priorities, double cores) {
134-
double sum = 0;
135-
for (auto [_, v] : priorities) {
136-
sum += v;
137-
}
138-
for (auto& [k, v] : Impl->Groups) {
139-
auto ptr = priorities.FindPtr(k);
140-
if (ptr) {
141-
v->Next()->Weight = ((*ptr) * cores) / sum;
142-
v->Next()->Disabled = false;
157+
void TComputeScheduler::SetPriorities(TDistributionRule rule, double cores, TMonotonic now) {
158+
THashSet<TString> seenNames;
159+
std::function<void(TDistributionRule&)> exploreNames = [&](TDistributionRule& rule) {
160+
if (rule.SubRules.empty()) {
161+
seenNames.insert(rule.Name);
143162
} else {
144-
v->Next()->Weight = 0;
145-
v->Next()->Disabled = true;
163+
for (auto& subRule : rule.SubRules) {
164+
exploreNames(subRule);
165+
}
146166
}
147-
v->Publish();
148-
}
149-
for (auto& [k, v] : priorities) {
150-
if (!Impl->Groups.contains(k)) {
167+
};
168+
exploreNames(rule);
169+
170+
for (auto& k : seenNames) {
171+
auto ptr = Impl->PoolId.FindPtr(k);
172+
if (!ptr) {
173+
Impl->PoolId[k] = Impl->Records.size();
151174
auto group = std::make_unique<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>();
152-
group->Next()->LastNowRecalc = TMonotonic::Now();
153-
group->Next()->Weight = (v * cores) / sum;
175+
group->Next()->LastNowRecalc = now;
176+
Impl->Records.push_back(std::move(group));
177+
}
178+
}
179+
for (auto& [k, v] : Impl->PoolId) {
180+
if (!seenNames.contains(k)) {
181+
auto* group = Impl->Records[Impl->PoolId[v]].get();
182+
group->Next()->Weight = 0;
183+
group->Next()->Disabled = true;
154184
group->Publish();
155-
Impl->Groups[k] = std::move(group);
156185
}
157186
}
187+
Impl->SumCores = cores;
188+
189+
TVector<TImpl::TRule> rules;
190+
std::function<size_t(TDistributionRule&)> makeRules = [&](TDistributionRule& rule) {
191+
size_t result;
192+
if (rule.SubRules.empty()) {
193+
result = rules.size();
194+
rules.push_back(TImpl::TRule{.Weight = rule.Share, .RecordId=Impl->PoolId[rule.Name]});
195+
} else {
196+
TVector<size_t> toAssign;
197+
for (auto& subRule : rule.SubRules) {
198+
toAssign.push_back(makeRules(subRule));
199+
}
200+
size_t result = rules.size();
201+
rules.push_back(TImpl::TRule{.Weight = rule.Share});
202+
for (auto i : toAssign) {
203+
rules[i].Parent = result;
204+
}
205+
return result;
206+
}
207+
return result;
208+
};
209+
Impl->RootRule = makeRules(rule);
210+
Impl->Rules.swap(rules);
211+
212+
// if (ptr) {
213+
// v->Next()->Weight = ((*ptr) * cores) / sum;
214+
// v->Next()->Disabled = false;
215+
// } else {
216+
// v->Next()->Weight = 0;
217+
// v->Next()->Disabled = true;
218+
// }
219+
// v->Publish();
220+
//}
221+
222+
Impl->AssignWeights(now);
223+
for (auto& record : Impl->Records) {
224+
record->Publish();
225+
}
226+
227+
//double sum = 0;
228+
//for (auto [_, v] : priorities) {
229+
// sum += v;
230+
//}
231+
//for (auto& [k, v] : Impl->Groups) {
232+
// auto ptr = priorities.FindPtr(k);
233+
// if (ptr) {
234+
// v->Next()->Weight = ((*ptr) * cores) / sum;
235+
// v->Next()->Disabled = false;
236+
// } else {
237+
// v->Next()->Weight = 0;
238+
// v->Next()->Disabled = true;
239+
// }
240+
// v->Publish();
241+
//}
242+
//for (auto& [k, v] : priorities) {
243+
// if (!Impl->Groups.contains(k)) {
244+
// auto group = ;
245+
// group->Next()->LastNowRecalc = TMonotonic::Now();
246+
// group->Next()->Weight = (v * cores) / sum;
247+
// group->Publish();
248+
// Impl->Groups[k] = std::move(group);
249+
// }
250+
//}
158251
}
159252

160253

@@ -177,7 +270,8 @@ TSchedulerEntityHandle TComputeScheduler::Enroll(TString groupName, double weigh
177270
}
178271

179272
void TComputeScheduler::AdvanceTime(TMonotonic now) {
180-
for (auto& [_, v] : Impl->Groups) {
273+
Impl->AssignWeights();
274+
for (auto& v : Impl->Records) {
181275
{
182276
auto group = v.get()->Current();
183277
if (!group.get()->Disabled && group.get()->EntitiesWeight > MinPriority) {

ydb/core/kqp/runtime/kqp_compute_scheduler.h

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,18 +41,16 @@ class TSchedulerEntityHandle {
4141
class TComputeScheduler {
4242
public:
4343
struct TDistributionRule {
44-
std::string Name;
45-
};
46-
47-
struct TDistributionOptions {
48-
TDistributionRule Rule;
44+
double Share;
45+
TString Name;
46+
TVector<TDistributionRule> SubRules;
4947
};
5048

5149
public:
5250
TComputeScheduler();
5351
~TComputeScheduler();
5452

55-
void SetPriorities(THashMap<TString, double> priorities, double cores);
53+
void SetPriorities(TDistributionRule rootRule, double cores, TMonotonic now);
5654

5755
TSchedulerEntityHandle Enroll(TString group, double weight);
5856

ydb/core/kqp/runtime/kqp_scheduler_ut.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {
3838
TVector<double> groupnow(processes.size());
3939

4040
for (size_t i = 0; i < processes.size(); ++i) {
41-
handles[i] = scheduler.Enroll(processes[i].Group, processes[i].Weight, now);
41+
handles[i] = scheduler.Enroll(processes[i].Group, processes[i].Weight);
4242
runQueue.push_back(i);
4343
}
4444

0 commit comments

Comments
 (0)