Skip to content

Commit b2fe5d1

Browse files
authored
Convert async iterators to streams (#2401)
* Add stream module to futures crate The stream module exposes the JsStream type used to convert JS objects implementing the AsyncIterator interface to be used as Rust streams. * Add stream feature to futures crate
1 parent 9f725e7 commit b2fe5d1

File tree

5 files changed

+114
-0
lines changed

5 files changed

+114
-0
lines changed

crates/futures/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ edition = "2018"
1414
cfg-if = "1.0.0"
1515
js-sys = { path = "../js-sys", version = '0.3.46' }
1616
wasm-bindgen = { path = "../..", version = '0.2.69' }
17+
futures-core = { version = '0.3.8', default-features = false, optional = true }
18+
19+
[features]
20+
futures-core-03-stream = ['futures-core']
1721

1822
[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
1923
path = "../web-sys"
@@ -26,3 +30,4 @@ features = [
2630
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
2731
wasm-bindgen-test = { path = '../test', version = '0.3.19' }
2832
futures-channel-preview = { version = "0.3.0-alpha.18" }
33+
futures-lite = { version = "1.11.3", default-features = false }

crates/futures/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ This crate bridges the gap between a Rust `Future` and a JavaScript
88
1. From a JavaScript `Promise` into a Rust `Future`.
99
2. From a Rust `Future` into a JavaScript `Promise`.
1010

11+
Additionally under the feature flag `futures-core-03-stream` there is experimental
12+
support for `AsyncIterator` to `Stream` conversion.
13+
1114
See the [API documentation][docs] for more info.
1215

1316
[docs]: https://rustwasm.github.io/wasm-bindgen/api/wasm_bindgen_futures/

crates/futures/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ use std::task::{Context, Poll, Waker};
4343
use wasm_bindgen::prelude::*;
4444

4545
mod queue;
46+
#[cfg(feature = "futures-core-03-stream")]
47+
pub mod stream;
4648

4749
mod task {
4850
use cfg_if::cfg_if;

crates/futures/src/stream.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
2+
//!
3+
//! Analogous to the promise to future convertion, this module allows the
4+
//! turing objects implementing the async iterator protocol into `Stream`s
5+
//! that produce values that can be awaited from.
6+
//!
7+
8+
use crate::JsFuture;
9+
use core::future::Future;
10+
use core::pin::Pin;
11+
use core::task::{Context, Poll};
12+
use futures_core::stream::Stream;
13+
use js_sys::{AsyncIterator, IteratorNext};
14+
use wasm_bindgen::{prelude::*, JsCast};
15+
16+
/// A `Stream` that yields values from an underlying `AsyncIterator`.
17+
pub struct JsStream {
18+
iter: AsyncIterator,
19+
next: Option<JsFuture>,
20+
done: bool,
21+
}
22+
23+
impl JsStream {
24+
fn next_future(&self) -> Result<JsFuture, JsValue> {
25+
self.iter.next().map(JsFuture::from)
26+
}
27+
}
28+
29+
impl From<AsyncIterator> for JsStream {
30+
fn from(iter: AsyncIterator) -> Self {
31+
JsStream {
32+
iter,
33+
next: None,
34+
done: false,
35+
}
36+
}
37+
}
38+
39+
impl Stream for JsStream {
40+
type Item = Result<JsValue, JsValue>;
41+
42+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43+
if self.done {
44+
return Poll::Ready(None);
45+
}
46+
47+
let future = match self.next.as_mut() {
48+
Some(val) => val,
49+
None => match self.next_future() {
50+
Ok(val) => {
51+
self.next = Some(val);
52+
self.next.as_mut().unwrap()
53+
}
54+
Err(e) => {
55+
self.done = true;
56+
return Poll::Ready(Some(Err(e)));
57+
}
58+
},
59+
};
60+
61+
match Pin::new(future).poll(cx) {
62+
Poll::Ready(res) => match res {
63+
Ok(iter_next) => {
64+
let next = iter_next.unchecked_into::<IteratorNext>();
65+
if next.done() {
66+
self.done = true;
67+
Poll::Ready(None)
68+
} else {
69+
self.next.take();
70+
Poll::Ready(Some(Ok(next.value())))
71+
}
72+
}
73+
Err(e) => {
74+
self.done = true;
75+
Poll::Ready(Some(Err(e)))
76+
}
77+
},
78+
Poll::Pending => Poll::Pending,
79+
}
80+
}
81+
}

crates/futures/tests/tests.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,3 +88,26 @@ async fn can_create_multiple_futures_from_same_promise() {
8888
a.await.unwrap();
8989
b.await.unwrap();
9090
}
91+
92+
#[cfg(feature = "futures-core-03-stream")]
93+
#[wasm_bindgen_test]
94+
async fn can_use_an_async_iterable_as_stream() {
95+
use futures_lite::stream::StreamExt;
96+
use wasm_bindgen::JsCast;
97+
use wasm_bindgen_futures::stream::JsStream;
98+
99+
let async_iter = js_sys::Function::new_no_args(
100+
"return async function*() {
101+
yield 42;
102+
yield 24;
103+
}()",
104+
)
105+
.call0(&JsValue::undefined())
106+
.unwrap()
107+
.unchecked_into::<js_sys::AsyncIterator>();
108+
109+
let mut stream = JsStream::from(async_iter);
110+
assert_eq!(stream.next().await, Some(Ok(JsValue::from(42))));
111+
assert_eq!(stream.next().await, Some(Ok(JsValue::from(24))));
112+
assert_eq!(stream.next().await, None);
113+
}

0 commit comments

Comments
 (0)