Skip to content

Commit a5d0166

Browse files
joyeecheungRafaelGSS
authored andcommitted
src: use priority queue to run worker tasks
According to the documentation, the v8 tasks should be executed based on priority. Previously we always execute the tasks in FIFO order, this changes the NodePlatform implementation to execute the higher priority tasks first. The tasks used to schedule timers for the delayed tasks are run in FIFO order since priority is irrelavent for the timer scheduling part while the tasks unwrapped by the timer callbacks are still ordered by priority. PR-URL: #58047 Refs: #47452 Refs: #54918 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
1 parent d2f5ceb commit a5d0166

File tree

2 files changed

+109
-45
lines changed

2 files changed

+109
-45
lines changed

src/node_platform.cc

+66-39
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ using v8::TaskPriority;
1818
namespace {
1919

2020
struct PlatformWorkerData {
21-
TaskQueue<Task>* task_queue;
21+
TaskQueue<TaskQueueEntry>* task_queue;
2222
Mutex* platform_workers_mutex;
2323
ConditionVariable* platform_workers_ready;
2424
int* pending_platform_workers;
@@ -53,7 +53,7 @@ static void PlatformWorkerThread(void* data) {
5353
std::unique_ptr<PlatformWorkerData>
5454
worker_data(static_cast<PlatformWorkerData*>(data));
5555

56-
TaskQueue<Task>* pending_worker_tasks = worker_data->task_queue;
56+
TaskQueue<TaskQueueEntry>* pending_worker_tasks = worker_data->task_queue;
5757
TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
5858
"PlatformWorkerThread");
5959

@@ -67,16 +67,17 @@ static void PlatformWorkerThread(void* data) {
6767
bool debug_log_enabled =
6868
worker_data->debug_log_level != PlatformDebugLogLevel::kNone;
6969
int id = worker_data->id;
70-
while (std::unique_ptr<Task> task =
70+
while (std::unique_ptr<TaskQueueEntry> entry =
7171
pending_worker_tasks->Lock().BlockingPop()) {
7272
if (debug_log_enabled) {
7373
fprintf(stderr,
74-
"\nPlatformWorkerThread %d running task %p\n",
74+
"\nPlatformWorkerThread %d running task %p %s\n",
7575
id,
76-
task.get());
76+
entry->task.get(),
77+
GetTaskPriorityName(entry->priority));
7778
fflush(stderr);
7879
}
79-
task->Run();
80+
entry->task->Run();
8081
pending_worker_tasks->Lock().NotifyOfCompletion();
8182
}
8283
}
@@ -92,8 +93,8 @@ static int GetActualThreadPoolSize(int thread_pool_size) {
9293

9394
class WorkerThreadsTaskRunner::DelayedTaskScheduler {
9495
public:
95-
explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
96-
: pending_worker_tasks_(tasks) {}
96+
explicit DelayedTaskScheduler(TaskQueue<TaskQueueEntry>* tasks)
97+
: pending_worker_tasks_(tasks) {}
9798

9899
std::unique_ptr<uv_thread_t> Start() {
99100
auto start_thread = [](void* data) {
@@ -107,16 +108,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
107108
return t;
108109
}
109110

110-
void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
111+
void PostDelayedTask(v8::TaskPriority priority,
112+
std::unique_ptr<Task> task,
113+
double delay_in_seconds) {
111114
auto locked = tasks_.Lock();
112115

116+
auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
117+
auto delayed = std::make_unique<ScheduleTask>(
118+
this, std::move(entry), delay_in_seconds);
119+
113120
// The delayed task scheuler is on is own thread with its own loop that
114121
// runs the timers for the scheduled tasks to pop the original task back
115122
// into the the worker task queue. This first pushes the tasks that
116123
// schedules the timers into the local task queue that will be flushed
117124
// by the local event loop.
118-
locked.Push(std::make_unique<ScheduleTask>(
119-
this, std::move(task), delay_in_seconds));
125+
locked.Push(std::move(delayed));
120126
uv_async_send(&flush_tasks_);
121127
}
122128

@@ -144,10 +150,12 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
144150
DelayedTaskScheduler* scheduler =
145151
ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
146152

147-
std::queue<std::unique_ptr<Task>> tasks_to_run =
148-
scheduler->tasks_.Lock().PopAll();
153+
auto tasks_to_run = scheduler->tasks_.Lock().PopAll();
149154
while (!tasks_to_run.empty()) {
150-
std::unique_ptr<Task> task = std::move(tasks_to_run.front());
155+
// We have to use const_cast because std::priority_queue::top() does not
156+
// return a movable item.
157+
std::unique_ptr<Task> task =
158+
std::move(const_cast<std::unique_ptr<Task>&>(tasks_to_run.top()));
151159
tasks_to_run.pop();
152160
// This runs either the ScheduleTasks that scheduels the timers to
153161
// pop the tasks back into the worker task runner queue, or the
@@ -177,7 +185,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
177185
class ScheduleTask : public Task {
178186
public:
179187
ScheduleTask(DelayedTaskScheduler* scheduler,
180-
std::unique_ptr<Task> task,
188+
std::unique_ptr<TaskQueueEntry> task,
181189
double delay_in_seconds)
182190
: scheduler_(scheduler),
183191
task_(std::move(task)),
@@ -194,7 +202,7 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
194202

195203
private:
196204
DelayedTaskScheduler* scheduler_;
197-
std::unique_ptr<Task> task_;
205+
std::unique_ptr<TaskQueueEntry> task_;
198206
double delay_in_seconds_;
199207
};
200208

@@ -205,20 +213,21 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
205213
scheduler->TakeTimerTask(timer));
206214
}
207215

208-
std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
209-
std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
216+
std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) {
217+
std::unique_ptr<TaskQueueEntry> task_entry(
218+
static_cast<TaskQueueEntry*>(timer->data));
210219
uv_timer_stop(timer);
211220
uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
212221
delete reinterpret_cast<uv_timer_t*>(handle);
213222
});
214223
timers_.erase(timer);
215-
return task;
224+
return task_entry;
216225
}
217226

218227
uv_sem_t ready_;
219228
// Task queue in the worker thread task runner, we push the delayed task back
220229
// to it when the timer expires.
221-
TaskQueue<Task>* pending_worker_tasks_;
230+
TaskQueue<TaskQueueEntry>* pending_worker_tasks_;
222231

223232
// Locally scheduled tasks to be poped into the worker task runner queue.
224233
// It is flushed whenever the next closest timer expires.
@@ -264,13 +273,20 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(
264273
}
265274
}
266275

267-
void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
268-
pending_worker_tasks_.Lock().Push(std::move(task));
276+
void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority,
277+
std::unique_ptr<v8::Task> task,
278+
const v8::SourceLocation& location) {
279+
auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
280+
pending_worker_tasks_.Lock().Push(std::move(entry));
269281
}
270282

271-
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
272-
double delay_in_seconds) {
273-
delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
283+
void WorkerThreadsTaskRunner::PostDelayedTask(
284+
v8::TaskPriority priority,
285+
std::unique_ptr<v8::Task> task,
286+
const v8::SourceLocation& location,
287+
double delay_in_seconds) {
288+
delayed_task_scheduler_->PostDelayedTask(
289+
priority, std::move(task), delay_in_seconds);
274290
}
275291

276292
void WorkerThreadsTaskRunner::BlockingDrain() {
@@ -330,7 +346,9 @@ void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
330346

331347
auto locked = foreground_tasks_.Lock();
332348
if (flush_tasks_ == nullptr) return;
333-
locked.Push(std::move(task));
349+
// All foreground tasks are treated as user blocking tasks.
350+
locked.Push(std::make_unique<TaskQueueEntry>(
351+
std::move(task), v8::TaskPriority::kUserBlocking));
334352
uv_async_send(flush_tasks_);
335353
}
336354

@@ -356,6 +374,8 @@ void PerIsolatePlatformData::PostDelayedTaskImpl(
356374
delayed->task = std::move(task);
357375
delayed->platform_data = shared_from_this();
358376
delayed->timeout = delay_in_seconds;
377+
// All foreground tasks are treated as user blocking tasks.
378+
delayed->priority = v8::TaskPriority::kUserBlocking;
359379
locked.Push(std::move(delayed));
360380
uv_async_send(flush_tasks_);
361381
}
@@ -562,11 +582,13 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
562582
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
563583
bool did_work = false;
564584

565-
std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
566-
foreground_delayed_tasks_.Lock().PopAll();
585+
auto delayed_tasks_to_schedule = foreground_delayed_tasks_.Lock().PopAll();
567586
while (!delayed_tasks_to_schedule.empty()) {
587+
// We have to use const_cast because std::priority_queue::top() does not
588+
// return a movable item.
568589
std::unique_ptr<DelayedTask> delayed =
569-
std::move(delayed_tasks_to_schedule.front());
590+
std::move(const_cast<std::unique_ptr<DelayedTask>&>(
591+
delayed_tasks_to_schedule.top()));
570592
delayed_tasks_to_schedule.pop();
571593

572594
did_work = true;
@@ -591,17 +613,20 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
591613
});
592614
}
593615

594-
std::queue<std::unique_ptr<Task>> tasks;
616+
TaskQueue<TaskQueueEntry>::PriorityQueue tasks;
595617
{
596618
auto locked = foreground_tasks_.Lock();
597619
tasks = locked.PopAll();
598620
}
599621

600622
while (!tasks.empty()) {
601-
std::unique_ptr<Task> task = std::move(tasks.front());
623+
// We have to use const_cast because std::priority_queue::top() does not
624+
// return a movable item.
625+
std::unique_ptr<TaskQueueEntry> entry =
626+
std::move(const_cast<std::unique_ptr<TaskQueueEntry>&>(tasks.top()));
602627
tasks.pop();
603628
did_work = true;
604-
RunForegroundTask(std::move(task));
629+
RunForegroundTask(std::move(entry->task));
605630
}
606631

607632
return did_work;
@@ -622,7 +647,7 @@ void NodePlatform::PostTaskOnWorkerThreadImpl(
622647
}
623648
fflush(stderr);
624649
}
625-
worker_thread_task_runner_->PostTask(std::move(task));
650+
worker_thread_task_runner_->PostTask(priority, std::move(task), location);
626651
}
627652

628653
void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
@@ -642,8 +667,8 @@ void NodePlatform::PostDelayedTaskOnWorkerThreadImpl(
642667
}
643668
fflush(stderr);
644669
}
645-
worker_thread_task_runner_->PostDelayedTask(std::move(task),
646-
delay_in_seconds);
670+
worker_thread_task_runner_->PostDelayedTask(
671+
priority, std::move(task), location, delay_in_seconds);
647672
}
648673

649674
IsolatePlatformDelegate* NodePlatform::ForIsolate(Isolate* isolate) {
@@ -742,7 +767,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
742767
if (queue_->task_queue_.empty()) {
743768
return std::unique_ptr<T>(nullptr);
744769
}
745-
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
770+
std::unique_ptr<T> result = std::move(
771+
std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
746772
queue_->task_queue_.pop();
747773
return result;
748774
}
@@ -755,7 +781,8 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
755781
if (queue_->stopped_) {
756782
return std::unique_ptr<T>(nullptr);
757783
}
758-
std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
784+
std::unique_ptr<T> result = std::move(
785+
std::move(const_cast<std::unique_ptr<T>&>(queue_->task_queue_.top())));
759786
queue_->task_queue_.pop();
760787
return result;
761788
}
@@ -781,8 +808,8 @@ void TaskQueue<T>::Locked::Stop() {
781808
}
782809

783810
template <class T>
784-
std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
785-
std::queue<std::unique_ptr<T>> result;
811+
TaskQueue<T>::PriorityQueue TaskQueue<T>::Locked::PopAll() {
812+
TaskQueue<T>::PriorityQueue result;
786813
result.swap(queue_->task_queue_);
787814
return result;
788815
}

src/node_platform.h

+43-6
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55

66
#include <functional>
77
#include <queue>
8+
#include <type_traits>
89
#include <unordered_map>
910
#include <vector>
1011

@@ -19,9 +20,32 @@ class NodePlatform;
1920
class IsolateData;
2021
class PerIsolatePlatformData;
2122

23+
template <typename, typename = void>
24+
struct has_priority : std::false_type {};
25+
26+
template <typename T>
27+
struct has_priority<T, std::void_t<decltype(std::declval<T>().priority)>>
28+
: std::true_type {};
29+
2230
template <class T>
2331
class TaskQueue {
2432
public:
33+
// If the entry type has a priority memeber, order the priority queue by
34+
// that - higher priority first. Otherwise, maintain insertion order.
35+
struct EntryCompare {
36+
bool operator()(const std::unique_ptr<T>& a,
37+
const std::unique_ptr<T>& b) const {
38+
if constexpr (has_priority<T>::value) {
39+
return a->priority < b->priority;
40+
} else {
41+
return false;
42+
}
43+
}
44+
};
45+
46+
using PriorityQueue = std::priority_queue<std::unique_ptr<T>,
47+
std::vector<std::unique_ptr<T>>,
48+
EntryCompare>;
2549
class Locked {
2650
public:
2751
void Push(std::unique_ptr<T> task);
@@ -30,7 +54,7 @@ class TaskQueue {
3054
void NotifyOfCompletion();
3155
void BlockingDrain();
3256
void Stop();
33-
std::queue<std::unique_ptr<T>> PopAll();
57+
PriorityQueue PopAll();
3458

3559
private:
3660
friend class TaskQueue;
@@ -51,11 +75,19 @@ class TaskQueue {
5175
ConditionVariable tasks_drained_;
5276
int outstanding_tasks_;
5377
bool stopped_;
54-
std::queue<std::unique_ptr<T>> task_queue_;
78+
PriorityQueue task_queue_;
79+
};
80+
81+
struct TaskQueueEntry {
82+
std::unique_ptr<v8::Task> task;
83+
v8::TaskPriority priority;
84+
TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p)
85+
: task(std::move(t)), priority(p) {}
5586
};
5687

5788
struct DelayedTask {
5889
std::unique_ptr<v8::Task> task;
90+
v8::TaskPriority priority;
5991
uv_timer_t timer;
6092
double timeout;
6193
std::shared_ptr<PerIsolatePlatformData> platform_data;
@@ -136,7 +168,7 @@ class PerIsolatePlatformData
136168

137169
// When acquiring locks for both task queues, lock foreground_tasks_
138170
// first then foreground_delayed_tasks_ to avoid deadlocks.
139-
TaskQueue<v8::Task> foreground_tasks_;
171+
TaskQueue<TaskQueueEntry> foreground_tasks_;
140172
TaskQueue<DelayedTask> foreground_delayed_tasks_;
141173

142174
// Use a custom deleter because libuv needs to close the handle first.
@@ -152,8 +184,13 @@ class WorkerThreadsTaskRunner {
152184
explicit WorkerThreadsTaskRunner(int thread_pool_size,
153185
PlatformDebugLogLevel debug_log_level);
154186

155-
void PostTask(std::unique_ptr<v8::Task> task);
156-
void PostDelayedTask(std::unique_ptr<v8::Task> task, double delay_in_seconds);
187+
void PostTask(v8::TaskPriority priority,
188+
std::unique_ptr<v8::Task> task,
189+
const v8::SourceLocation& location);
190+
void PostDelayedTask(v8::TaskPriority priority,
191+
std::unique_ptr<v8::Task> task,
192+
const v8::SourceLocation& location,
193+
double delay_in_seconds);
157194

158195
void BlockingDrain();
159196
void Shutdown();
@@ -169,7 +206,7 @@ class WorkerThreadsTaskRunner {
169206
// v8::Platform::PostDelayedTaskOnWorkerThread(), the DelayedTaskScheduler
170207
// thread will schedule a timer that pushes the delayed tasks back into this
171208
// queue when the timer expires.
172-
TaskQueue<v8::Task> pending_worker_tasks_;
209+
TaskQueue<TaskQueueEntry> pending_worker_tasks_;
173210

174211
class DelayedTaskScheduler;
175212
std::unique_ptr<DelayedTaskScheduler> delayed_task_scheduler_;

0 commit comments

Comments
 (0)