Skip to content

Commit

Permalink
feat(neon): Add JoinHandle as a result of Channel::send
Browse files Browse the repository at this point in the history
  • Loading branch information
kjvalencik committed Sep 3, 2021
1 parent 24e7d6f commit 7a90ca0
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 7 deletions.
65 changes: 58 additions & 7 deletions src/event/event_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{mpsc, Arc};

use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;
Expand Down Expand Up @@ -104,9 +104,10 @@ impl Channel {

/// Schedules a closure to execute on the JavaScript thread that created this Channel
/// Panics if there is a libuv error
pub fn send<F>(&self, f: F)
pub fn send<T, F>(&self, f: F) -> JoinHandle<T>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
self.try_send(f).unwrap()
}
Expand All @@ -115,21 +116,29 @@ impl Channel {
/// Returns an `Error` if the task could not be scheduled.
///
/// See [`SendError`] for additional details on failure causes.
pub fn try_send<F>(&self, f: F) -> Result<(), SendError>
pub fn try_send<T, F>(&self, f: F) -> Result<JoinHandle<T>, SendError>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
// Error can be ignored; it only means the user didn't join
let _ = tx.send(f(cx));
});
});

self.state.tsfn.call(callback, None).map_err(|_| SendError)
self.state
.tsfn
.call(callback, None)
.map_err(|_| SendError)?;

Ok(JoinHandle { rx })
}

/// Returns a boolean indicating if this `Channel` will prevent the Node event
Expand Down Expand Up @@ -202,6 +211,48 @@ impl Drop for Channel {
}
}

/// An owned permission to join on the result of a closure sent to the JavaScript main
/// thread with [`Channel::send`].
pub struct JoinHandle<T> {
rx: mpsc::Receiver<NeonResult<T>>,
}

impl<T> JoinHandle<T> {
/// Waits for the associated closure to finish executing
///
/// If the closure panics or throws an exception, `Err` is returned
pub fn join(self) -> Result<T, JoinError> {
self.rx
.recv()
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
.map_err(|_| JoinError(JoinErrorType::Throw))
}
}

#[derive(Debug)]
/// Error returned by [`JoinHandle::join`] indicating the associated closure panicked
/// or threw an exception.
pub struct JoinError(JoinErrorType);

#[derive(Debug)]
enum JoinErrorType {
Panic,
Throw,
}

impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match &self.0 {
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
}
}
}

impl std::error::Error for JoinError {}

/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
/// The most likely cause of a failure is that Node is shutting down. This may occur if the
Expand Down
13 changes: 13 additions & 0 deletions test/napi/lib/threads.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,17 @@ const assert = require('chai').assert;
// Asynchronously GC to give the task queue a chance to execute
setTimeout(() => global.gc(), 10);
});

it('should be able to join on the result of a channel', function (cb) {
let msg = "Uninitialized";

addon.channel_join(() => msg, (res) => {
assert.strictEqual(res, "Received: Hello, World!");
cb();
});

setTimeout(() => {
msg = "Hello, World!";
}, 10);
});
});
39 changes: 39 additions & 0 deletions test/napi/src/js/threads.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::cell::RefCell;
use std::sync::Arc;
use std::time::Duration;

use neon::prelude::*;

Expand Down Expand Up @@ -185,3 +186,41 @@ pub fn drop_global_queue(mut cx: FunctionContext) -> JsResult<JsUndefined> {

Ok(cx.undefined())
}

pub fn channel_join(mut cx: FunctionContext) -> JsResult<JsUndefined> {
let get_message = cx.argument::<JsFunction>(0)?.root(&mut cx);
let callback = cx.argument::<JsFunction>(1)?.root(&mut cx);
let channel = cx.channel();

std::thread::spawn(move || {
// Give a chance for the data to change
std::thread::sleep(Duration::from_millis(20));

// Get the current message
let message = channel
.send(move |mut cx| {
let this = cx.undefined();

get_message
.into_inner(&mut cx)
.call::<_, _, JsValue, _>(&mut cx, this, [])?
.downcast_or_throw::<JsString, _>(&mut cx)
.map(|v| v.value(&mut cx))
})
.join()
.unwrap();

let response = format!("Received: {}", message);

channel.send(move |mut cx| {
let this = cx.undefined();
let args = [cx.string(response)];

callback.into_inner(&mut cx).call(&mut cx, this, args)?;

Ok(())
});
});

Ok(cx.undefined())
}
1 change: 1 addition & 0 deletions test/napi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> {
cx.export_function("greeter_greet", greeter_greet)?;
cx.export_function("leak_channel", leak_channel)?;
cx.export_function("drop_global_queue", drop_global_queue)?;
cx.export_function("channel_join", channel_join)?;

Ok(())
}

0 comments on commit 7a90ca0

Please sign in to comment.