From c57af9036f8147db92e59f2076dce2c895caf457 Mon Sep 17 00:00:00 2001 From: Kyle Farnung Date: Thu, 5 Jul 2018 15:50:14 -0700 Subject: [PATCH] n-api: implement threadsafe for ChakraCore --- src/node_api_jsrt.cc | 434 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 434 insertions(+) diff --git a/src/node_api_jsrt.cc b/src/node_api_jsrt.cc index bf85abde192..68881ced8b6 100644 --- a/src/node_api_jsrt.cc +++ b/src/node_api_jsrt.cc @@ -9,6 +9,7 @@ ******************************************************************************/ #include +#define NAPI_EXPERIMENTAL #include #include #include @@ -3000,3 +3001,436 @@ NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, uv_loop_t** loop) { napi_clear_last_error(); return napi_ok; } + +class TsFn: public node::AsyncResource { + public: + TsFn(v8::Local func, + v8::Local resource, + v8::Local name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + idle_running(false), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + ~TsFn() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + // These methods can be called from any thread. + + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } + } + + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); + + if (is_closing) { + return napi_closing; + } + + thread_count++; + + return napi_ok; + } + + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + if (thread_count == 0) { + return napi_invalid_arg; + } + + thread_count--; + + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } + + return napi_ok; + } + + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } + + // These methods must only be called from the loop thread. + + napi_status Init() { + TsFn* ts_fn = this; + + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } + + node::Environment::GetCurrent(env->isolate)->CloseHandle( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + delete ts_fn; + }); + + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } + + delete ts_fn; + + return napi_generic_failure; + } + + napi_status Unref() { + uv_unref(reinterpret_cast(&async)); + uv_unref(reinterpret_cast(&idle)); + + return napi_ok; + } + + napi_status Ref() { + uv_ref(reinterpret_cast(&async)); + uv_ref(reinterpret_cast(&idle)); + + return napi_ok; + } + + void DispatchOne() { + void* data; + bool popped_value = false; + bool idle_stop_failed = false; + + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + cond->Signal(lock); + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } else { + idle_running = false; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local js_cb = + v8::Local::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (!idle_running) { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + cond->Signal(lock); + } + if (handles_closing) { + return; + } + handles_closing = true; + NodeEnv()->CloseHandle( + reinterpret_cast(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::async, + reinterpret_cast(handle)); + ts_fn->NodeEnv()->CloseHandle( + reinterpret_cast(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::idle, + reinterpret_cast(handle)); + ts_fn->Finalize(); + }); + }); + } + + // Default way of calling into JavaScript. Used when TsFn is constructed + // without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (!(env == nullptr || cb == nullptr)) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast(data)->CloseHandlesAndMaybeDelete(true); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr cond; + std::queue queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + node::Persistent ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + bool idle_running; + napi_async_context async_context; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + CHECK_ENV(env); + CHECK_ARG(func); + CHECK_ARG(async_resource_name); + RETURN_STATUS_IF_FALSE(initial_thread_count > 0, napi_invalid_arg); + CHECK_ARG(result); + + napi_status status = napi_ok; + + v8::Local func_local = + v8impl::V8LocalValueFromJsValue(func).As(); + + v8::Local v8_context = env->isolate->GetCurrentContext(); + + v8::Local async_resource_local; + if (async_resource == nullptr) { + async_resource_local = v8::Object::New(env->isolate); + } else { + async_resource_local = + v8impl::V8LocalValueFromJsValue(async_resource).As(); + } + + v8::Local async_resource_name_local = + v8impl::V8LocalValueFromJsValue(async_resource_name).As(); + + TsFn* ts_fn = new TsFn(func_local, + async_resource_local, + async_resource_name_local, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); + + if (ts_fn == nullptr) { + status = napi_generic_failure; + } else { + // Init deletes ts_fn upon failure. + status = ts_fn->Init(); + if (status == napi_ok) { + *result = reinterpret_cast(ts_fn); + } + } + + return napi_set_last_error(status); +} + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result) { + CHECK(func != nullptr); + CHECK(result != nullptr); + + *result = reinterpret_cast(func)->Context(); + return napi_ok; +} + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Push(data, is_blocking); +} + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Acquire(); +} + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Release(mode); +} + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Unref(); +} + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast(func)->Ref(); +}