From 0a90df2fcb511c56fdee0d193fa7c8a5b6212e33 Mon Sep 17 00:00:00 2001 From: Jinho Bang Date: Mon, 4 Mar 2019 15:13:08 +0900 Subject: [PATCH] Implement ThreadSafeFunction class This PR is implementing ThreadSafeFunction class wraps napi_threadsafe_function features. FYI, the test files that included in this PR have come from Node.js repo[1]. They've been rewritten based on C++ and node-addon-api. [1] https://github.com/nodejs/node/tree/e800f9d/test/node-api/test_threadsafe_function PR-URL: https://github.com/nodejs/node-addon-api/pull/442/ Fixes: https://github.com/nodejs/node-addon-api/issues/312/ Reviewed-By: Michael Dawson Reviewed-By: Gabriel Schulhof --- napi-inl.h | 391 ++++++++++++++++++ napi.h | 201 +++++++++ test/binding.cc | 6 + test/binding.gyp | 1 + test/index.js | 1 + .../threadsafe_function.cc | 179 ++++++++ .../threadsafe_function.js | 170 ++++++++ 7 files changed, 949 insertions(+) create mode 100644 test/threadsafe_function/threadsafe_function.cc create mode 100644 test/threadsafe_function/threadsafe_function.js diff --git a/napi-inl.h b/napi-inl.h index 7f94d4858..0db3c9830 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -125,6 +125,78 @@ struct FinalizeData { Hint* hint; }; +template , + typename FinalizerDataType=void> +struct ThreadSafeFinalize { + static inline + void Wrapper(napi_env env, void* rawFinalizeData, void* /* rawContext */) { + if (rawFinalizeData == nullptr) + return; + + ThreadSafeFinalize* finalizeData = + static_cast(rawFinalizeData); + finalizeData->callback(Env(env)); + if (finalizeData->tsfn) { + *finalizeData->tsfn = nullptr; + } + delete finalizeData; + } + + static inline + void FinalizeWrapperWithData(napi_env env, + void* rawFinalizeData, + void* /* rawContext */) { + if (rawFinalizeData == nullptr) + return; + + ThreadSafeFinalize* finalizeData = + static_cast(rawFinalizeData); + finalizeData->callback(Env(env), finalizeData->data); + if (finalizeData->tsfn) { + *finalizeData->tsfn = nullptr; + } + delete finalizeData; + } + + static inline + void FinalizeWrapperWithContext(napi_env env, + void* rawFinalizeData, + void* rawContext) { + if (rawFinalizeData == nullptr) + return; + + ThreadSafeFinalize* finalizeData = + static_cast(rawFinalizeData); + finalizeData->callback(Env(env), static_cast(rawContext)); + if (finalizeData->tsfn) { + *finalizeData->tsfn = nullptr; + } + delete finalizeData; + } + + static inline + void FinalizeFinalizeWrapperWithDataAndContext(napi_env env, + void* rawFinalizeData, + void* rawContext) { + if (rawFinalizeData == nullptr) + return; + + ThreadSafeFinalize* finalizeData = + static_cast(rawFinalizeData); + finalizeData->callback(Env(env), finalizeData->data, + static_cast(rawContext)); + if (finalizeData->tsfn) { + *finalizeData->tsfn = nullptr; + } + delete finalizeData; + } + + FinalizerDataType* data; + Finalizer callback; + napi_threadsafe_function* tsfn; +}; + template struct AccessorCallbackData { static inline @@ -3661,6 +3733,325 @@ inline void AsyncWorker::OnWorkComplete( } } +//////////////////////////////////////////////////////////////////////////////// +// ThreadSafeFunction class +//////////////////////////////////////////////////////////////////////////////// + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount, context); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount, finalizeCallback); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback, + FinalizerDataType* data) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount, finalizeCallback, data); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount, context, finalizeCallback); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + return New(env, callback, Object(), resourceName, maxQueueSize, + initialThreadCount, context, finalizeCallback, data); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, static_cast(nullptr) /* context */); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, context, + [](Env, ContextType*) {} /* empty finalizer */); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, static_cast(nullptr) /* context */, + finalizeCallback, static_cast(nullptr) /* data */, + details::ThreadSafeFinalize::Wrapper); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback, + FinalizerDataType* data) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, static_cast(nullptr) /* context */, + finalizeCallback, data, + details::ThreadSafeFinalize< + void, Finalizer, FinalizerDataType>::FinalizeWrapperWithData); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, context, finalizeCallback, + static_cast(nullptr) /* data */, + details::ThreadSafeFinalize< + ContextType, Finalizer>::FinalizeWrapperWithContext); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data) { + return New(env, callback, resource, resourceName, maxQueueSize, + initialThreadCount, context, finalizeCallback, data, + details::ThreadSafeFinalize::FinalizeFinalizeWrapperWithDataAndContext); +} + +inline ThreadSafeFunction::ThreadSafeFunction() + : _tsfn(new napi_threadsafe_function(nullptr)) { +} + +inline ThreadSafeFunction::ThreadSafeFunction( + napi_threadsafe_function tsfn) + : _tsfn(new napi_threadsafe_function(tsfn)) { +} + +inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other) + : _tsfn(std::move(other._tsfn)) { + other._tsfn.reset(); +} + +inline ThreadSafeFunction& ThreadSafeFunction::operator =( + ThreadSafeFunction&& other) { + if (*_tsfn != nullptr) { + Error::Fatal("ThreadSafeFunction::operator =", + "You cannot assign a new TSFN because existing one is still alive."); + return *this; + } + _tsfn = std::move(other._tsfn); + other._tsfn.reset(); + return *this; +} + +inline napi_status ThreadSafeFunction::BlockingCall() const { + return CallInternal(nullptr, napi_tsfn_blocking); +} + +template +inline napi_status ThreadSafeFunction::BlockingCall( + Callback callback) const { + return CallInternal(new CallbackWrapper(callback), napi_tsfn_blocking); +} + +template +inline napi_status ThreadSafeFunction::BlockingCall( + DataType* data, Callback callback) const { + auto wrapper = [data, callback](Env env, Function jsCallback) { + callback(env, jsCallback, data); + }; + return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_blocking); +} + +inline napi_status ThreadSafeFunction::NonBlockingCall() const { + return CallInternal(nullptr, napi_tsfn_nonblocking); +} + +template +inline napi_status ThreadSafeFunction::NonBlockingCall( + Callback callback) const { + return CallInternal(new CallbackWrapper(callback), napi_tsfn_nonblocking); +} + +template +inline napi_status ThreadSafeFunction::NonBlockingCall( + DataType* data, Callback callback) const { + auto wrapper = [data, callback](Env env, Function jsCallback) { + callback(env, jsCallback, data); + }; + return CallInternal(new CallbackWrapper(wrapper), napi_tsfn_nonblocking); +} + +inline napi_status ThreadSafeFunction::Acquire() const { + return napi_acquire_threadsafe_function(*_tsfn); +} + +inline napi_status ThreadSafeFunction::Release() { + return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release); +} + +inline napi_status ThreadSafeFunction::Abort() { + return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort); +} + +inline ThreadSafeFunction::ConvertibleContext +ThreadSafeFunction::GetContext() const { + void* context; + napi_get_threadsafe_function_context(*_tsfn, &context); + return ConvertibleContext({ context }); +} + +// static +template +inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data, + napi_finalize wrapper) { + static_assert(details::can_make_string::value + || std::is_convertible::value, + "Resource name should be convertible to the string type"); + + ThreadSafeFunction tsfn; + auto* finalizeData = new details::ThreadSafeFinalize({ data, finalizeCallback, tsfn._tsfn.get() }); + napi_status status = napi_create_threadsafe_function(env, callback, resource, + Value::From(env, resourceName), maxQueueSize, initialThreadCount, + finalizeData, wrapper, context, CallJS, tsfn._tsfn.get()); + if (status != napi_ok) { + delete finalizeData; + NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction()); + } + + return tsfn; +} + +inline napi_status ThreadSafeFunction::CallInternal( + CallbackWrapper* callbackWrapper, + napi_threadsafe_function_call_mode mode) const { + napi_status status = napi_call_threadsafe_function( + *_tsfn, callbackWrapper, mode); + if (status != napi_ok && callbackWrapper != nullptr) { + delete callbackWrapper; + } + + return status; +} + +// static +inline void ThreadSafeFunction::CallJS(napi_env env, + napi_value jsCallback, + void* /* context */, + void* data) { + if (env == nullptr && jsCallback == nullptr) { + return; + } + + if (data != nullptr) { + auto* callbackWrapper = static_cast(data); + (*callbackWrapper)(env, Function(env, jsCallback)); + delete callbackWrapper; + } else if (jsCallback != nullptr) { + Function(env, jsCallback).Call({}); + } +} + //////////////////////////////////////////////////////////////////////////////// // Memory Management class //////////////////////////////////////////////////////////////////////////////// diff --git a/napi.h b/napi.h index fcf34406d..287f16da7 100644 --- a/napi.h +++ b/napi.h @@ -4,6 +4,7 @@ #include #include #include +#include #include #include @@ -1828,6 +1829,206 @@ namespace Napi { bool _suppress_destruct; }; + class ThreadSafeFunction { + public: + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback, + FinalizerDataType* data); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + Finalizer finalizeCallback, + FinalizerDataType* data); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback); + + // This API may only be called from the main thread. + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data); + + ThreadSafeFunction(); + ThreadSafeFunction(napi_threadsafe_function tsFunctionValue); + + ThreadSafeFunction(ThreadSafeFunction&& other); + ThreadSafeFunction& operator=(ThreadSafeFunction&& other); + + // This API may be called from any thread. + napi_status BlockingCall() const; + + // This API may be called from any thread. + template + napi_status BlockingCall(Callback callback) const; + + // This API may be called from any thread. + template + napi_status BlockingCall(DataType* data, Callback callback) const; + + // This API may be called from any thread. + napi_status NonBlockingCall() const; + + // This API may be called from any thread. + template + napi_status NonBlockingCall(Callback callback) const; + + // This API may be called from any thread. + template + napi_status NonBlockingCall(DataType* data, Callback callback) const; + + // This API may be called from any thread. + napi_status Acquire() const; + + // This API may be called from any thread. + napi_status Release(); + + // This API may be called from any thread. + napi_status Abort(); + + struct ConvertibleContext + { + template + operator T*() { return static_cast(context); } + void* context; + }; + + // This API may be called from any thread. + ConvertibleContext GetContext() const; + + private: + using CallbackWrapper = std::function; + + template + static ThreadSafeFunction New(napi_env env, + const Function& callback, + const Object& resource, + ResourceString resourceName, + size_t maxQueueSize, + size_t initialThreadCount, + ContextType* context, + Finalizer finalizeCallback, + FinalizerDataType* data, + napi_finalize wrapper); + + napi_status CallInternal(CallbackWrapper* callbackWrapper, + napi_threadsafe_function_call_mode mode) const; + + static void CallJS(napi_env env, + napi_value jsCallback, + void* context, + void* data); + + std::unique_ptr _tsfn; + }; + // Memory management. class MemoryManagement { public: diff --git a/test/binding.cc b/test/binding.cc index 4a5fec15c..141493e94 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -33,6 +33,9 @@ Object InitObject(Env env); Object InitObjectDeprecated(Env env); #endif // !NODE_ADDON_API_DISABLE_DEPRECATED Object InitPromise(Env env); +#if (NAPI_VERSION > 3) +Object InitThreadSafeFunction(Env env); +#endif Object InitTypedArray(Env env); Object InitObjectWrap(Env env); Object InitObjectReference(Env env); @@ -71,6 +74,9 @@ Object Init(Env env, Object exports) { exports.Set("object_deprecated", InitObjectDeprecated(env)); #endif // !NODE_ADDON_API_DISABLE_DEPRECATED exports.Set("promise", InitPromise(env)); +#if (NAPI_VERSION > 3) + exports.Set("threadsafe_function", InitThreadSafeFunction(env)); +#endif exports.Set("typedarray", InitTypedArray(env)); exports.Set("objectwrap", InitObjectWrap(env)); exports.Set("objectreference", InitObjectReference(env)); diff --git a/test/binding.gyp b/test/binding.gyp index 0e2b7d05c..56d636adc 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -32,6 +32,7 @@ 'object/object.cc', 'object/set_property.cc', 'promise.cc', + 'threadsafe_function/threadsafe_function.cc', 'typedarray.cc', 'objectwrap.cc', 'objectreference.cc', diff --git a/test/index.js b/test/index.js index e46c7b162..9a5409b9a 100644 --- a/test/index.js +++ b/test/index.js @@ -36,6 +36,7 @@ let testModules = [ 'object/object_deprecated', 'object/set_property', 'promise', + 'threadsafe_function/threadsafe_function', 'typedarray', 'typedarray-bigint', 'objectwrap', diff --git a/test/threadsafe_function/threadsafe_function.cc b/test/threadsafe_function/threadsafe_function.cc new file mode 100644 index 000000000..529bfb308 --- /dev/null +++ b/test/threadsafe_function/threadsafe_function.cc @@ -0,0 +1,179 @@ +#include +#include +#include "napi.h" + +using namespace Napi; + +constexpr size_t ARRAY_LENGTH = 10; +constexpr size_t MAX_QUEUE_SIZE = 2; + +static std::thread threads[2]; +static ThreadSafeFunction tsfn; + +struct ThreadSafeFunctionInfo { + enum CallType { + DEFAULT, + BLOCKING, + NON_BLOCKING + } type; + bool abort; + bool startSecondary; + FunctionReference jsFinalizeCallback; + uint32_t maxQueueSize; +} tsfnInfo; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void SecondaryThread() { + if (tsfn.Release() != napi_ok) { + Error::Fatal("SecondaryThread", "ThreadSafeFunction.Release() failed"); + } +} + +// Source thread producing the data +static void DataSourceThread() { + ThreadSafeFunctionInfo* info = tsfn.GetContext(); + + if (info->startSecondary) { + if (tsfn.Acquire() != napi_ok) { + Error::Fatal("DataSourceThread", "ThreadSafeFunction.Acquire() failed"); + } + + threads[1] = std::thread(SecondaryThread); + } + + bool queueWasFull = false; + bool queueWasClosing = false; + for (int index = ARRAY_LENGTH - 1; index > -1 && !queueWasClosing; index--) { + napi_status status = napi_generic_failure; + auto callback = [](Env env, Function jsCallback, int* data) { + jsCallback.Call({ Number::New(env, *data) }); + }; + + switch (info->type) { + case ThreadSafeFunctionInfo::DEFAULT: + status = tsfn.BlockingCall(); + break; + case ThreadSafeFunctionInfo::BLOCKING: + status = tsfn.BlockingCall(&ints[index], callback); + break; + case ThreadSafeFunctionInfo::NON_BLOCKING: + status = tsfn.NonBlockingCall(&ints[index], callback); + break; + } + + if (info->maxQueueSize == 0) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + auto start = std::chrono::high_resolution_clock::now(); + constexpr auto MS_200 = std::chrono::milliseconds(200); + for (; std::chrono::high_resolution_clock::now() - start < MS_200;); + } + + switch (status) { + case napi_queue_full: + queueWasFull = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queueWasClosing = true; + break; + + default: + Error::Fatal("DataSourceThread", "ThreadSafeFunction.*Call() failed"); + } + } + + if (info->type == ThreadSafeFunctionInfo::NON_BLOCKING && !queueWasFull) { + Error::Fatal("DataSourceThread", "Queue was never full"); + } + + if (info->abort && !queueWasClosing) { + Error::Fatal("DataSourceThread", "Queue was never closing"); + } + + if (!queueWasClosing && tsfn.Release() != napi_ok) { + Error::Fatal("DataSourceThread", "ThreadSafeFunction.Release() failed"); + } +} + +static Value StopThread(const CallbackInfo& info) { + tsfnInfo.jsFinalizeCallback = Napi::Persistent(info[0].As()); + bool abort = info[1].As(); + if (abort) { + tsfn.Abort(); + } else { + tsfn.Release(); + } + return Value(); +} + +// Join the thread and inform JS that we're done. +static void JoinTheThreads(Env /* env */, + std::thread* theThreads, + ThreadSafeFunctionInfo* info) { + theThreads[0].join(); + if (info->startSecondary) { + theThreads[1].join(); + } + + info->jsFinalizeCallback.Call({}); + info->jsFinalizeCallback.Reset(); +} + +static Value StartThreadInternal(const CallbackInfo& info, + ThreadSafeFunctionInfo::CallType type) { + tsfnInfo.type = type; + tsfnInfo.abort = info[1].As(); + tsfnInfo.startSecondary = info[2].As(); + tsfnInfo.maxQueueSize = info[3].As().Uint32Value(); + + tsfn = ThreadSafeFunction::New(info.Env(), info[0].As(), + "Test", tsfnInfo.maxQueueSize, 2, &tsfnInfo, JoinTheThreads, threads); + + threads[0] = std::thread(DataSourceThread); + + return Value(); +} + +static Value Release(const CallbackInfo& /* info */) { + if (tsfn.Release() != napi_ok) { + Error::Fatal("Release", "ThreadSafeFunction.Release() failed"); + } + return Value(); +} + +static Value StartThread(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::BLOCKING); +} + +static Value StartThreadNonblocking(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::NON_BLOCKING); +} + +static Value StartThreadNoNative(const CallbackInfo& info) { + return StartThreadInternal(info, ThreadSafeFunctionInfo::DEFAULT); +} + +Object InitThreadSafeFunction(Env env) { + for (size_t index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + + Object exports = Object::New(env); + exports["ARRAY_LENGTH"] = Number::New(env, ARRAY_LENGTH); + exports["MAX_QUEUE_SIZE"] = Number::New(env, MAX_QUEUE_SIZE); + exports["startThread"] = Function::New(env, StartThread); + exports["startThreadNoNative"] = Function::New(env, StartThreadNoNative); + exports["startThreadNonblocking"] = + Function::New(env, StartThreadNonblocking); + exports["stopThread"] = Function::New(env, StopThread); + exports["release"] = Function::New(env, Release); + + return exports; +} diff --git a/test/threadsafe_function/threadsafe_function.js b/test/threadsafe_function/threadsafe_function.js new file mode 100644 index 000000000..710c21212 --- /dev/null +++ b/test/threadsafe_function/threadsafe_function.js @@ -0,0 +1,170 @@ +'use strict'; + +const buildType = process.config.target_defaults.default_configuration; +const assert = require('assert'); +const common = require('../common'); + +test(require(`../build/${buildType}/binding.node`)); +test(require(`../build/${buildType}/binding_noexcept.node`)); + +function test(binding) { + const expectedArray = (function(arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; + })(binding.threadsafe_function.ARRAY_LENGTH); + + function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + maxQueueSize, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding.threadsafe_function[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.threadsafe_function.stopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary, maxQueueSize); + if (threadStarter === 'startThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); + } + + new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.threadsafe_function.startThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.threadsafe_function.ARRAY_LENGTH) { + setImmediate(() => { + binding.threadsafe_function.stopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */, + binding.threadsafe_function.MAX_QUEUE_SIZE); + }) + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit after it's done. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: binding.threadsafe_function.ARRAY_LENGTH + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode with an infinite queue, and assert that + // all values are passed. Quit after it's done. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: 0, + quitAfter: binding.threadsafe_function.ARRAY_LENGTH + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit after it's done. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: binding.threadsafe_function.ARRAY_LENGTH + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit early, but let the thread finish. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1 + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode with an infinite queue, and assert that + // all values are passed. Quit early, but let the thread finish. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + maxQueueSize: 0, + quitAfter: 1 + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit early, but let the thread finish. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + quitAfter: 1 + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode, and assert that all values are passed. + // Quit early, but let the thread finish. Launch a secondary thread to test + // the reference counter incrementing functionality. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + launchSecondary: true + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in non-blocking mode, and assert that all values are + // passed. Quit early, but let the thread finish. Launch a secondary thread + // to test the reference counter incrementing functionality. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + launchSecondary: true + })) + .then((result) => assert.deepStrictEqual(result, expectedArray)) + + // Start the thread in blocking mode, and assert that it could not finish. + // Quit early by aborting. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + abort: true + })) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // Start the thread in blocking mode with an infinite queue, and assert that + // it could not finish. Quit early by aborting. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThread', + quitAfter: 1, + maxQueueSize: 0, + abort: true + })) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) + + // Start the thread in non-blocking mode, and assert that it could not finish. + // Quit early and aborting. + .then(() => testWithJSMarshaller({ + threadStarter: 'startThreadNonblocking', + quitAfter: 1, + maxQueueSize: binding.threadsafe_function.MAX_QUEUE_SIZE, + abort: true + })) + .then((result) => assert.strictEqual(result.indexOf(0), -1)) +}