diff --git a/src/node_api.cc b/src/node_api.cc index ad0796570a4c14..c0ea0a02048166 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -3782,7 +3782,7 @@ class TsFn: public node::AsyncResource { if (thread_count == 0 || mode == napi_tsfn_abort) { if (!is_closing) { is_closing = (mode == napi_tsfn_abort); - if (is_closing) { + if (is_closing && max_queue_size > 0) { cond->Signal(lock); } if (uv_async_send(&async) != 0) { @@ -3872,7 +3872,9 @@ class TsFn: public node::AsyncResource { if (size == 0) { if (thread_count == 0) { is_closing = true; - cond->Signal(lock); + if (max_queue_size > 0) { + cond->Signal(lock); + } CloseHandlesAndMaybeDelete(); } else { if (uv_idle_stop(&idle) != 0) { @@ -3939,7 +3941,9 @@ class TsFn: public node::AsyncResource { if (set_closing) { node::Mutex::ScopedLock lock(this->mutex); is_closing = true; - cond->Signal(lock); + if (max_queue_size > 0) { + cond->Signal(lock); + } } if (handles_closing) { return; diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c index 551705b1f21074..354012a288b3ca 100644 --- a/test/addons-napi/test_threadsafe_function/binding.c +++ b/test/addons-napi/test_threadsafe_function/binding.c @@ -9,6 +9,7 @@ #include "../common.h" #define ARRAY_LENGTH 10 +#define MAX_QUEUE_SIZE 2 static uv_thread_t uv_threads[2]; static napi_threadsafe_function ts_fn; @@ -18,6 +19,7 @@ typedef struct { napi_threadsafe_function_release_mode abort; bool start_secondary; napi_ref js_finalize_cb; + uint32_t max_queue_size; } ts_fn_hint; static ts_fn_hint ts_info; @@ -71,6 +73,12 @@ static void data_source_thread(void* data) { for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { status = napi_call_threadsafe_function(ts_fn, &ints[index], ts_fn_info->block_on_full); + if (ts_fn_info->max_queue_size == 0) { + // Let's make this thread really busy for 200 ms to give the main thread a + // chance to abort. + uint64_t start = uv_hrtime(); + for (; uv_hrtime() - start < 200000000;); + } switch (status) { case napi_queue_full: queue_was_full = true; @@ -167,8 +175,8 @@ static napi_value StartThreadInternal(napi_env env, napi_callback_info info, napi_threadsafe_function_call_js cb, bool block_on_full) { - size_t argc = 3; - napi_value argv[3]; + size_t argc = 4; + napi_value argv[4]; ts_info.block_on_full = (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); @@ -178,8 +186,18 @@ static napi_value StartThreadInternal(napi_env env, napi_value async_name; NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", NAPI_AUTO_LENGTH, &async_name)); - NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name, - 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn)); + NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &ts_info.max_queue_size)); + NAPI_CALL(env, napi_create_threadsafe_function(env, + argv[0], + NULL, + async_name, + ts_info.max_queue_size, + 2, + uv_threads, + join_the_threads, + &ts_info, + cb, + &ts_fn)); bool abort; NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; @@ -224,8 +242,9 @@ static napi_value Init(napi_env env, napi_value exports) { for (index = 0; index < ARRAY_LENGTH; index++) { ints[index] = index; } - napi_value js_array_length; + napi_value js_array_length, js_max_queue_size; napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size); napi_property_descriptor properties[] = { { @@ -238,6 +257,16 @@ static napi_value Init(napi_env env, napi_value exports) { napi_enumerable, NULL }, + { + "MAX_QUEUE_SIZE", + NULL, + NULL, + NULL, + NULL, + js_max_queue_size, + napi_enumerable, + NULL + }, DECLARE_NAPI_PROPERTY("StartThread", StartThread), DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js index d998853559ddda..cfd00285308f5d 100644 --- a/test/addons-napi/test_threadsafe_function/test.js +++ b/test/addons-napi/test_threadsafe_function/test.js @@ -23,7 +23,7 @@ if (process.argv[2] === 'child') { if (callCount === 2) { binding.Unref(); } - }, false /* abort */, true /* launchSecondary */); + }, false /* abort */, true /* launchSecondary */, +process.argv[3]); // Release the thread-safe function from the main thread so that it may be // torn down via the environment cleanup handler. @@ -35,6 +35,7 @@ function testWithJSMarshaller({ threadStarter, quitAfter, abort, + maxQueueSize, launchSecondary }) { return new Promise((resolve) => { const array = []; @@ -47,7 +48,7 @@ function testWithJSMarshaller({ }), !!abort); }); } - }, !!abort, !!launchSecondary); + }, !!abort, !!launchSecondary, maxQueueSize); if (threadStarter === 'StartThreadNonblocking') { // Let's make this thread really busy for a short while to ensure that // the queue fills and the thread receives a napi_queue_full. @@ -57,6 +58,24 @@ function testWithJSMarshaller({ }); } +function testUnref(queueSize) { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child', queueSize], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }) + .then((result) => assert.strictEqual(result.indexOf(0), -1)); +} + new Promise(function testWithoutJSMarshaller(resolve) { let callCount = 0; binding.StartThreadNoNative(function testCallback() { @@ -71,13 +90,23 @@ new Promise(function testWithoutJSMarshaller(resolve) { }), false); }); } - }, false /* abort */, false /* launchSecondary */); + }, false /* abort */, false /* launchSecondary */, binding.MAX_QUEUE_SIZE); }) // Start the thread in blocking mode, and assert that all values are passed. // Quit after it's done. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, quitAfter: binding.ARRAY_LENGTH })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -86,6 +115,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit after it's done. .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: binding.ARRAY_LENGTH })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -94,6 +124,16 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit early, but let the thread finish. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', + maxQueueSize: binding.MAX_QUEUE_SIZE, + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode with an infinite queue, and assert that all +// values are passed. Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + maxQueueSize: 0, quitAfter: 1 })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -102,6 +142,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { // Quit early, but let the thread finish. .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', + maxQueueSize: binding.MAX_QUEUE_SIZE, quitAfter: 1 })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -112,6 +153,7 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true })) .then((result) => assert.deepStrictEqual(result, expectedArray)) @@ -122,15 +164,27 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, launchSecondary: true })) .then((result) => assert.deepStrictEqual(result, expectedArray)) // Start the thread in blocking mode, and assert that it could not finish. -// Quit early and aborting. +// Quit early by aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in blocking mode with an infinite queue, and assert that it +// could not finish. Quit early by aborting. .then(() => testWithJSMarshaller({ threadStarter: 'StartThread', quitAfter: 1, + maxQueueSize: 0, abort: true })) .then((result) => assert.strictEqual(result.indexOf(0), -1)) @@ -140,25 +194,13 @@ new Promise(function testWithoutJSMarshaller(resolve) { .then(() => testWithJSMarshaller({ threadStarter: 'StartThreadNonblocking', quitAfter: 1, + maxQueueSize: binding.MAX_QUEUE_SIZE, abort: true })) .then((result) => assert.strictEqual(result.indexOf(0), -1)) // Start a child process to test rapid teardown -.then(() => { - return new Promise((resolve, reject) => { - let output = ''; - const child = fork(__filename, ['child'], { - stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] - }); - child.on('close', (code) => { - if (code === 0) { - resolve(output.match(/\S+/g)); - } else { - reject(new Error('Child process died with code ' + code)); - } - }); - child.stdout.on('data', (data) => (output += data.toString())); - }); -}) -.then((result) => assert.strictEqual(result.indexOf(0), -1)); +.then(() => testUnref(binding.MAX_QUEUE_SIZE)) + +// Start a child process with an infinite queue to test rapid teardown +.then(() => testUnref(0));