Skip to content

Commit

Permalink
Merge pull request #874 from neon-bindings/kv/futures
Browse files Browse the repository at this point in the history
Implement Futures RFC
  • Loading branch information
kjvalencik authored Jul 7, 2022
2 parents bceacdd + 144d1d4 commit f21146d
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 40 deletions.
8 changes: 4 additions & 4 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[alias]
# Neon defines mutually exclusive feature flags which prevents using `cargo clippy --all-features`
# The following aliases simplify linting the entire workspace
neon-check = " check --all --all-targets --features napi-experimental"
neon-clippy = "clippy --all --all-targets --features napi-experimental -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=napi-experimental"
neon-doc = " rustdoc -p neon --features=napi-experimental -- --cfg docsrs"
neon-check = " check --all --all-targets --features napi-experimental,futures"
neon-clippy = "clippy --all --all-targets --features napi-experimental,futures -- -A clippy::missing_safety_doc"
neon-test = " test --all --features=napi-experimental,futures"
neon-doc = " rustdoc -p neon --features=napi-experimental,futures -- --cfg docsrs"
11 changes: 11 additions & 0 deletions crates/neon/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,19 @@ smallvec = "1.4.2"
once_cell = "1.10.0"
neon-macros = { version = "=0.10.1", path = "../neon-macros" }

[dependencies.tokio]
version = "1.18.2"
default-features = false
features = ["sync"]
optional = true

[features]
default = ["napi-1"]

# Experimental Rust Futures API
# https://github.com/neon-bindings/rfcs/pull/46
futures = ["tokio"]

# Default N-API version. Prefer to select a minimum required version.
# DEPRECATED: This is an alias that should be removed
napi-runtime = ["napi-8"]
Expand Down Expand Up @@ -63,5 +73,6 @@ proc-macros = []
[package.metadata.docs.rs]
rustdoc-args = ["--cfg", "docsrs"]
features = [
"futures",
"napi-experimental",
]
142 changes: 115 additions & 27 deletions crates/neon/src/event/channel.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,49 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
mpsc, Arc,
use std::{
error, fmt, mem,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};

use crate::{
context::{Context, TaskContext},
result::NeonResult,
result::{NeonResult, ResultExt, Throw},
sys::{raw::Env, tsfn::ThreadsafeFunction},
};

#[cfg(feature = "futures")]
use {
std::future::Future,
std::pin::Pin,
std::task::{self, Poll},
tokio::sync::oneshot,
};

#[cfg(not(feature = "futures"))]
// Synchronous oneshot channel API compatible with `tokio::sync::oneshot`
mod oneshot {
use std::sync::mpsc;

pub(super) mod error {
pub use super::mpsc::RecvError;
}

pub(super) struct Receiver<T>(mpsc::Receiver<T>);

impl<T> Receiver<T> {
pub(super) fn blocking_recv(self) -> Result<T, mpsc::RecvError> {
self.0.recv()
}
}

pub(super) fn channel<T>() -> (mpsc::SyncSender<T>, Receiver<T>) {
let (tx, rx) = mpsc::sync_channel(1);

(tx, Receiver(rx))
}
}

type Callback = Box<dyn FnOnce(Env) + Send + 'static>;

/// Channel for scheduling Rust closures to execute on the JavaScript main thread.
Expand Down Expand Up @@ -70,8 +105,8 @@ pub struct Channel {
has_ref: bool,
}

impl std::fmt::Debug for Channel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Debug for Channel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("Channel")
}
}
Expand Down Expand Up @@ -131,15 +166,15 @@ impl Channel {
T: Send + 'static,
F: FnOnce(TaskContext) -> NeonResult<T> + Send + 'static,
{
let (tx, rx) = mpsc::sync_channel(1);
let (tx, rx) = oneshot::channel();
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
let env = unsafe { mem::transmute(env) };

// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
// Error can be ignored; it only means the user didn't join
let _ = tx.send(f(cx).map_err(|_| ()));
let _ = tx.send(f(cx).map_err(Into::into));
});
});

Expand Down Expand Up @@ -225,20 +260,40 @@ impl Drop for Channel {
/// thread with [`Channel::send`].
pub struct JoinHandle<T> {
// `Err` is always `Throw`, but `Throw` cannot be sent across threads
rx: mpsc::Receiver<Result<T, ()>>,
rx: oneshot::Receiver<Result<T, SendThrow>>,
}

impl<T> JoinHandle<T> {
/// Waits for the associated closure to finish executing
///
/// If the closure panics or throws an exception, `Err` is returned
///
/// # Panics
///
/// This function panics if called within an asynchronous execution context.
pub fn join(self) -> Result<T, JoinError> {
self.rx
.recv()
// If the sending side dropped without sending, it must have panicked
.map_err(|_| JoinError(JoinErrorType::Panic))?
// If the closure returned `Err`, a JavaScript exception was thrown
.map_err(|_| JoinError(JoinErrorType::Throw))
Ok(self.rx.blocking_recv()??)
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Self::Output> {
match Pin::new(&mut self.rx).poll(cx) {
Poll::Ready(result) => {
// Flatten `Result<Result<T, SendThrow>, RecvError>` by mapping to
// `Result<T, JoinError>`. This can be simplified by replacing the
// closure with a try-block after stabilization.
// https://doc.rust-lang.org/beta/unstable-book/language-features/try-blocks.html
let get_result = move || Ok(result??);

Poll::Ready(get_result())
}
Poll::Pending => Poll::Pending,
}
}
}

Expand All @@ -253,16 +308,49 @@ enum JoinErrorType {
Throw,
}

impl std::fmt::Display for JoinError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl JoinError {
fn as_str(&self) -> &str {
match &self.0 {
JoinErrorType::Panic => f.write_str("Closure panicked before returning"),
JoinErrorType::Throw => f.write_str("Closure threw an exception"),
JoinErrorType::Panic => "Closure panicked before returning",
JoinErrorType::Throw => "Closure threw an exception",
}
}
}

impl std::error::Error for JoinError {}
impl fmt::Display for JoinError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str(self.as_str())
}
}

impl error::Error for JoinError {}

impl From<oneshot::error::RecvError> for JoinError {
fn from(_: oneshot::error::RecvError) -> Self {
JoinError(JoinErrorType::Panic)
}
}

// Marker that a `Throw` occurred that can be sent across threads for use in `JoinError`
pub(crate) struct SendThrow(());

impl From<SendThrow> for JoinError {
fn from(_: SendThrow) -> Self {
JoinError(JoinErrorType::Throw)
}
}

impl From<Throw> for SendThrow {
fn from(_: Throw) -> SendThrow {
SendThrow(())
}
}

impl<T> ResultExt<T> for Result<T, JoinError> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T> {
self.or_else(|err| cx.throw_error(err.as_str()))
}
}

/// Error indicating that a closure was unable to be scheduled to execute on the event loop.
///
Expand All @@ -275,19 +363,19 @@ impl std::error::Error for JoinError {}
#[cfg_attr(docsrs, doc(cfg(feature = "napi-4")))]
pub struct SendError;

impl std::fmt::Display for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "SendError")
}
}

impl std::fmt::Debug for SendError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
std::fmt::Display::fmt(self, f)
impl fmt::Debug for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Display::fmt(self, f)
}
}

impl std::error::Error for SendError {}
impl error::Error for SendError {}

struct ChannelState {
tsfn: ThreadsafeFunction<Callback>,
Expand Down
2 changes: 2 additions & 0 deletions crates/neon/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ mod task;

pub use self::task::TaskBuilder;

#[cfg(all(feature = "napi-5", feature = "futures"))]
pub(crate) use self::channel::SendThrow;
#[cfg(feature = "napi-4")]
pub use self::channel::{Channel, JoinError, JoinHandle, SendError};

Expand Down
12 changes: 11 additions & 1 deletion crates/neon/src/result/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use std::{
marker::PhantomData,
};

use crate::{context::Context, handle::Handle};
use crate::{context::Context, handle::Handle, types::Value};

/// A [unit type][unit] indicating that the JavaScript thread is throwing an exception.
///
Expand Down Expand Up @@ -74,3 +74,13 @@ pub type JsResult<'b, T> = NeonResult<Handle<'b, T>>;
pub trait ResultExt<T> {
fn or_throw<'a, C: Context<'a>>(self, cx: &mut C) -> NeonResult<T>;
}

impl<'a, 'b, T, E> ResultExt<Handle<'a, T>> for Result<Handle<'a, T>, Handle<'b, E>>
where
T: Value,
E: Value,
{
fn or_throw<'cx, C: Context<'cx>>(self, cx: &mut C) -> JsResult<'a, T> {
self.or_else(|err| cx.throw(err))
}
}
4 changes: 4 additions & 0 deletions crates/neon/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ pub use self::{
#[cfg(feature = "napi-5")]
pub use self::date::{DateError, DateErrorKind, JsDate};

#[cfg(all(feature = "napi-5", feature = "futures"))]
#[cfg_attr(docsrs, doc(cfg(all(feature = "napi-5", feature = "futures"))))]
pub use self::promise::JsFuture;

pub(crate) fn build<'a, T: Managed, F: FnOnce(&mut raw::Local) -> bool>(
env: Env,
init: F,
Expand Down
Loading

0 comments on commit f21146d

Please sign in to comment.