Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow Subscribe::ready to be fallible #7713

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 11 additions & 8 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,15 +207,16 @@ impl HostInputStream for HostIncomingBodyStream {

#[async_trait::async_trait]
impl Subscribe for HostIncomingBodyStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if !self.buffer.is_empty() || self.error.is_some() {
return;
return Ok(());
}

if let IncomingBodyStreamState::Open { body, .. } = &mut self.state {
let frame = body.frame().await;
self.record_frame(frame);
}
Ok(())
}
}

Expand Down Expand Up @@ -306,11 +307,11 @@ pub enum HostFutureTrailers {

#[async_trait::async_trait]
impl Subscribe for HostFutureTrailers {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};

// If the body is itself being read by a body stream then we need to
Expand Down Expand Up @@ -339,8 +340,8 @@ impl Subscribe for HostFutureTrailers {
// we have the body ourselves then read frames until trailers are found.
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};
let hyper_body = match &mut body.body {
IncomingBodyState::Start(body) => body,
Expand All @@ -360,6 +361,7 @@ impl Subscribe for HostFutureTrailers {
}
};
*self = HostFutureTrailers::Done(result);
Ok(())
}
}

Expand Down Expand Up @@ -627,10 +629,11 @@ impl HostOutputStream for BodyWriteStream {

#[async_trait::async_trait]
impl Subscribe for BodyWriteStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// Attempt to perform a reservation for a send. If there's capacity in
// the channel or it's already closed then this will return immediately.
// If the channel is full this will block until capacity opens up.
let _ = self.writer.reserve().await;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,9 +430,10 @@ impl HostFutureIncomingResponse {

#[async_trait::async_trait]
impl Subscribe for HostFutureIncomingResponse {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let Self::Pending(handle) = self {
*self = Self::Ready(handle.await);
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ impl HostOutputStream for FileOutputStream {

#[async_trait::async_trait]
impl Subscribe for FileOutputStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let OutputState::Waiting(task) = &mut self.state {
self.state = match task.await {
Ok(nwritten) => {
Expand All @@ -297,6 +297,7 @@ impl Subscribe for FileOutputStream {
Err(e) => OutputState::Error(e),
};
}
Ok(())
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,12 @@ enum Deadline {

#[async_trait::async_trait]
impl Subscribe for Deadline {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Ok(())
}
}
16 changes: 12 additions & 4 deletions crates/wasi/src/preview2/host/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,17 @@ impl<T: WasiView> streams::HostOutputStream for T {
) -> StreamResult<u64> {
use crate::preview2::Subscribe;

self.table_mut().get_mut(&dest)?.ready().await;
self.table_mut()
.get_mut(&dest)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.table_mut().get_mut(&src)?.ready().await;
self.table_mut()
.get_mut(&src)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.splice(dest, src, len).await
}
Expand Down Expand Up @@ -196,7 +204,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<Vec<u8>> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.read(stream, len).await
}
Expand All @@ -216,7 +224,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<u64> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.skip(stream, len).await
}
Expand Down
6 changes: 4 additions & 2 deletions crates/wasi/src/preview2/host/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,12 +411,13 @@ impl<T: WasiView> udp::HostIncomingDatagramStream for T {

#[async_trait]
impl Subscribe for IncomingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
self.inner
.ready(Interest::READABLE)
.await
.expect("failed to await UDP socket readiness");
Ok(())
}
}

Expand Down Expand Up @@ -545,7 +546,7 @@ impl<T: WasiView> udp::HostOutgoingDatagramStream for T {

#[async_trait]
impl Subscribe for OutgoingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self.send_state {
SendState::Idle | SendState::Permitted(_) => {}
SendState::Waiting => {
Expand All @@ -557,5 +558,6 @@ impl Subscribe for OutgoingDatagramStream {
self.send_state = SendState::Idle;
}
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/ip_name_lookup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ impl<T: WasiView> HostResolveAddressStream for T {

#[async_trait::async_trait]
impl Subscribe for ResolveAddressStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> Result<()> {
if let ResolveAddressStream::Waiting(future) = self {
*self = ResolveAddressStream::Done(future.await.map(|v| v.into_iter()));
}
Ok(())
}
}

Expand Down
65 changes: 48 additions & 17 deletions crates/wasi/src/preview2/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ impl HostInputStream for MemoryInputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryInputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -104,7 +106,9 @@ impl HostOutputStream for MemoryOutputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryOutputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl
Expand Down Expand Up @@ -193,16 +197,17 @@ impl HostInputStream for AsyncReadStream {
}
#[async_trait::async_trait]
impl Subscribe for AsyncReadStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if self.buffer.is_some() || self.closed {
return;
return Ok(());
}
match self.receiver.recv().await {
Some(res) => self.buffer = Some(res),
None => {
panic!("no more sender for an open AsyncReadStream - should be impossible")
}
}
Ok(())
}
}

Expand All @@ -227,7 +232,9 @@ impl HostOutputStream for SinkOutputStream {

#[async_trait::async_trait]
impl Subscribe for SinkOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// A stream that is ready immediately, but will always report that it's closed.
Expand All @@ -243,7 +250,9 @@ impl HostInputStream for ClosedInputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedInputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// An output stream that is always closed.
Expand All @@ -265,7 +274,9 @@ impl HostOutputStream for ClosedOutputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[cfg(test)]
Expand Down Expand Up @@ -323,7 +334,9 @@ mod test {
// The reader task hasn't run yet. Call `ready` to await and fill the buffer.
Ok(bs) => {
assert!(bs.is_empty());
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
res => panic!("unexpected: {res:?}"),
Expand All @@ -337,7 +350,9 @@ mod test {
let bs = reader.read(10).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(10).unwrap();
assert_eq!(bs.len(), 10);
Expand Down Expand Up @@ -367,7 +382,9 @@ mod test {
let bs = reader.read(123).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(123).unwrap();
assert_eq!(bs.len(), 123);
Expand All @@ -382,7 +399,9 @@ mod test {
Ok(bs) => {
assert!(bs.is_empty());
// Need to await to give this side time to catch up
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should show closed
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
Expand All @@ -402,7 +421,9 @@ mod test {
let bs = reader.read(1).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(1).unwrap();
assert_eq!(*bs, [123u8]);
Expand All @@ -426,7 +447,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// read the something else back out:
let bs = reader.read(1).unwrap();
Expand All @@ -448,7 +471,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// empty and now closed:
assert!(matches!(reader.read(1), Err(StreamError::Closed)));
Expand All @@ -468,15 +493,19 @@ mod test {
w
});

resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
let bs = reader.read(4097).unwrap();
assert_eq!(bs.len(), 4096);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Again we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
Expand All @@ -490,7 +519,9 @@ mod test {
drop(w);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader to be empty, and the stream closed:
assert!(matches!(reader.read(4097), Err(StreamError::Closed)));
Expand Down
Loading