Skip to content

Commit 7c2c068

Browse files
addaleaxMylesBorins
authored andcommitted
src: remove AsyncRequest
Remove `AsyncRequest` from the source code, and replace its usage with threadsafe `SetImmediate()` calls. This has the advantage of being able to pass in any function, rather than one that is defined when the `AsyncRequest` is “installed”. This necessitates two changes: - The stopping flag (which was only used in one case and ignored in the other) is now a direct member of the `Environment` class. - Workers no longer have their own libuv handles, requiring manual management of their libuv ref count. As a drive-by fix, the `can_call_into_js` variable was turned into an atomic variable. While there have been no bug reports, the flag is set from `Stop(env)` calls, which are supposed to be possible from any thread. Backport-PR-URL: #32301 PR-URL: #31386 Refs: openjs-foundation/summit#240 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Rich Trott <rtrott@gmail.com>
1 parent 748a530 commit 7c2c068

File tree

5 files changed

+52
-103
lines changed

5 files changed

+52
-103
lines changed

src/env-inl.h

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -881,8 +881,21 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
881881
sub_worker_contexts_.erase(context);
882882
}
883883

884+
inline void Environment::add_refs(int64_t diff) {
885+
task_queues_async_refs_ += diff;
886+
CHECK_GE(task_queues_async_refs_, 0);
887+
if (task_queues_async_refs_ == 0)
888+
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
889+
else
890+
uv_ref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
891+
}
892+
884893
inline bool Environment::is_stopping() const {
885-
return thread_stopper_.is_stopped();
894+
return is_stopping_.load();
895+
}
896+
897+
inline void Environment::set_stopping(bool value) {
898+
is_stopping_.store(value);
886899
}
887900

888901
inline std::list<node_module>* Environment::extra_linked_bindings() {
@@ -1192,14 +1205,6 @@ int64_t Environment::base_object_count() const {
11921205
return base_object_count_;
11931206
}
11941207

1195-
bool AsyncRequest::is_stopped() const {
1196-
return stopped_.load();
1197-
}
1198-
1199-
void AsyncRequest::set_stopped(bool flag) {
1200-
stopped_.store(flag);
1201-
}
1202-
12031208
#define VP(PropertyName, StringValue) V(v8::Private, PropertyName)
12041209
#define VY(PropertyName, StringValue) V(v8::Symbol, PropertyName)
12051210
#define VS(PropertyName, StringValue) V(v8::String, PropertyName)

src/env.cc

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -473,14 +473,6 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
473473
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
474474
uv_unref(reinterpret_cast<uv_handle_t*>(&task_queues_async_));
475475

476-
thread_stopper()->Install(
477-
this, static_cast<void*>(this), [](uv_async_t* handle) {
478-
Environment* env = static_cast<Environment*>(handle->data);
479-
uv_stop(env->event_loop());
480-
});
481-
thread_stopper()->set_stopped(false);
482-
uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper()->GetHandle()));
483-
484476
// Register clean-up cb to be called to clean up the handles
485477
// when the environment is freed, note that they are not cleaned in
486478
// the one environment per process setup, but will be called in
@@ -498,8 +490,9 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
498490

499491
void Environment::ExitEnv() {
500492
set_can_call_into_js(false);
501-
thread_stopper()->Stop();
493+
set_stopping(true);
502494
isolate_->TerminateExecution();
495+
SetImmediateThreadsafe([](Environment* env) { uv_stop(env->event_loop()); });
503496
}
504497

505498
void Environment::RegisterHandleCleanups() {
@@ -604,7 +597,6 @@ void Environment::RunCleanup() {
604597
started_cleanup_ = true;
605598
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
606599
"RunCleanup", this);
607-
thread_stopper()->Uninstall();
608600
CleanupHandles();
609601

610602
while (!cleanup_hooks_.empty()) {
@@ -993,7 +985,6 @@ inline size_t Environment::SelfSize() const {
993985
// TODO(joyeecheung): refactor the MemoryTracker interface so
994986
// this can be done for common types within the Track* calls automatically
995987
// if a certain scope is entered.
996-
size -= sizeof(thread_stopper_);
997988
size -= sizeof(async_hooks_);
998989
size -= sizeof(tick_info_);
999990
size -= sizeof(immediate_info_);
@@ -1015,7 +1006,6 @@ void Environment::MemoryInfo(MemoryTracker* tracker) const {
10151006
tracker->TrackField("fs_stats_field_array", fs_stats_field_array_);
10161007
tracker->TrackField("fs_stats_field_bigint_array",
10171008
fs_stats_field_bigint_array_);
1018-
tracker->TrackField("thread_stopper", thread_stopper_);
10191009
tracker->TrackField("cleanup_hooks", cleanup_hooks_);
10201010
tracker->TrackField("async_hooks", async_hooks_);
10211011
tracker->TrackField("immediate_info", immediate_info_);
@@ -1094,38 +1084,6 @@ void Environment::CleanupFinalizationGroups() {
10941084
}
10951085
}
10961086

1097-
void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
1098-
CHECK_NULL(async_);
1099-
env_ = env;
1100-
async_ = new uv_async_t;
1101-
async_->data = data;
1102-
CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
1103-
}
1104-
1105-
void AsyncRequest::Uninstall() {
1106-
if (async_ != nullptr) {
1107-
env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
1108-
async_ = nullptr;
1109-
}
1110-
}
1111-
1112-
void AsyncRequest::Stop() {
1113-
set_stopped(true);
1114-
if (async_ != nullptr) uv_async_send(async_);
1115-
}
1116-
1117-
uv_async_t* AsyncRequest::GetHandle() {
1118-
return async_;
1119-
}
1120-
1121-
void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
1122-
if (async_ != nullptr) tracker->TrackField("async_request", *async_);
1123-
}
1124-
1125-
AsyncRequest::~AsyncRequest() {
1126-
CHECK_NULL(async_);
1127-
}
1128-
11291087
// Not really any better place than env.cc at this moment.
11301088
void BaseObject::DeleteMe(void* data) {
11311089
BaseObject* self = static_cast<BaseObject*>(data);

src/env.h

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -585,34 +585,6 @@ struct AllocatedBuffer {
585585
friend class Environment;
586586
};
587587

588-
class AsyncRequest : public MemoryRetainer {
589-
public:
590-
AsyncRequest() = default;
591-
~AsyncRequest() override;
592-
593-
AsyncRequest(const AsyncRequest&) = delete;
594-
AsyncRequest& operator=(const AsyncRequest&) = delete;
595-
AsyncRequest(AsyncRequest&&) = delete;
596-
AsyncRequest& operator=(AsyncRequest&&) = delete;
597-
598-
void Install(Environment* env, void* data, uv_async_cb target);
599-
void Uninstall();
600-
void Stop();
601-
inline void set_stopped(bool flag);
602-
inline bool is_stopped() const;
603-
uv_async_t* GetHandle();
604-
void MemoryInfo(MemoryTracker* tracker) const override;
605-
606-
607-
SET_MEMORY_INFO_NAME(AsyncRequest)
608-
SET_SELF_SIZE(AsyncRequest)
609-
610-
private:
611-
Environment* env_;
612-
uv_async_t* async_ = nullptr;
613-
std::atomic_bool stopped_ {true};
614-
};
615-
616588
class KVStore {
617589
public:
618590
KVStore() = default;
@@ -1058,6 +1030,14 @@ class Environment : public MemoryRetainer {
10581030
inline bool can_call_into_js() const;
10591031
inline void set_can_call_into_js(bool can_call_into_js);
10601032

1033+
// Increase or decrease a counter that manages whether this Environment
1034+
// keeps the event loop alive on its own or not. The counter starts out at 0,
1035+
// meaning it does not, and any positive value will make it keep the event
1036+
// loop alive.
1037+
// This is used by Workers to manage their own .ref()/.unref() implementation,
1038+
// as Workers aren't directly associated with their own libuv handles.
1039+
inline void add_refs(int64_t diff);
1040+
10611041
inline bool has_run_bootstrapping_code() const;
10621042
inline void set_has_run_bootstrapping_code(bool has_run_bootstrapping_code);
10631043

@@ -1078,6 +1058,7 @@ class Environment : public MemoryRetainer {
10781058
inline void remove_sub_worker_context(worker::Worker* context);
10791059
void stop_sub_worker_contexts();
10801060
inline bool is_stopping() const;
1061+
inline void set_stopping(bool value);
10811062
inline std::list<node_module>* extra_linked_bindings();
10821063
inline node_module* extra_linked_bindings_head();
10831064
inline const Mutex& extra_linked_bindings_mutex() const;
@@ -1219,8 +1200,6 @@ class Environment : public MemoryRetainer {
12191200
inline std::shared_ptr<EnvironmentOptions> options();
12201201
inline std::shared_ptr<ExclusiveAccess<HostPort>> inspector_host_port();
12211202

1222-
inline AsyncRequest* thread_stopper() { return &thread_stopper_; }
1223-
12241203
inline int32_t stack_trace_limit() const { return 10; }
12251204

12261205
// The BaseObject count is a debugging helper that makes sure that there are
@@ -1285,6 +1264,7 @@ class Environment : public MemoryRetainer {
12851264
uv_prepare_t idle_prepare_handle_;
12861265
uv_check_t idle_check_handle_;
12871266
uv_async_t task_queues_async_;
1267+
int64_t task_queues_async_refs_ = 0;
12881268
bool profiler_idle_notifier_started_ = false;
12891269

12901270
AsyncHooks async_hooks_;
@@ -1342,7 +1322,7 @@ class Environment : public MemoryRetainer {
13421322
bool has_run_bootstrapping_code_ = false;
13431323
bool has_serialized_options_ = false;
13441324

1345-
bool can_call_into_js_ = true;
1325+
std::atomic_bool can_call_into_js_ { true };
13461326
Flags flags_;
13471327
uint64_t thread_id_;
13481328
std::unordered_set<worker::Worker*> sub_worker_contexts_;
@@ -1458,10 +1438,7 @@ class Environment : public MemoryRetainer {
14581438
bool started_cleanup_ = false;
14591439

14601440
int64_t base_object_count_ = 0;
1461-
1462-
// A custom async abstraction (a pair of async handle and a state variable)
1463-
// Used by embedders to shutdown running Node instance.
1464-
AsyncRequest thread_stopper_;
1441+
std::atomic_bool is_stopping_ { false };
14651442

14661443
typedef std::unordered_set<std::shared_ptr<v8::ArrayBuffer::Allocator>>
14671444
ArrayBufferAllocatorList;

src/node_worker.cc

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ void Worker::Run() {
281281
stopped_ = true;
282282
this->env_ = nullptr;
283283
}
284-
env_->thread_stopper()->set_stopped(true);
284+
env_->set_stopping(true);
285285
env_->stop_sub_worker_contexts();
286286
env_->RunCleanup();
287287
RunAtExit(env_.get());
@@ -424,7 +424,6 @@ void Worker::JoinThread() {
424424
thread_joined_ = true;
425425

426426
env()->remove_sub_worker_context(this);
427-
on_thread_finished_.Uninstall();
428427

429428
{
430429
HandleScope handle_scope(env()->isolate());
@@ -455,6 +454,8 @@ void Worker::JoinThread() {
455454
}
456455

457456
Worker::~Worker() {
457+
JoinThread();
458+
458459
Mutex::ScopedLock lock(mutex_);
459460

460461
CHECK(stopped_);
@@ -630,18 +631,16 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
630631
w->stopped_ = false;
631632
w->thread_joined_ = false;
632633

633-
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
634-
Worker* w_ = static_cast<Worker*>(handle->data);
635-
CHECK(w_->is_stopped());
636-
w_->parent_port_ = nullptr;
637-
w_->JoinThread();
638-
delete w_;
639-
});
634+
if (w->has_ref_)
635+
w->env()->add_refs(1);
640636

641637
uv_thread_options_t thread_options;
642638
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
643639
thread_options.stack_size = kStackSize;
644640
CHECK_EQ(uv_thread_create_ex(&w->tid_, &thread_options, [](void* arg) {
641+
// XXX: This could become a std::unique_ptr, but that makes at least
642+
// gcc 6.3 detect undefined behaviour when there shouldn't be any.
643+
// gcc 7+ handles this well.
645644
Worker* w = static_cast<Worker*>(arg);
646645
const uintptr_t stack_top = reinterpret_cast<uintptr_t>(&arg);
647646

@@ -652,7 +651,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
652651
w->Run();
653652

654653
Mutex::ScopedLock lock(w->mutex_);
655-
w->on_thread_finished_.Stop();
654+
w->env()->SetImmediateThreadsafe(
655+
[w = std::unique_ptr<Worker>(w)](Environment* env) {
656+
if (w->has_ref_)
657+
env->add_refs(-1);
658+
// implicitly delete w
659+
});
656660
}, static_cast<void*>(w)), 0);
657661
}
658662

@@ -667,13 +671,19 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
667671
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
668672
Worker* w;
669673
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
670-
uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
674+
if (!w->has_ref_) {
675+
w->has_ref_ = true;
676+
w->env()->add_refs(1);
677+
}
671678
}
672679

673680
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
674681
Worker* w;
675682
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
676-
uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
683+
if (w->has_ref_) {
684+
w->has_ref_ = false;
685+
w->env()->add_refs(-1);
686+
}
677687
}
678688

679689
void Worker::GetResourceLimits(const FunctionCallbackInfo<Value>& args) {

src/node_worker.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ class Worker : public AsyncWrap {
4242

4343
void MemoryInfo(MemoryTracker* tracker) const override {
4444
tracker->TrackField("parent_port", parent_port_);
45-
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
4645
}
4746

4847
SET_MEMORY_INFO_NAME(Worker)
@@ -109,14 +108,14 @@ class Worker : public AsyncWrap {
109108
// instance refers to it via its [kPort] property.
110109
MessagePort* parent_port_ = nullptr;
111110

112-
AsyncRequest on_thread_finished_;
113-
114111
// A raw flag that is used by creator and worker threads to
115112
// sync up on pre-mature termination of worker - while in the
116113
// warmup phase. Once the worker is fully warmed up, use the
117114
// async handle of the worker's Environment for the same purpose.
118115
bool stopped_ = true;
119116

117+
bool has_ref_ = true;
118+
120119
// The real Environment of the worker object. It has a lesser
121120
// lifespan than the worker object itself - comes to life
122121
// when the worker thread creates a new Environment, and gets

0 commit comments

Comments
 (0)