//
// Created by jinhai on 23-5-7.
//
#pragma once

#include "operator.h"
#include "buffer_queue.h"
#include "task_queue.h"
#include "poller_queue.h"

#include <unistd.h>

#include <atomic>
#include <cassert>
#include <cinttypes>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
namespace infinity {

struct Task;
class NewScheduler {
public:
    // Spawn the poller, coordinator and worker threads for the given CPU set.
    static void
    Init(const std::unordered_set<int64_t>& cpu_set);

    // Shut down the scheduler and its threads.
    static void
    Uninit();

    // Submit a task to the scheduler for execution.
    static void
    RunTask(Task* task);

private:
    static void
    DispatchTask(int64_t worker_id, Task* task);

    static void
    PollerLoop(int64_t cpu_id);

    static void
    CoordinatorLoop(int64_t cpu_id);

    static void
    WorkerLoop(BlockingQueue* task_queue, int64_t worker_id);

    static int64_t
    GetAvailableCPU();

private:
    static std::unordered_set<int64_t> cpu_set;
    static std::unordered_map<int64_t, std::unique_ptr<BlockingQueue>> task_queues;
    static std::unordered_map<int64_t, std::unique_ptr<std::thread>> workers;

    static std::unique_ptr<BlockingQueue> ready_queue;
    static std::unique_ptr<std::thread> coordinator;

    static std::unique_ptr<PollerQueue> poller_queue;
    static std::unique_ptr<std::thread> poller;

    static std::vector<int64_t> cpu_array;
    static uint64_t current_cpu_id;
};
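
// A minimal usage sketch of the scheduler lifecycle. The cpu ids are
// illustrative, and DummyTask is defined further below in this header:
//
//     std::unordered_set<int64_t> cpus{0, 1};
//     NewScheduler::Init(cpus);
//
//     auto task = std::make_unique<DummyTask>();
//     NewScheduler::RunTask(task.get());
//
//     // ... wait until the task has finished before tearing down ...
//     NewScheduler::Uninit();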
// Capacity of each intermediate buffer passed between operators.
#define BUFFER_SIZE 128

enum class TaskType {
    kTerminate,
    kDummy,
    kPipeline,
    kInvalid,
};

enum class TaskState {
    kReady,
    kFinished,
    kCancelled,
    kPending,
    kRunning,
};
struct Task {
    inline explicit
    Task(TaskType type) : type_(type) {}

    // Polymorphic base: a virtual destructor is required so that derived
    // tasks are destroyed correctly through Task pointers.
    virtual
    ~Task() = default;

    virtual void
    Run(int64_t worker_id) {
        // Not implemented
        last_worker_id_ = worker_id;
    }

    inline void
    set_state(TaskState state) {
        state_.store(state);
    }

    [[nodiscard]] inline TaskType
    type() const {
        return type_;
    }

    TaskType type_{TaskType::kInvalid};
    int64_t last_worker_id_{-1};
    bool ready_{false};
    std::atomic<TaskState> state_{TaskState::kPending};
};
struct TerminateTask final : public Task {
    inline explicit
    TerminateTask() : Task(TaskType::kTerminate) {
        ready_ = true;
    }
};
struct DummyTask final : public Task {
    inline explicit
    DummyTask() : Task(TaskType::kDummy) {
        ready_ = true;
    }

    void
    Run(int64_t worker_id) override {
        last_worker_id_ = worker_id;
        // PRId64 keeps the format specifier correct for int64_t on all platforms.
        printf("Run dummy task by worker: %" PRId64 "\n", worker_id);
        sleep(1);
    }
};
struct PipelineTask final : public Task {
    inline explicit
    PipelineTask() : Task(TaskType::kPipeline) {}

    // Call after the task graph is wired up: a task without parents is the root.
    inline void
    Init() {
        root_task_ = parents_.empty();
    }

    inline void
    AddSink(Sink* sink) {
        sink_ = sink;
    }

    // A task whose source consumes the output of child tasks needs an input
    // queue for the children to push into.
    inline void
    AddSource(Source* source, bool input_queue) {
        source_ = source;
        if(input_queue) {
            input_queue_ = std::make_unique<ConcurrentQueue>();
        }
    }

    // Each operator gets its own output buffer: operator i reads from
    // buffer i-1 (or from the source buffer) and writes into buffer i.
    inline void
    AddOperator(Operator* op) {
        operators_.emplace_back(op);
        buffers_.emplace_back(std::make_unique<Buffer>(BUFFER_SIZE));
    }
    inline void
    Run(int64_t worker_id) override {
        last_worker_id_ = worker_id;
        // printf("Run pipeline task by worker: %ld\n", worker_id);

        // Read data from the source buffer or the input queue.
        source_buffer_ = std::make_shared<Buffer>(BUFFER_SIZE);
        source_->Run(input_queue_.get(), nullptr, source_buffer_);

        // Process the data operator by operator, each one feeding the next.
        size_t op_count = operators_.size();
        assert(op_count > 0);

        operators_[0]->Run(source_buffer_.get(), buffers_[0].get());
        for(size_t idx = 1; idx < op_count; ++idx) {
            operators_[idx]->Run(buffers_[idx - 1].get(), buffers_[idx].get());
        }

        // Push the result into the output queues.
        sink_->Run(buffers_.back().get(), output_queues_);

        // Mark the parent tasks ready so the scheduler can pick them up.
        for(Task* parent: parents_) {
            // printf("Notify parent to run\n");
            parent->set_state(TaskState::kReady);
            // NewScheduler::RunTask(parent);
        }

        if(root_task_) {
            // wait_flag_.notify_one();
            // printf("Notify result\n");
            // The root task wakes the thread blocked in GetResult().
            std::unique_lock<std::mutex> lck(result_lk_);
            completed_ = true;
            result_cv_.notify_one();
        }
        // sleep(1);
    }
    // Wire each child's output into this task's input queue and register this
    // task as the child's parent. Requires AddSource(..., true) to have been
    // called first so that input_queue_ exists.
    inline void
    SetChildren(std::vector<std::shared_ptr<Task>> children) {
        children_ = std::move(children);
        for(const std::shared_ptr<Task>& child: children_) {
            auto* child_pipeline = static_cast<PipelineTask*>(child.get());
            child_pipeline->AddOutputQueue(input_queue_.get());
            child_pipeline->AddParent(this);
        }
    }

    [[nodiscard]] inline const std::vector<std::shared_ptr<Task>>&
    children() const {
        return children_;
    }
    // Block until the root pipeline task has completed.
    inline void
    GetResult() {
        // wait_flag_.wait(true);
        std::unique_lock<std::mutex> locker(result_lk_);
        result_cv_.wait(locker, [&] {
            return completed_;
        });
        // printf("Get result\n");
    }

private:
    inline void
    AddOutputQueue(ConcurrentQueue* queue) {
        output_queues_.emplace_back(queue);
    }

    inline void
    AddParent(Task* parent) {
        parents_.emplace_back(parent);
    }
private:
    Sink* sink_{};
    std::vector<ConcurrentQueue*> output_queues_;

    std::vector<Operator*> operators_{};
    std::vector<std::shared_ptr<Buffer>> buffers_{};

    Source* source_{};
    std::shared_ptr<Buffer> source_buffer_ = nullptr;

    // Wait-free queue that the children's sinks push into.
    std::unique_ptr<ConcurrentQueue> input_queue_{nullptr};

    std::vector<std::shared_ptr<Task>> children_{};
    std::vector<Task*> parents_{};

    bool root_task_{false};
    bool completed_{false};
    std::mutex result_lk_;
    std::condition_variable result_cv_;
    // std::atomic_bool wait_flag_{false};
};
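
// A minimal wiring sketch for a two-level pipeline. The Source/Operator/Sink
// instances are assumed to be created elsewhere and to outlive the tasks; all
// names below are illustrative, not part of this header:
//
//     auto child = std::make_shared<PipelineTask>();
//     child->AddSource(leaf_source, false);   // leaf task: no input queue
//     child->AddOperator(scan_op);
//     child->AddSink(child_sink);
//
//     auto root = std::make_shared<PipelineTask>();
//     root->AddSource(merge_source, true);    // root consumes children via its input queue
//     root->AddOperator(agg_op);
//     root->AddSink(result_sink);
//
//     root->SetChildren({child});             // must come after AddSource(..., true)
//     root->Init();                           // no parents -> root task
//     child->Init();
//
//     NewScheduler::RunTask(child.get());     // on completion, the child marks the root
//                                             // kReady; the scheduler is expected to
//                                             // pick the root up and run it
//     root->GetResult();                      // blocks until the root signals completion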
} // namespace infinity