12
12
#include " tracing/traced_value.h"
13
13
#include " util-inl.h"
14
14
15
+ #include < atomic>
15
16
#include < memory>
16
17
17
18
struct node_napi_env__ : public napi_env__ {
@@ -137,6 +138,7 @@ class ThreadSafeFunction : public node::AsyncResource {
137
138
*v8::String::Utf8Value (env_->isolate, name)),
138
139
thread_count(thread_count_),
139
140
is_closing(false ),
141
+ dispatch_state(kDispatchIdle ),
140
142
context(context_),
141
143
max_queue_size(max_queue_size_),
142
144
env(env_),
@@ -176,10 +178,8 @@ class ThreadSafeFunction : public node::AsyncResource {
176
178
return napi_closing;
177
179
}
178
180
} else {
179
- if (uv_async_send (&async) != 0 ) {
180
- return napi_generic_failure;
181
- }
182
181
queue.push (data);
182
+ Send ();
183
183
return napi_ok;
184
184
}
185
185
}
@@ -211,9 +211,7 @@ class ThreadSafeFunction : public node::AsyncResource {
211
211
if (is_closing && max_queue_size > 0 ) {
212
212
cond->Signal (lock);
213
213
}
214
- if (uv_async_send (&async) != 0 ) {
215
- return napi_generic_failure;
216
- }
214
+ Send ();
217
215
}
218
216
}
219
217
@@ -238,7 +236,6 @@ class ThreadSafeFunction : public node::AsyncResource {
238
236
cond = std::make_unique<node::ConditionVariable>();
239
237
}
240
238
if (max_queue_size == 0 || cond) {
241
- CHECK_EQ (0 , uv_idle_init (loop, &idle));
242
239
return napi_ok;
243
240
}
244
241
@@ -263,21 +260,46 @@ class ThreadSafeFunction : public node::AsyncResource {
263
260
264
261
napi_status Unref () {
265
262
uv_unref (reinterpret_cast <uv_handle_t *>(&async));
266
- uv_unref (reinterpret_cast <uv_handle_t *>(&idle));
267
263
268
264
return napi_ok;
269
265
}
270
266
271
267
napi_status Ref () {
272
268
uv_ref (reinterpret_cast <uv_handle_t *>(&async));
273
- uv_ref (reinterpret_cast <uv_handle_t *>(&idle));
274
269
275
270
return napi_ok;
276
271
}
277
272
278
- void DispatchOne () {
273
+ inline void * Context () {
274
+ return context;
275
+ }
276
+
277
+ protected:
278
+ void Dispatch () {
279
+ bool has_more = true ;
280
+
281
+ // Limit maximum synchronous iteration count to prevent event loop
282
+ // starvation. See `src/node_messaging.cc` for an inspiration.
283
+ unsigned int iterations_left = kMaxIterationCount ;
284
+ while (has_more && --iterations_left != 0 ) {
285
+ dispatch_state = kDispatchRunning ;
286
+ has_more = DispatchOne ();
287
+
288
+ // Send() was called while we were executing the JS function
289
+ if (dispatch_state.exchange (kDispatchIdle ) != kDispatchRunning ) {
290
+ has_more = true ;
291
+ }
292
+ }
293
+
294
+ if (has_more) {
295
+ Send ();
296
+ }
297
+ }
298
+
299
+ bool DispatchOne () {
279
300
void * data = nullptr ;
280
301
bool popped_value = false ;
302
+ bool has_more = false ;
281
303
282
304
{
283
305
node::Mutex::ScopedLock lock (this ->mutex );
@@ -302,9 +324,9 @@ class ThreadSafeFunction : public node::AsyncResource {
302
324
cond->Signal (lock);
303
325
}
304
326
CloseHandlesAndMaybeDelete ();
305
- } else {
306
- CHECK_EQ (0 , uv_idle_stop (&idle));
307
327
}
328
+ } else {
329
+ has_more = true ;
308
330
}
309
331
}
310
332
}
@@ -322,6 +344,8 @@ class ThreadSafeFunction : public node::AsyncResource {
322
344
call_js_cb (env, js_callback, context, data);
323
345
});
324
346
}
347
+
348
+ return has_more;
325
349
}
326
350
327
351
void Finalize () {
@@ -335,10 +359,6 @@ class ThreadSafeFunction : public node::AsyncResource {
335
359
EmptyQueueAndDelete ();
336
360
}
337
361
338
- inline void * Context () {
339
- return context;
340
- }
341
-
342
362
void CloseHandlesAndMaybeDelete (bool set_closing = false ) {
343
363
v8::HandleScope scope (env->isolate );
344
364
if (set_closing) {
@@ -358,18 +378,20 @@ class ThreadSafeFunction : public node::AsyncResource {
358
378
ThreadSafeFunction* ts_fn =
359
379
node::ContainerOf (&ThreadSafeFunction::async,
360
380
reinterpret_cast <uv_async_t *>(handle));
361
- v8::HandleScope scope (ts_fn->env ->isolate );
362
- ts_fn->env ->node_env ()->CloseHandle (
363
- reinterpret_cast <uv_handle_t *>(&ts_fn->idle ),
364
- [](uv_handle_t * handle) -> void {
365
- ThreadSafeFunction* ts_fn =
366
- node::ContainerOf (&ThreadSafeFunction::idle,
367
- reinterpret_cast <uv_idle_t *>(handle));
368
- ts_fn->Finalize ();
369
- });
381
+ ts_fn->Finalize ();
370
382
});
371
383
}
372
384
385
+ void Send () {
386
+ // Ask currently running Dispatch() to make one more iteration
387
+ unsigned char current_state = dispatch_state.fetch_or (kDispatchPending );
388
+ if ((current_state & kDispatchRunning ) == kDispatchRunning ) {
389
+ return ;
390
+ }
391
+
392
+ CHECK_EQ (0 , uv_async_send (&async));
393
+ }
394
+
373
395
// Default way of calling into JavaScript. Used when ThreadSafeFunction is
374
396
// without a call_js_cb_.
375
397
static void CallJs (napi_env env, napi_value cb, void * context, void * data) {
@@ -393,16 +415,10 @@ class ThreadSafeFunction : public node::AsyncResource {
393
415
}
394
416
}
395
417
396
- static void IdleCb (uv_idle_t * idle) {
397
- ThreadSafeFunction* ts_fn =
398
- node::ContainerOf (&ThreadSafeFunction::idle, idle);
399
- ts_fn->DispatchOne ();
400
- }
401
-
402
418
static void AsyncCb (uv_async_t * async) {
403
419
ThreadSafeFunction* ts_fn =
404
420
node::ContainerOf (&ThreadSafeFunction::async, async);
405
- CHECK_EQ ( 0 , uv_idle_start (& ts_fn->idle , IdleCb) );
421
+ ts_fn->Dispatch ( );
406
422
}
407
423
408
424
static void Cleanup (void * data) {
@@ -411,14 +427,20 @@ class ThreadSafeFunction : public node::AsyncResource {
411
427
}
412
428
413
429
private:
430
+ static const unsigned char kDispatchIdle = 0 ;
431
+ static const unsigned char kDispatchRunning = 1 << 0 ;
432
+ static const unsigned char kDispatchPending = 1 << 1 ;
433
+
434
+ static const unsigned int kMaxIterationCount = 1000 ;
435
+
414
436
// These are variables protected by the mutex.
415
437
node::Mutex mutex;
416
438
std::unique_ptr<node::ConditionVariable> cond;
417
439
std::queue<void *> queue;
418
440
uv_async_t async;
419
- uv_idle_t idle;
420
441
size_t thread_count;
421
442
bool is_closing;
443
+ std::atomic_uchar dispatch_state;
422
444
423
445
// These are variables set once, upon creation, and then never again, which
424
446
// means we don't need the mutex to read them.
0 commit comments