Skip to content

Commit 14a2a00

Browse files
indutnytargos
authored andcommitted
node-api: faster threadsafe_function
Invoke threadsafe_function during the same tick and avoid marshalling costs between threads and/or churning event loop if either: 1. There's a queued call already 2. `Push()` is called while the main thread was running threadsafe_function PR-URL: #38506 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
1 parent ae9128e commit 14a2a00

File tree

3 files changed

+66
-35
lines changed

3 files changed

+66
-35
lines changed

src/node_api.cc

Lines changed: 55 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "tracing/traced_value.h"
1313
#include "util-inl.h"
1414

15+
#include <atomic>
1516
#include <memory>
1617

1718
struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
137138
*v8::String::Utf8Value(env_->isolate, name)),
138139
thread_count(thread_count_),
139140
is_closing(false),
141+
dispatch_state(kDispatchIdle),
140142
context(context_),
141143
max_queue_size(max_queue_size_),
142144
env(env_),
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
176178
return napi_closing;
177179
}
178180
} else {
179-
if (uv_async_send(&async) != 0) {
180-
return napi_generic_failure;
181-
}
182181
queue.push(data);
182+
Send();
183183
return napi_ok;
184184
}
185185
}
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
211211
if (is_closing && max_queue_size > 0) {
212212
cond->Signal(lock);
213213
}
214-
if (uv_async_send(&async) != 0) {
215-
return napi_generic_failure;
216-
}
214+
Send();
217215
}
218216
}
219217

@@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
238236
cond = std::make_unique<node::ConditionVariable>();
239237
}
240238
if (max_queue_size == 0 || cond) {
241-
CHECK_EQ(0, uv_idle_init(loop, &idle));
242239
return napi_ok;
243240
}
244241

@@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource {
263260

264261
napi_status Unref() {
265262
uv_unref(reinterpret_cast<uv_handle_t*>(&async));
266-
uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
267263

268264
return napi_ok;
269265
}
270266

271267
napi_status Ref() {
272268
uv_ref(reinterpret_cast<uv_handle_t*>(&async));
273-
uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
274269

275270
return napi_ok;
276271
}
277272

278-
void DispatchOne() {
273+
inline void* Context() {
274+
return context;
275+
}
276+
277+
protected:
278+
void Dispatch() {
279+
bool has_more = true;
280+
281+
// Limit maximum synchronous iteration count to prevent event loop
282+
// starvation. See `src/node_messaging.cc` for an inspiration.
283+
unsigned int iterations_left = kMaxIterationCount;
284+
while (has_more && --iterations_left != 0) {
285+
dispatch_state = kDispatchRunning;
286+
has_more = DispatchOne();
287+
288+
// Send() was called while we were executing the JS function
289+
if (dispatch_state.exchange(kDispatchIdle) != kDispatchRunning) {
290+
has_more = true;
291+
}
292+
}
293+
294+
if (has_more) {
295+
Send();
296+
}
297+
}
298+
299+
bool DispatchOne() {
279300
void* data = nullptr;
280301
bool popped_value = false;
302+
bool has_more = false;
281303

282304
{
283305
node::Mutex::ScopedLock lock(this->mutex);
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
302324
cond->Signal(lock);
303325
}
304326
CloseHandlesAndMaybeDelete();
305-
} else {
306-
CHECK_EQ(0, uv_idle_stop(&idle));
307327
}
328+
} else {
329+
has_more = true;
308330
}
309331
}
310332
}
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
322344
call_js_cb(env, js_callback, context, data);
323345
});
324346
}
347+
348+
return has_more;
325349
}
326350

327351
void Finalize() {
@@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource {
335359
EmptyQueueAndDelete();
336360
}
337361

338-
inline void* Context() {
339-
return context;
340-
}
341-
342362
void CloseHandlesAndMaybeDelete(bool set_closing = false) {
343363
v8::HandleScope scope(env->isolate);
344364
if (set_closing) {
@@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource {
358378
ThreadSafeFunction* ts_fn =
359379
node::ContainerOf(&ThreadSafeFunction::async,
360380
reinterpret_cast<uv_async_t*>(handle));
361-
v8::HandleScope scope(ts_fn->env->isolate);
362-
ts_fn->env->node_env()->CloseHandle(
363-
reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
364-
[](uv_handle_t* handle) -> void {
365-
ThreadSafeFunction* ts_fn =
366-
node::ContainerOf(&ThreadSafeFunction::idle,
367-
reinterpret_cast<uv_idle_t*>(handle));
368-
ts_fn->Finalize();
369-
});
381+
ts_fn->Finalize();
370382
});
371383
}
372384

385+
void Send() {
386+
// Ask currently running Dispatch() to make one more iteration
387+
unsigned char current_state = dispatch_state.fetch_or(kDispatchPending);
388+
if ((current_state & kDispatchRunning) == kDispatchRunning) {
389+
return;
390+
}
391+
392+
CHECK_EQ(0, uv_async_send(&async));
393+
}
394+
373395
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
374396
// without a call_js_cb_.
375397
static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
@@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
393415
}
394416
}
395417

396-
static void IdleCb(uv_idle_t* idle) {
397-
ThreadSafeFunction* ts_fn =
398-
node::ContainerOf(&ThreadSafeFunction::idle, idle);
399-
ts_fn->DispatchOne();
400-
}
401-
402418
static void AsyncCb(uv_async_t* async) {
403419
ThreadSafeFunction* ts_fn =
404420
node::ContainerOf(&ThreadSafeFunction::async, async);
405-
CHECK_EQ(0, uv_idle_start(&ts_fn->idle, IdleCb));
421+
ts_fn->Dispatch();
406422
}
407423

408424
static void Cleanup(void* data) {
@@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource {
411427
}
412428

413429
private:
430+
static const unsigned char kDispatchIdle = 0;
431+
static const unsigned char kDispatchRunning = 1 << 0;
432+
static const unsigned char kDispatchPending = 1 << 1;
433+
434+
static const unsigned int kMaxIterationCount = 1000;
435+
414436
// These are variables protected by the mutex.
415437
node::Mutex mutex;
416438
std::unique_ptr<node::ConditionVariable> cond;
417439
std::queue<void*> queue;
418440
uv_async_t async;
419-
uv_idle_t idle;
420441
size_t thread_count;
421442
bool is_closing;
443+
std::atomic_uchar dispatch_state;
422444

423445
// These are variables set once, upon creation, and then never again, which
424446
// means we don't need the mutex to read them.

test/node-api/test_threadsafe_function/binding.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#include <node_api.h>
88
#include "../../js-native-api/common.h"
99

10-
#define ARRAY_LENGTH 10
10+
#define ARRAY_LENGTH 10000
1111
#define MAX_QUEUE_SIZE 2
1212

1313
static uv_thread_t uv_threads[2];
@@ -72,7 +72,7 @@ static void data_source_thread(void* data) {
7272
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
7373
status = napi_call_threadsafe_function(ts_fn, &ints[index],
7474
ts_fn_info->block_on_full);
75-
if (ts_fn_info->max_queue_size == 0) {
75+
if (ts_fn_info->max_queue_size == 0 && (index % 1000 == 0)) {
7676
// Let's make this thread really busy for 200 ms to give the main thread a
7777
// chance to abort.
7878
uint64_t start = uv_hrtime();

test/node-api/test_threadsafe_function/test.js

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,15 @@ new Promise(function testWithoutJSMarshaller(resolve) {
211211
}))
212212
.then((result) => assert.strictEqual(result.indexOf(0), -1))
213213

214+
// Make sure that threadsafe function isn't stalled when we hit
215+
// `kMaxIterationCount` in `src/node_api.cc`
216+
.then(() => testWithJSMarshaller({
217+
threadStarter: 'StartThreadNonblocking',
218+
maxQueueSize: binding.ARRAY_LENGTH >>> 1,
219+
quitAfter: binding.ARRAY_LENGTH
220+
}))
221+
.then((result) => assert.deepStrictEqual(result, expectedArray))
222+
214223
// Start a child process to test rapid teardown
215224
.then(() => testUnref(binding.MAX_QUEUE_SIZE))
216225

0 commit comments

Comments
 (0)