Skip to content

Commit ca26304

Browse files
committed
pimpl
1 parent 25cc8eb commit ca26304

File tree

2 files changed

+260
-152
lines changed

2 files changed

+260
-152
lines changed
Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,185 @@
11
#include "kqp_compute_scheduler.h"
22

3+
namespace {
4+
static constexpr ui64 FromDuration(TDuration d) {
5+
return d.MicroSeconds();
6+
}
7+
8+
static constexpr TDuration ToDuration(double t) {
9+
return TDuration::MicroSeconds(t);
10+
}
11+
12+
static constexpr double MinPriority = 1e-8;
13+
14+
}
15+
316
namespace NKikimr {
417
namespace NKqp {
518

19+
template<typename T>
20+
class TMultiThreadView {
21+
public:
22+
TMultiThreadView(std::atomic<ui64>* usage, T* slot)
23+
: Usage(usage)
24+
, Slot(slot)
25+
{
26+
Usage->fetch_add(1);
27+
}
28+
29+
const T* get() {
30+
return Slot;
31+
}
32+
33+
~TMultiThreadView() {
34+
Usage->fetch_sub(1);
35+
}
36+
37+
private:
38+
std::atomic<ui64>* Usage;
39+
T* Slot;
40+
};
41+
42+
// Double-buffered single-writer/multi-reader publisher: the writer mutates
// the inactive slot via Next() and flips it live with Publish(); readers pin
// the active slot with Current(). NOTE(review): assumes exactly one writer
// thread — Publish()/Next() themselves are not synchronized against each
// other; verify against callers.
template<typename T>
class TMultithreadPublisher {
public:
    // Makes the slot last written through Next() visible to readers, then
    // spin-waits until no reader still holds the previously-active slot and
    // syncs it with the published state so subsequent Next() edits start
    // from the current values. Busy-waits; assumes reader critical sections
    // are short.
    void Publish() {
        auto oldVal = CurrentT.load();
        auto newVal = 1 - oldVal;
        CurrentT.store(newVal);
        while (true) {
            if (Usage[oldVal].load() == 0) {
                Slots[oldVal] = Slots[newVal];
                return;
            }
        }
    }

    // Writer-side access to the inactive (not yet published) slot.
    T* Next() {
        return &Slots[1 - CurrentT.load()];
    }

    // Reader-side: pins the active slot. If a Publish() races between the
    // load and the view construction, the re-check of CurrentT discards the
    // stale view (its destructor releases the pin) and retries.
    TMultiThreadView<T> Current() {
        while (true) {
            auto val = CurrentT.load();
            TMultiThreadView<T> view(&Usage[val], &Slots[val]);
            if (CurrentT.load() == val) {
                return view;
            }
        }
    }

private:
    std::atomic<ui32> CurrentT = 0;   // index of the currently published slot
    std::atomic<ui64> Usage[2] = {0, 0};  // per-slot count of live reader views
    T Slots[2];
};
76+
77+
// Out-of-line destructor for the handle (pimpl): presumably the handle owns
// a smart pointer to TSchedulerEntity declared incomplete in the header —
// TODO confirm it is defined after TSchedulerEntity is complete.
TSchedulerEntityHandle::~TSchedulerEntityHandle() = default;
78+
79+
class TSchedulerEntity {
80+
public:
81+
TSchedulerEntity() {}
82+
~TSchedulerEntity() {}
83+
84+
private:
85+
friend class TComputeScheduler;
86+
87+
struct TGroupRecord {
88+
double Weight;
89+
double Now = 0;
90+
TMonotonic LastNowRecalc;
91+
bool Disabled = false;
92+
double EntitiesWeight = 0;
93+
};
94+
95+
TMultithreadPublisher<TGroupRecord>* Group;
96+
double Weight;
97+
double Vruntime = 0;
98+
double Vstart;
99+
};
100+
101+
// Pimpl state for TComputeScheduler: one double-buffered record per named
// scheduler group.
struct TComputeScheduler::TImpl {
    THashMap<TString, std::unique_ptr<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>> Groups;
};
104+
105+
// Allocates the pimpl state; must live in the .cpp where TImpl is complete.
TComputeScheduler::TComputeScheduler()
    : Impl(std::make_unique<TImpl>())
{
}
108+
109+
// Out-of-line destructor so std::unique_ptr<TImpl> destroys a complete type
// (defaulting it in the header would not compile with the pimpl idiom).
TComputeScheduler::~TComputeScheduler() = default;
110+
111+
// Recomputes each group's core share as priority/sum(priorities) * cores.
// Groups absent from `priorities` are disabled (kept, weight 0); new names
// get a fresh group record.
//
// Fix: the original divided by `sum` unconditionally, so an empty map or
// all-zero priorities produced NaN/inf weights. Such input now disables all
// existing groups and creates none. Also takes the sum loop by const
// reference instead of copying the pairs.
void TComputeScheduler::SetPriorities(THashMap<TString, double> priorities, double cores) {
    double sum = 0;
    for (const auto& [_, v] : priorities) {
        sum += v;
    }

    for (auto& [k, v] : Impl->Groups) {
        auto ptr = priorities.FindPtr(k);
        if (ptr && sum > 0) {
            v->Next()->Weight = ((*ptr) * cores) / sum;
            v->Next()->Disabled = false;
        } else {
            // No (meaningful) priority for this group any more: keep its
            // record and enrolled entities, but stop scheduling it.
            v->Next()->Weight = 0;
            v->Next()->Disabled = true;
        }
        v->Publish();
    }

    if (sum > 0) {
        for (auto& [k, v] : priorities) {
            if (!Impl->Groups.contains(k)) {
                auto group = std::make_unique<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>();
                group->Next()->LastNowRecalc = TMonotonic::Now();
                group->Next()->Weight = (v * cores) / sum;
                group->Publish();
                Impl->Groups[k] = std::move(group);
            }
        }
    }
}
137+
138+
// Registers a new entity in `group` and returns an owning handle.
// Throws (via Y_ENSURE) if the group was never declared in SetPriorities.
//
// Fix: single FindPtr lookup instead of contains() followed by operator[].
// NOTE(review): the EntitiesWeight update is written to the writer slot and
// only becomes reader-visible after the next Publish() (in AdvanceTime) —
// presumably intentional; verify against callers.
TSchedulerEntityHandle TComputeScheduler::Enroll(TString group, double weight) {
    auto* groupHolder = Impl->Groups.FindPtr(group);
    Y_ENSURE(groupHolder, "unknown scheduler group");
    auto* groupEntry = groupHolder->get();

    auto result = std::make_unique<TSchedulerEntity>();
    result->Group = groupEntry;
    result->Weight = weight;
    // Start this entity's virtual clock at the group's current virtual time
    // so it is neither owed nor charged time from before it existed.
    result->Vstart = groupEntry->Next()->Now;
    groupEntry->Next()->EntitiesWeight += weight;
    return TSchedulerEntityHandle(std::move(result));
}
148+
149+
// Advances every active group's virtual clock to `now`: each group earns
// elapsed-time * Weight / EntitiesWeight on its shared clock, then the
// updated slot is published to readers.
void TComputeScheduler::AdvanceTime(TMonotonic now) {
    for (auto& [_, publisher] : Impl->Groups) {
        {
            // The view must be released before Publish(), which spin-waits
            // on outstanding readers.
            auto view = publisher->Current();
            const auto* cur = view.get();
            if (!cur->Disabled && cur->EntitiesWeight > MinPriority) {
                auto* next = publisher->Next();
                next->Now += FromDuration(now - cur->LastNowRecalc) * cur->Weight / cur->EntitiesWeight;
                next->LastNowRecalc = now;
            }
        }
        publisher->Publish();
    }
}
161+
162+
// Removes an entity's share from its group, mirroring Enroll().
//
// Fix: Enroll() adds the entity's weight to EntitiesWeight, but the original
// subtracted it from Weight — the group's *core share* — permanently
// shrinking the whole group on every deregistration. Decrement
// EntitiesWeight instead.
void TComputeScheduler::Deregister(TSchedulerEntity& self) {
    auto* group = self.Group->Next();
    group->EntitiesWeight -= self.Weight;
}
166+
167+
// Charges `time` of consumed CPU to the entity's virtual runtime, scaled
// inversely by its weight (heavier entities accumulate vruntime slower).
void TComputeScheduler::TrackTime(TSchedulerEntity& self, TDuration time) {
    const double charged = FromDuration(time) / self.Weight;
    self.Vruntime += charged;
}
170+
171+
// Computes how long the entity should sleep before running again, or
// Nothing() if it may run immediately. The entity must belong to an
// enabled group (enforced by Y_ENSURE).
TMaybe<TDuration> TComputeScheduler::CalcDelay(TSchedulerEntity& self, TMonotonic now) {
    auto view = self.Group->Current();
    const auto* group = view.get();
    Y_ENSURE(!group->Disabled);
    // How far the entity is ahead of the group's shared virtual clock,
    // translated back into wall-clock microseconds.
    const double lagTime = (self.Vruntime - (group->Now - self.Vstart)) * group->EntitiesWeight / group->Weight;
    // Credit the wall time already elapsed since the clock was last advanced.
    const double neededTime = lagTime - FromDuration(now - group->LastNowRecalc);
    if (neededTime <= 0) {
        return Nothing();
    }
    return ToDuration(neededTime);
}
182+
183+
6184
} // namespace NKqp
7185
} // namespace NKikimr

0 commit comments

Comments
 (0)