Skip to content

Commit 381b290

Browse files
committed
add ut
1 parent 438dc8f commit 381b290

File tree

1 file changed

+158
-0
lines changed

1 file changed

+158
-0
lines changed
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
#include "kqp_compute_scheduler.h"
2+
3+
#include <ydb/library/yql/minikql/mkql_alloc.h>
4+
#include <library/cpp/testing/unittest/registar.h>
5+
6+
using namespace NKikimr::NKqp;
7+
using namespace NMonotonic;
8+
9+
Y_UNIT_TEST_SUITE(TKqpComputeScheduler) {
10+
11+
struct TProcess {
12+
TString Group;
13+
double Weight;
14+
TDuration Cuanta;
15+
};
16+
17+
TVector<TDuration> RunSimulation(TComputeScheduler& scheduler, TVector<TProcess> processes, TDuration time, size_t executionUnits) {
18+
auto start = TMonotonic::Now();
19+
TMonotonic now = start;
20+
TMonotonic deadline = now + time;
21+
scheduler.AdvanceTime(now);
22+
TVector<TDuration> runTimes(processes.size());
23+
struct TEvent {
24+
enum EEventType {
25+
Wakeup,
26+
Sleep,
27+
} Type;
28+
29+
TMonotonic Time;
30+
};
31+
32+
33+
TVector<TMaybe<TEvent>> events(processes.size());
34+
//TVector<TMaybe<TMonotonic>> wakeup(processes.size());
35+
//TVector<TMaybe<TMonotonic>> sleep(processes.size());
36+
TVector<TSchedulerEntityHandle> handles(processes.size());
37+
TVector<size_t> runQueue;
38+
TVector<double> groupnow(processes.size());
39+
40+
for (size_t i = 0; i < processes.size(); ++i) {
41+
handles[i] = scheduler.Enroll(processes[i].Group, processes[i].Weight, now);
42+
runQueue.push_back(i);
43+
}
44+
45+
while (now < deadline) {
46+
size_t toRun = executionUnits;
47+
for (size_t i = 0; i < processes.size(); ++i) {
48+
groupnow[i] = scheduler.GroupNow(*handles[i], now);
49+
//Cerr << " " << scheduler.Now(*handles[i]) << "<" << scheduler.GroupNow(*handles[i]);
50+
UNIT_ASSERT(handles[i].VRuntime() <= scheduler.GroupNow(*handles[i], now) + (processes[i].Cuanta / processes[i].Weight).MicroSeconds());
51+
}
52+
Cerr << Endl;
53+
54+
for (size_t i = 0; i < processes.size(); ++i) {
55+
if (events[i]) {
56+
if (events[i]->Time <= now) {
57+
if (events[i]->Type == TEvent::EEventType::Wakeup) {
58+
events[i].Clear();
59+
runQueue.push_back(i);
60+
} else {
61+
auto delay = scheduler.CalcDelay(*handles[i], now);
62+
if (delay) {
63+
events[i] = TEvent{TEvent::EEventType::Wakeup, now + *delay};
64+
} else {
65+
events[i].Clear();
66+
runQueue.push_back(i);
67+
}
68+
}
69+
} else {
70+
if (events[i]->Type == TEvent::EEventType::Sleep) {
71+
toRun -= 1;
72+
} else {
73+
//UNIT_ASSERT(scheduler.CalcDelay(*handles[i], now - TDuration::MicroSeconds(1)).GetOrElse(TDuration::Zero()) > TDuration::Zero());
74+
}
75+
}
76+
}
77+
78+
}
79+
80+
for (size_t i = 0; i < toRun && !runQueue.empty(); ++i) {
81+
size_t taskToRun = runQueue[0];
82+
events[taskToRun] = TEvent{TEvent::EEventType::Sleep, now + processes[taskToRun].Cuanta};
83+
runTimes[taskToRun] += processes[taskToRun].Cuanta;
84+
scheduler.TrackTime(*handles[taskToRun], processes[taskToRun].Cuanta);
85+
runQueue.erase(runQueue.begin());
86+
}
87+
88+
89+
TMonotonic newDeadline = TMonotonic::Max();
90+
for (auto& t : events) {
91+
if (t) {
92+
newDeadline = Min(newDeadline, t->Time);
93+
}
94+
}
95+
now = newDeadline;
96+
scheduler.AdvanceTime(now);
97+
}
98+
99+
return runTimes;
100+
}
101+
102+
void AssertEq(TDuration first, TDuration second, TDuration delta) {
103+
UNIT_ASSERT(first >= second - delta);
104+
UNIT_ASSERT(first <= second + delta);
105+
}
106+
107+
Y_UNIT_TEST(SingleCoreSimple) {
108+
NKikimr::NKqp::TComputeScheduler scheduler;
109+
THashMap<TString, double> priorities;
110+
priorities["first"] = 1;
111+
priorities["second"] = 1;
112+
scheduler.SetPriorities(priorities, 1);
113+
114+
TDuration all = TDuration::Seconds(10);
115+
116+
auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 1, TDuration::MilliSeconds(10)}}, all, 1);
117+
118+
for (auto t : result) {
119+
AssertEq(t, all/4, TDuration::MilliSeconds(20));
120+
}
121+
}
122+
123+
Y_UNIT_TEST(SingleCoreThird) {
124+
NKikimr::NKqp::TComputeScheduler scheduler;
125+
THashMap<TString, double> priorities;
126+
priorities["first"] = 1;
127+
priorities["second"] = 1;
128+
scheduler.SetPriorities(priorities, 1);
129+
130+
TDuration all = TDuration::Seconds(10);
131+
132+
133+
auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 2, TDuration::MilliSeconds(10)}}, all, 1);
134+
all = all/2;
135+
136+
Cerr << result[0].MicroSeconds() << " " << result[1].MicroSeconds() << Endl;
137+
AssertEq(result[0], all/3, TDuration::MilliSeconds(20));
138+
AssertEq(result[1], 2*all/3, TDuration::MilliSeconds(20));
139+
}
140+
141+
Y_UNIT_TEST(SingleCoreForth) {
142+
NKikimr::NKqp::TComputeScheduler scheduler;
143+
THashMap<TString, double> priorities;
144+
priorities["first"] = 1;
145+
priorities["second"] = 1;
146+
scheduler.SetPriorities(priorities, 1);
147+
148+
TDuration all = TDuration::Seconds(10);
149+
150+
151+
auto result = RunSimulation(scheduler, {{"first", 1, TDuration::MilliSeconds(10)}, {"first", 3, TDuration::MilliSeconds(10)}}, all, 1);
152+
all = all/2;
153+
154+
Cerr << result[0].MicroSeconds() << " " << result[1].MicroSeconds() << Endl;
155+
AssertEq(result[0], all/4, TDuration::MilliSeconds(20));
156+
AssertEq(result[1], 3*all/4, TDuration::MilliSeconds(20));
157+
}
158+
}

0 commit comments

Comments
 (0)