src: improve thread safety of TaskQueue #57910

Merged: 1 commit merged on Apr 22, 2025

200 changes: 106 additions & 94 deletions src/node_platform.cc
@@ -40,9 +40,10 @@ static void PlatformWorkerThread(void* data) {
worker_data->platform_workers_ready->Signal(lock);
}

-while (std::unique_ptr<Task> task = pending_worker_tasks->BlockingPop()) {
+while (std::unique_ptr<Task> task =
+pending_worker_tasks->Lock().BlockingPop()) {
task->Run();
-pending_worker_tasks->NotifyOfCompletion();
+pending_worker_tasks->Lock().NotifyOfCompletion();
}
}

@@ -73,13 +74,15 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
}

void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
-tasks_.Push(std::make_unique<ScheduleTask>(this, std::move(task),
-delay_in_seconds));
+auto locked = tasks_.Lock();
+locked.Push(std::make_unique<ScheduleTask>(
+this, std::move(task), delay_in_seconds));
uv_async_send(&flush_tasks_);
}

void Stop() {
-tasks_.Push(std::make_unique<StopTask>(this));
+auto locked = tasks_.Lock();
+locked.Push(std::make_unique<StopTask>(this));
uv_async_send(&flush_tasks_);
}

@@ -100,8 +103,14 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
static void FlushTasks(uv_async_t* flush_tasks) {
DelayedTaskScheduler* scheduler =
ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
-while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
+
+std::queue<std::unique_ptr<Task>> tasks_to_run =
+scheduler->tasks_.Lock().PopAll();
+while (!tasks_to_run.empty()) {
+std::unique_ptr<Task> task = std::move(tasks_to_run.front());
+tasks_to_run.pop();
task->Run();
+}
}

class StopTask : public Task {
@@ -149,7 +158,8 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
static void RunTask(uv_timer_t* timer) {
DelayedTaskScheduler* scheduler =
ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
-scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
+scheduler->pending_worker_tasks_->Lock().Push(
+scheduler->TakeTimerTask(timer));
}

std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
@@ -203,7 +213,7 @@ WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
}

void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
-pending_worker_tasks_.Push(std::move(task));
+pending_worker_tasks_.Lock().Push(std::move(task));
}

void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
@@ -212,11 +222,11 @@ void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<Task> task,
}

void WorkerThreadsTaskRunner::BlockingDrain() {
-pending_worker_tasks_.BlockingDrain();
+pending_worker_tasks_.Lock().BlockingDrain();
}

void WorkerThreadsTaskRunner::Shutdown() {
-pending_worker_tasks_.Stop();
+pending_worker_tasks_.Lock().Stop();
delayed_task_scheduler_->Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
@@ -253,29 +263,27 @@ void PerIsolatePlatformData::PostIdleTaskImpl(

void PerIsolatePlatformData::PostTaskImpl(std::unique_ptr<Task> task,
const v8::SourceLocation& location) {
-if (flush_tasks_ == nullptr) {
-// V8 may post tasks during Isolate disposal. In that case, the only
-// sensible path forward is to discard the task.
-return;
-}
-foreground_tasks_.Push(std::move(task));
+// The task can be posted from any V8 background worker thread, even when
+// the foreground task runner is being cleaned up by Shutdown(). In that
+// case, make sure we wait until the shutdown is completed (which leads
+// to flush_tasks_ == nullptr, and the task will be discarded).
+auto locked = foreground_tasks_.Lock();
+if (flush_tasks_ == nullptr) return;
+locked.Push(std::move(task));
uv_async_send(flush_tasks_);
}

void PerIsolatePlatformData::PostDelayedTaskImpl(
std::unique_ptr<Task> task,
double delay_in_seconds,
const v8::SourceLocation& location) {
-if (flush_tasks_ == nullptr) {
-// V8 may post tasks during Isolate disposal. In that case, the only
-// sensible path forward is to discard the task.
-return;
-}
+auto locked = foreground_delayed_tasks_.Lock();
+if (flush_tasks_ == nullptr) return;
std::unique_ptr<DelayedTask> delayed(new DelayedTask());
delayed->task = std::move(task);
delayed->platform_data = shared_from_this();
delayed->timeout = delay_in_seconds;
-foreground_delayed_tasks_.Push(std::move(delayed));
+locked.Push(std::move(delayed));
uv_async_send(flush_tasks_);
}

@@ -301,32 +309,30 @@ void PerIsolatePlatformData::AddShutdownCallback(void (*callback)(void*),
}

void PerIsolatePlatformData::Shutdown() {
-if (flush_tasks_ == nullptr)
-return;
+auto foreground_tasks_locked = foreground_tasks_.Lock();
+auto foreground_delayed_tasks_locked = foreground_delayed_tasks_.Lock();

// While there should be no V8 tasks in the queues at this point, it is
// possible that Node.js-internal tasks from e.g. the inspector are still
// lying around. We clear these queues and ignore the return value,
// effectively deleting the tasks instead of running them.
-foreground_delayed_tasks_.PopAll();
-foreground_tasks_.PopAll();
+foreground_delayed_tasks_locked.PopAll();
+foreground_tasks_locked.PopAll();
scheduled_delayed_tasks_.clear();

-// Both destroying the scheduled_delayed_tasks_ lists and closing
-// flush_tasks_ handle add tasks to the event loop. We keep a count of all
-// non-closed handles, and when that reaches zero, we inform any shutdown
-// callbacks that the platform is done as far as this Isolate is concerned.
-self_reference_ = shared_from_this();
-uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
-[](uv_handle_t* handle) {
-std::unique_ptr<uv_async_t> flush_tasks {
-reinterpret_cast<uv_async_t*>(handle) };
-PerIsolatePlatformData* platform_data =
-static_cast<PerIsolatePlatformData*>(flush_tasks->data);
-platform_data->DecreaseHandleCount();
-platform_data->self_reference_.reset();
-});
-flush_tasks_ = nullptr;
+if (flush_tasks_ != nullptr) {
+// Both destroying the scheduled_delayed_tasks_ lists and closing
+// flush_tasks_ handle add tasks to the event loop. We keep a count of all
+// non-closed handles, and when that reaches zero, we inform any shutdown
+// callbacks that the platform is done as far as this Isolate is concerned.
+self_reference_ = shared_from_this();
+uv_close(reinterpret_cast<uv_handle_t*>(flush_tasks_),
+[](uv_handle_t* handle) {
+std::unique_ptr<uv_async_t> flush_tasks{
+reinterpret_cast<uv_async_t*>(handle)};
+PerIsolatePlatformData* platform_data =
+static_cast<PerIsolatePlatformData*>(flush_tasks->data);
+platform_data->DecreaseHandleCount();
+platform_data->self_reference_.reset();
+});
+flush_tasks_ = nullptr;
+}
}

void PerIsolatePlatformData::DecreaseHandleCount() {
@@ -472,39 +478,48 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
bool PerIsolatePlatformData::FlushForegroundTasksInternal() {
bool did_work = false;

-while (std::unique_ptr<DelayedTask> delayed =
-foreground_delayed_tasks_.Pop()) {
+std::queue<std::unique_ptr<DelayedTask>> delayed_tasks_to_schedule =
+foreground_delayed_tasks_.Lock().PopAll();
+while (!delayed_tasks_to_schedule.empty()) {
+std::unique_ptr<DelayedTask> delayed =
+std::move(delayed_tasks_to_schedule.front());
+delayed_tasks_to_schedule.pop();
+
did_work = true;
uint64_t delay_millis = llround(delayed->timeout * 1000);

delayed->timer.data = static_cast<void*>(delayed.get());
uv_timer_init(loop_, &delayed->timer);
-// Timers may not guarantee queue ordering of events with the same delay if
-// the delay is non-zero. This should not be a problem in practice.
+// Timers may not guarantee queue ordering of events with the same delay
+// if the delay is non-zero. This should not be a problem in practice.
uv_timer_start(&delayed->timer, RunForegroundTask, delay_millis, 0);
uv_unref(reinterpret_cast<uv_handle_t*>(&delayed->timer));
uv_handle_count_++;

-scheduled_delayed_tasks_.emplace_back(delayed.release(),
-[](DelayedTask* delayed) {
-uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
-[](uv_handle_t* handle) {
-std::unique_ptr<DelayedTask> task {
-static_cast<DelayedTask*>(handle->data) };
-task->platform_data->DecreaseHandleCount();
-});
-});
+scheduled_delayed_tasks_.emplace_back(
+delayed.release(), [](DelayedTask* delayed) {
+uv_close(reinterpret_cast<uv_handle_t*>(&delayed->timer),
+[](uv_handle_t* handle) {
+std::unique_ptr<DelayedTask> task{
+static_cast<DelayedTask*>(handle->data)};
+task->platform_data->DecreaseHandleCount();
+});
+});
}

+std::queue<std::unique_ptr<Task>> tasks;
+{
+auto locked = foreground_tasks_.Lock();
+tasks = locked.PopAll();
+}
// Move all foreground tasks into a separate queue and flush that queue.
// This way tasks that are posted while flushing the queue will be run on the
// next call of FlushForegroundTasksInternal.
-std::queue<std::unique_ptr<Task>> tasks = foreground_tasks_.PopAll();

while (!tasks.empty()) {
std::unique_ptr<Task> task = std::move(tasks.front());
tasks.pop();
did_work = true;
RunForegroundTask(std::move(task));
}

return did_work;
}

@@ -594,66 +609,63 @@ TaskQueue<T>::TaskQueue()
outstanding_tasks_(0), stopped_(false), task_queue_() { }

template <class T>
-void TaskQueue<T>::Push(std::unique_ptr<T> task) {
-Mutex::ScopedLock scoped_lock(lock_);
-outstanding_tasks_++;
-task_queue_.push(std::move(task));
-tasks_available_.Signal(scoped_lock);
+TaskQueue<T>::Locked::Locked(TaskQueue* queue)
+: queue_(queue), lock_(queue->lock_) {}
+
+template <class T>
+void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
+queue_->outstanding_tasks_++;
+queue_->task_queue_.push(std::move(task));
+queue_->tasks_available_.Signal(lock_);
}

template <class T>
-std::unique_ptr<T> TaskQueue<T>::Pop() {
-Mutex::ScopedLock scoped_lock(lock_);
-if (task_queue_.empty()) {
+std::unique_ptr<T> TaskQueue<T>::Locked::Pop() {
+if (queue_->task_queue_.empty()) {
return std::unique_ptr<T>(nullptr);
}
-std::unique_ptr<T> result = std::move(task_queue_.front());
-task_queue_.pop();
+std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+queue_->task_queue_.pop();
return result;
}

template <class T>
-std::unique_ptr<T> TaskQueue<T>::BlockingPop() {
-Mutex::ScopedLock scoped_lock(lock_);
-while (task_queue_.empty() && !stopped_) {
-tasks_available_.Wait(scoped_lock);
+std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
+while (queue_->task_queue_.empty() && !queue_->stopped_) {
+queue_->tasks_available_.Wait(lock_);
}
-if (stopped_) {
+if (queue_->stopped_) {
return std::unique_ptr<T>(nullptr);
}
-std::unique_ptr<T> result = std::move(task_queue_.front());
-task_queue_.pop();
+std::unique_ptr<T> result = std::move(queue_->task_queue_.front());
+queue_->task_queue_.pop();
return result;
}

template <class T>
-void TaskQueue<T>::NotifyOfCompletion() {
-Mutex::ScopedLock scoped_lock(lock_);
-if (--outstanding_tasks_ == 0) {
-tasks_drained_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::NotifyOfCompletion() {
+if (--queue_->outstanding_tasks_ == 0) {
+queue_->tasks_drained_.Broadcast(lock_);
}
}

template <class T>
-void TaskQueue<T>::BlockingDrain() {
-Mutex::ScopedLock scoped_lock(lock_);
-while (outstanding_tasks_ > 0) {
-tasks_drained_.Wait(scoped_lock);
+void TaskQueue<T>::Locked::BlockingDrain() {
+while (queue_->outstanding_tasks_ > 0) {
+queue_->tasks_drained_.Wait(lock_);
}
}

template <class T>
-void TaskQueue<T>::Stop() {
-Mutex::ScopedLock scoped_lock(lock_);
-stopped_ = true;
-tasks_available_.Broadcast(scoped_lock);
+void TaskQueue<T>::Locked::Stop() {
+queue_->stopped_ = true;
+queue_->tasks_available_.Broadcast(lock_);
}

template <class T>
-std::queue<std::unique_ptr<T>> TaskQueue<T>::PopAll() {
-Mutex::ScopedLock scoped_lock(lock_);
+std::queue<std::unique_ptr<T>> TaskQueue<T>::Locked::PopAll() {
std::queue<std::unique_ptr<T>> result;
-result.swap(task_queue_);
+result.swap(queue_->task_queue_);
return result;
}

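The FlushTasks() and FlushForegroundTasksInternal() hunks above share one pattern: the queue is emptied with a single locked PopAll(), and the drained tasks are run only after the lock has been released. Below is a minimal sketch of that drain pattern, using the C++ standard library (std::mutex, std::function) in place of Node's internal Mutex, TaskQueue, and v8::Task types; the names DrainableQueue, Push, and Drain are illustrative only, not taken from the Node.js source.

#include <functional>
#include <mutex>
#include <queue>
#include <utility>

// Stand-in for the worker/foreground task queues; std::function replaces
// v8::Task and std::mutex replaces Node's Mutex.
class DrainableQueue {
 public:
  void Push(std::function<void()> task) {
    std::lock_guard<std::mutex> guard(mutex_);
    tasks_.push(std::move(task));
  }

  // Swap the whole queue out while holding the lock, then run the tasks with
  // the lock released: a task that posts new work re-enters Push() without
  // deadlocking, and that work is simply picked up by the next drain.
  void Drain() {
    std::queue<std::function<void()>> to_run;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      to_run.swap(tasks_);
    }
    while (!to_run.empty()) {
      std::function<void()> task = std::move(to_run.front());
      to_run.pop();
      task();
    }
  }

 private:
  std::mutex mutex_;
  std::queue<std::function<void()>> tasks_;
};

int main() {
  DrainableQueue queue;
  queue.Push([&queue] {
    // Posting from inside a running task is safe: Drain() is not holding the
    // lock while this callback executes.
    queue.Push([] {});
  });
  queue.Drain();  // runs the first task; the nested task waits for next drain
  queue.Drain();  // runs the task posted during the first drain
  return 0;
}

Because the mutex is never held while a task executes, a slow task cannot block threads that are posting new work, and whatever is posted during a drain is simply handled by the next one.
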
32 changes: 24 additions & 8 deletions src/node_platform.h
@@ -22,16 +22,28 @@ class PerIsolatePlatformData;
template <class T>
class TaskQueue {
public:
+class Locked {
+public:
+void Push(std::unique_ptr<T> task);
+std::unique_ptr<T> Pop();
+std::unique_ptr<T> BlockingPop();
+void NotifyOfCompletion();
+void BlockingDrain();
+void Stop();
+std::queue<std::unique_ptr<T>> PopAll();
+
+private:
+friend class TaskQueue;
+explicit Locked(TaskQueue* queue);
+
+TaskQueue* queue_;
+Mutex::ScopedLock lock_;
+};

TaskQueue();
~TaskQueue() = default;

-void Push(std::unique_ptr<T> task);
-std::unique_ptr<T> Pop();
-std::unique_ptr<T> BlockingPop();
-std::queue<std::unique_ptr<T>> PopAll();
-void NotifyOfCompletion();
-void BlockingDrain();
-void Stop();
+Locked Lock() { return Locked(this); }

private:
Mutex lock_;
@@ -98,6 +110,8 @@ class PerIsolatePlatformData
void RunForegroundTask(std::unique_ptr<v8::Task> task);
static void RunForegroundTask(uv_timer_t* timer);

+uv_async_t* flush_tasks_ = nullptr;
+
struct ShutdownCallback {
void (*cb)(void*);
void* data;
@@ -110,7 +124,9 @@

v8::Isolate* const isolate_;
uv_loop_t* const loop_;
-uv_async_t* flush_tasks_ = nullptr;
+
+// When acquiring locks for both task queues, lock foreground_tasks_
+// first then foreground_delayed_tasks_ to avoid deadlocks.
TaskQueue<v8::Task> foreground_tasks_;
TaskQueue<DelayedTask> foreground_delayed_tasks_;

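The Locked handle declared above ties every queue operation to a Mutex::ScopedLock owned by the handle, so a call site can either perform a single operation per lock acquisition (pending_worker_tasks_.Lock().Push(...)) or keep the handle alive so that a compound check-then-push runs in one critical section, as PostTaskImpl() now does with the flush_tasks_ check. The following standalone sketch shows the same idea with std::mutex and std::unique_lock standing in for Node's Mutex and Mutex::ScopedLock; SketchQueue, items_, and shutting_down are hypothetical names used only for illustration.

#include <memory>
#include <mutex>
#include <queue>
#include <string>

// Stand-in for TaskQueue<T> with its nested Locked handle.
template <class T>
class SketchQueue {
 public:
  class Locked {
   public:
    void Push(std::unique_ptr<T> item) { queue_->items_.push(std::move(item)); }
    std::queue<std::unique_ptr<T>> PopAll() {
      std::queue<std::unique_ptr<T>> out;
      out.swap(queue_->items_);
      return out;
    }

   private:
    friend class SketchQueue;
    // Constructing the handle acquires the mutex; destroying it releases it.
    explicit Locked(SketchQueue* queue) : queue_(queue), lock_(queue->mutex_) {}

    SketchQueue* queue_;
    std::unique_lock<std::mutex> lock_;
  };

  Locked Lock() { return Locked(this); }

 private:
  std::mutex mutex_;
  std::queue<std::unique_ptr<T>> items_;
};

int main() {
  SketchQueue<std::string> queue;
  bool shutting_down = false;  // stand-in for the flush_tasks_ == nullptr check

  // Single operation: lock, push, and unlock in one expression.
  queue.Lock().Push(std::make_unique<std::string>("task A"));

  // Compound operation: keep the handle alive so the check and the push
  // happen under the same lock acquisition, as PostTaskImpl() now does.
  {
    auto locked = queue.Lock();
    if (!shutting_down) locked.Push(std::make_unique<std::string>("task B"));
  }

  // Drain everything that was queued.
  auto pending = queue.Lock().PopAll();
  return pending.size() == 2 ? 0 : 1;
}

In this sketch the handle is returned by value, which relies on std::unique_lock being movable (and on guaranteed copy elision since C++17); while a handle exists, no other thread can touch the queue, which is what makes the combined shutdown check and Push race-free.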