Skip to content

Commit 52b5b10

Browse files
committed
Add stream module to futures crate
The stream module exposes the JsStream type used to convert JS objects implementing the AsyncIterator interface to be used as Rust streams.
1 parent 9f725e7 commit 52b5b10

File tree

4 files changed

+105
-2
lines changed

4 files changed

+105
-2
lines changed

crates/futures/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ edition = "2018"
1414
cfg-if = "1.0.0"
1515
js-sys = { path = "../js-sys", version = '0.3.46' }
1616
wasm-bindgen = { path = "../..", version = '0.2.69' }
17+
futures-core = { version = '0.3.8', default-features = false }
1718

1819
[target.'cfg(target_feature = "atomics")'.dependencies.web-sys]
1920
path = "../web-sys"
@@ -26,3 +27,4 @@ features = [
2627
[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
2728
wasm-bindgen-test = { path = '../test', version = '0.3.19' }
2829
futures-channel-preview = { version = "0.3.0-alpha.18" }
30+
futures-lite = { version = "1.11.3", default-features = false }

crates/futures/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ use std::task::{Context, Poll, Waker};
4343
use wasm_bindgen::prelude::*;
4444

4545
mod queue;
46+
pub mod stream;
4647

4748
mod task {
4849
use cfg_if::cfg_if;

crates/futures/src/stream.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s.
2+
//!
3+
//! Analogous to the promise to future convertion, this module allows the
4+
//! turing objects implementing the async iterator protocol into `Stream`s
5+
//! that produce values that can be awaited from.
6+
//!
7+
8+
use crate::JsFuture;
9+
use core::future::Future;
10+
use core::pin::Pin;
11+
use core::task::{Context, Poll};
12+
use futures_core::stream::Stream;
13+
use js_sys::{AsyncIterator, IteratorNext};
14+
use wasm_bindgen::{prelude::*, JsCast};
15+
16+
/// A `Stream` that yields values from an underlying `AsyncIterator`.
17+
pub struct JsStream {
18+
iter: AsyncIterator,
19+
next: Option<JsFuture>,
20+
done: bool,
21+
}
22+
23+
impl JsStream {
24+
fn next_future(&self) -> Result<JsFuture, JsValue> {
25+
self.iter.next().map(JsFuture::from)
26+
}
27+
}
28+
29+
impl From<AsyncIterator> for JsStream {
30+
fn from(iter: AsyncIterator) -> Self {
31+
JsStream {
32+
iter,
33+
next: None,
34+
done: false,
35+
}
36+
}
37+
}
38+
39+
impl Stream for JsStream {
40+
type Item = Result<JsValue, JsValue>;
41+
42+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
43+
if self.done {
44+
return Poll::Ready(None);
45+
}
46+
47+
let future = match self.next.as_mut() {
48+
Some(val) => val,
49+
None => match self.next_future() {
50+
Ok(val) => {
51+
self.next = Some(val);
52+
self.next.as_mut().unwrap()
53+
}
54+
Err(e) => {
55+
self.done = true;
56+
return Poll::Ready(Some(Err(e)));
57+
}
58+
},
59+
};
60+
61+
match Pin::new(future).poll(cx) {
62+
Poll::Ready(res) => match res {
63+
Ok(iter_next) => {
64+
let next = iter_next.unchecked_into::<IteratorNext>();
65+
if next.done() {
66+
self.done = true;
67+
Poll::Ready(None)
68+
} else {
69+
self.next.take();
70+
Poll::Ready(Some(Ok(next.value())))
71+
}
72+
}
73+
Err(e) => {
74+
self.done = true;
75+
Poll::Ready(Some(Err(e)))
76+
}
77+
},
78+
Poll::Pending => Poll::Pending,
79+
}
80+
}
81+
}

crates/futures/tests/tests.rs

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@
33
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser);
44

55
use futures_channel::oneshot;
6-
use wasm_bindgen::prelude::*;
7-
use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture};
6+
use wasm_bindgen::{prelude::*, JsCast};
7+
use wasm_bindgen_futures::{future_to_promise, spawn_local, stream::JsStream, JsFuture};
88
use wasm_bindgen_test::*;
99

1010
#[wasm_bindgen_test]
@@ -88,3 +88,22 @@ async fn can_create_multiple_futures_from_same_promise() {
8888
a.await.unwrap();
8989
b.await.unwrap();
9090
}
91+
92+
#[wasm_bindgen_test]
93+
async fn can_use_an_async_iterable_as_stream() {
94+
use futures_lite::stream::StreamExt;
95+
let async_iter = js_sys::Function::new_no_args(
96+
"return async function*() {
97+
yield 42;
98+
yield 24;
99+
}()",
100+
)
101+
.call0(&JsValue::undefined())
102+
.unwrap()
103+
.unchecked_into::<js_sys::AsyncIterator>();
104+
105+
let mut stream = JsStream::from(async_iter);
106+
assert_eq!(stream.next().await, Some(Ok(JsValue::from(42))));
107+
assert_eq!(stream.next().await, Some(Ok(JsValue::from(24))));
108+
assert_eq!(stream.next().await, None);
109+
}

0 commit comments

Comments
 (0)