Skip to content

Commit 18eeef7

Browse files
authored
Merge ca26304 into 0964dfd
2 parents 0964dfd + ca26304 commit 18eeef7

File tree

6 files changed

+374
-17
lines changed

6 files changed

+374
-17
lines changed

ydb/core/kqp/common/simple/kqp_event_ids.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ struct TKqpEvents {
4444
EvListSessionsRequest,
4545
EvListSessionsResponse,
4646
EvListProxyNodesRequest,
47-
EvListProxyNodesResponse
47+
EvListProxyNodesResponse,
48+
EvFinishKqpTasks
4849
};
4950

5051
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);

ydb/core/kqp/node_service/kqp_node_service.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
1515
#include <ydb/core/kqp/runtime/kqp_read_actor.h>
1616
#include <ydb/core/kqp/runtime/kqp_read_iterator_common.h>
17+
#include <ydb/core/kqp/runtime/kqp_compute_scheduler.h>
1718
#include <ydb/core/kqp/common/kqp_resolve.h>
1819

1920
#include <ydb/library/wilson_ids/wilson.h>
@@ -174,9 +175,10 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
174175

175176
private:
176177
STATEFN(WorkState) {
178+
Scheduler.AdvanceTime(TlsActivationContext->Monotonic());
177179
switch (ev->GetTypeRewrite()) {
178180
hFunc(TEvKqpNode::TEvStartKqpTasksRequest, HandleWork);
179-
hFunc(TEvKqpNode::TEvFinishKqpTask, HandleWork); // used only for unit tests
181+
hFunc(TEvFinishKqpTask, HandleWork); // used only for unit tests
180182
hFunc(TEvKqpNode::TEvCancelKqpTasksRequest, HandleWork);
181183
hFunc(TEvents::TEvWakeup, HandleWork);
182184
// misc
@@ -490,8 +492,11 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
490492
}
491493

492494
// used only for unit tests
493-
void HandleWork(TEvKqpNode::TEvFinishKqpTask::TPtr& ev) {
495+
void HandleWork(TEvFinishKqpTask::TPtr& ev) {
494496
auto& msg = *ev->Get();
497+
if (msg.SchedulerEntity) {
498+
Scheduler.Deregister(*msg.SchedulerEntity);
499+
}
495500
FinishKqpTask(msg.TxId, msg.TaskId, msg.Success, GetStateBucketByTx(Buckets, msg.TxId), GetKqpResourceManager());
496501
}
497502

@@ -670,6 +675,8 @@ class TKqpNodeService : public TActorBootstrapped<TKqpNodeService> {
670675
NYql::NDq::IDqAsyncIoFactory::TPtr AsyncIoFactory;
671676
const std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup;
672677

678+
TComputeScheduler Scheduler;
679+
673680
//state sharded by TxId
674681
std::shared_ptr<TBucketArray> Buckets;
675682
};

ydb/core/kqp/node_service/kqp_node_service.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ struct TKqpNodeEvents {
2626
enum EKqpNodeEvents {
2727
EvStartKqpTasksRequest = EventSpaceBegin(TKikimrEvents::ES_KQP) + 320,
2828
EvStartKqpTasksResponse,
29-
EvFinishKqpTasks,
3029
EvCancelKqpTasksRequest,
3130
EvCancelKqpTasksResponse,
3231
};
@@ -46,19 +45,6 @@ struct TEvKqpNode {
4645
struct TEvStartKqpTasksResponse : public TEventPB<TEvStartKqpTasksResponse,
4746
NKikimrKqp::TEvStartKqpTasksResponse, TKqpNodeEvents::EvStartKqpTasksResponse> {};
4847

49-
struct TEvFinishKqpTask : public TEventLocal<TEvFinishKqpTask, TKqpNodeEvents::EvFinishKqpTasks> {
50-
const ui64 TxId;
51-
const ui64 TaskId;
52-
const bool Success;
53-
const NYql::TIssues Issues;
54-
55-
TEvFinishKqpTask(ui64 txId, ui64 taskId, bool success, const NYql::TIssues& issues = {})
56-
: TxId(txId)
57-
, TaskId(taskId)
58-
, Success(success)
59-
, Issues(issues) {}
60-
};
61-
6248
struct TEvCancelKqpTasksRequest : public TEventPB<TEvCancelKqpTasksRequest,
6349
NKikimrKqp::TEvCancelKqpTasksRequest, TKqpNodeEvents::EvCancelKqpTasksRequest> {};
6450

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
#include "kqp_compute_scheduler.h"
2+
3+
namespace {
4+
static constexpr ui64 FromDuration(TDuration d) {
5+
return d.MicroSeconds();
6+
}
7+
8+
static constexpr TDuration ToDuration(double t) {
9+
return TDuration::MicroSeconds(t);
10+
}
11+
12+
static constexpr double MinPriority = 1e-8;
13+
14+
}
15+
16+
namespace NKikimr {
17+
namespace NKqp {
18+
19+
template<typename T>
20+
class TMultiThreadView {
21+
public:
22+
TMultiThreadView(std::atomic<ui64>* usage, T* slot)
23+
: Usage(usage)
24+
, Slot(slot)
25+
{
26+
Usage->fetch_add(1);
27+
}
28+
29+
const T* get() {
30+
return Slot;
31+
}
32+
33+
~TMultiThreadView() {
34+
Usage->fetch_sub(1);
35+
}
36+
37+
private:
38+
std::atomic<ui64>* Usage;
39+
T* Slot;
40+
};
41+
42+
template<typename T>
43+
class TMultithreadPublisher {
44+
public:
45+
void Publish() {
46+
auto oldVal = CurrentT.load();
47+
auto newVal = 1 - oldVal;
48+
CurrentT.store(newVal);
49+
while (true) {
50+
if (Usage[oldVal].load() == 0) {
51+
Slots[oldVal] = Slots[newVal];
52+
return;
53+
}
54+
}
55+
}
56+
57+
T* Next() {
58+
return &Slots[1 - CurrentT.load()];
59+
}
60+
61+
TMultiThreadView<T> Current() {
62+
while (true) {
63+
auto val = CurrentT.load();
64+
TMultiThreadView<T> view(&Usage[val], &Slots[val]);
65+
if (CurrentT.load() == val) {
66+
return view;
67+
}
68+
}
69+
}
70+
71+
private:
72+
std::atomic<ui32> CurrentT = 0;
73+
std::atomic<ui64> Usage[2] = {0, 0};
74+
T Slots[2];
75+
};
76+
77+
// Out-of-line defaulted destructor of the entity handle.
// NOTE(review): presumably kept in this .cpp so the header can refer to
// TSchedulerEntity through a forward declaration only - confirm against
// kqp_compute_scheduler.h.
TSchedulerEntityHandle::~TSchedulerEntityHandle() = default;
78+
79+
class TSchedulerEntity {
80+
public:
81+
TSchedulerEntity() {}
82+
~TSchedulerEntity() {}
83+
84+
private:
85+
friend class TComputeScheduler;
86+
87+
struct TGroupRecord {
88+
double Weight;
89+
double Now = 0;
90+
TMonotonic LastNowRecalc;
91+
bool Disabled = false;
92+
double EntitiesWeight = 0;
93+
};
94+
95+
TMultithreadPublisher<TGroupRecord>* Group;
96+
double Weight;
97+
double Vruntime = 0;
98+
double Vstart;
99+
};
100+
101+
// Private state of TComputeScheduler.
struct TComputeScheduler::TImpl {
    using TGroupPublisher = TMultithreadPublisher<TSchedulerEntity::TGroupRecord>;

    // Group name -> publisher of that group's record. Each publisher is
    // allocated separately, and entities store raw pointers to it.
    THashMap<TString, std::unique_ptr<TGroupPublisher>> Groups;
};
104+
105+
// Creates a scheduler with an empty set of groups.
TComputeScheduler::TComputeScheduler()
    : Impl(std::make_unique<TImpl>())
{
}
108+
109+
// Defaulted here, after TImpl has been fully defined in this file, so that
// destroying the Impl member sees a complete type.
TComputeScheduler::~TComputeScheduler() = default;
110+
111+
// Redistributes `cores` between groups proportionally to `priorities`.
//
// Each listed group receives the weight priority * cores / (sum of all
// priorities). Known groups that are not listed get weight 0 and are marked
// Disabled. Listed groups that do not exist yet are created with
// LastNowRecalc set to TMonotonic::Now(). Every touched group is published.
void TComputeScheduler::SetPriorities(THashMap<TString, double> priorities, double cores) {
    double total = 0;
    for (const auto& [_, priority] : priorities) {
        total += priority;
    }

    // Pass 1: refresh or disable the groups that already exist.
    for (auto& [name, publisher] : Impl->Groups) {
        auto* pending = publisher->Next();
        if (const auto* priority = priorities.FindPtr(name)) {
            pending->Weight = ((*priority) * cores) / total;
            pending->Disabled = false;
        } else {
            pending->Weight = 0;
            pending->Disabled = true;
        }
        publisher->Publish();
    }

    // Pass 2: create the groups that are seen for the first time.
    for (auto& [name, priority] : priorities) {
        if (Impl->Groups.contains(name)) {
            continue;
        }
        auto created = std::make_unique<TMultithreadPublisher<TSchedulerEntity::TGroupRecord>>();
        auto* pending = created->Next();
        pending->LastNowRecalc = TMonotonic::Now();
        pending->Weight = (priority * cores) / total;
        created->Publish();
        Impl->Groups[name] = std::move(created);
    }
}
137+
138+
// Registers a new entity with the given weight in an existing group and
// returns the handle that owns it.
//
// Throws through Y_ENSURE when `group` is not a known group name. The entity
// starts from the group's pending clock value, and its weight is added to the
// group's pending EntitiesWeight; nothing is published here.
TSchedulerEntityHandle TComputeScheduler::Enroll(TString group, double weight) {
    auto* found = Impl->Groups.FindPtr(group);
    Y_ENSURE(found != nullptr, "unknown scheduler group");
    auto* publisher = found->get();

    auto entity = std::make_unique<TSchedulerEntity>();
    entity->Group = publisher;
    entity->Weight = weight;

    auto* pending = publisher->Next();
    entity->Vstart = pending->Now;
    pending->EntitiesWeight += weight;

    return TSchedulerEntityHandle(std::move(entity));
}
148+
149+
// Moves the clock of every group forward to `now` and publishes the result.
//
// A group is advanced only when it is not Disabled and its EntitiesWeight
// exceeds MinPriority. The increment is the time elapsed since LastNowRecalc
// (in microseconds) multiplied by Weight / EntitiesWeight. Publish() is
// called for every group, advanced or not.
void TComputeScheduler::AdvanceTime(TMonotonic now) {
    for (auto& [_, publisher] : Impl->Groups) {
        {
            // The view must be destroyed before Publish(): Publish() waits
            // until the usage counter of the current slot is back to zero.
            auto snapshot = publisher->Current();
            const auto* current = snapshot.get();
            const bool active = !current->Disabled && current->EntitiesWeight > MinPriority;
            if (active) {
                auto* pending = publisher->Next();
                pending->Now += FromDuration(now - current->LastNowRecalc) * current->Weight / current->EntitiesWeight;
                pending->LastNowRecalc = now;
            }
        }
        publisher->Publish();
    }
}
161+
162+
// Removes an entity's contribution from its group.
//
// Enroll adds the entity weight to the group's EntitiesWeight, so that same
// field is what has to be reduced here. The group's Weight is the share
// assigned by SetPriorities and must not be touched by entity bookkeeping.
// The change is written to the pending slot and becomes visible to readers
// after the next Publish() for the group.
void TComputeScheduler::Deregister(TSchedulerEntity& self) {
    auto* group = self.Group->Next();
    group->EntitiesWeight -= self.Weight;
}
166+
167+
// Accounts `time` of consumed execution to the entity: its Vruntime grows by
// the microsecond count of `time` divided by the entity's weight.
void TComputeScheduler::TrackTime(TSchedulerEntity& self, TDuration time) {
    const auto consumed = FromDuration(time);
    self.Vruntime += consumed / self.Weight;
}
170+
171+
// Computes how long the entity has to wait before it may run again.
//
// Returns Nothing() when no wait is needed, otherwise the delay. Throws
// through Y_ENSURE when the entity's group is currently Disabled.
TMaybe<TDuration> TComputeScheduler::CalcDelay(TSchedulerEntity& self, TMonotonic now) {
    auto group = self.Group->Current();
    Y_ENSURE(!group.get()->Disabled);
    const auto* record = group.get();

    // How far the entity's own virtual runtime is ahead of the progress the
    // group clock has made since the entity enrolled, scaled by
    // EntitiesWeight / Weight.
    const double groupProgress = record->Now - self.Vstart;
    const double lagTime = (self.Vruntime - groupProgress) * record->EntitiesWeight / record->Weight;

    // Time that has passed since the group clock was last recalculated is
    // subtracted from the lag.
    const double neededTime = lagTime - FromDuration(now - record->LastNowRecalc);
    if (neededTime <= 0) {
        return Nothing();
    }
    return ToDuration(neededTime);
}
182+
183+
184+
} // namespace NKqp
185+
} // namespace NKikimr

0 commit comments

Comments
 (0)