Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: allow Subscribe::ready to be fallible #7713

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
refactor: allow Subscribe::ready to be fallible
Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
rvolosatovs committed Dec 21, 2023
commit fd63bc5dff61bf6001693572d152f14397a52be9
19 changes: 11 additions & 8 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
@@ -207,15 +207,16 @@ impl HostInputStream for HostIncomingBodyStream {

#[async_trait::async_trait]
impl Subscribe for HostIncomingBodyStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if !self.buffer.is_empty() || self.error.is_some() {
return;
return Ok(());
}

if let IncomingBodyStreamState::Open { body, .. } = &mut self.state {
let frame = body.frame().await;
self.record_frame(frame);
}
Ok(())
}
}

@@ -306,11 +307,11 @@ pub enum HostFutureTrailers {

#[async_trait::async_trait]
impl Subscribe for HostFutureTrailers {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};

// If the body is itself being read by a body stream then we need to
@@ -339,8 +340,8 @@ impl Subscribe for HostFutureTrailers {
// we have the body ourselves then read frames until trailers are found.
let body = match self {
HostFutureTrailers::Waiting(body) => body,
HostFutureTrailers::Done(_) => return,
HostFutureTrailers::Consumed => return,
HostFutureTrailers::Done(_) => return Ok(()),
HostFutureTrailers::Consumed => return Ok(()),
};
let hyper_body = match &mut body.body {
IncomingBodyState::Start(body) => body,
@@ -360,6 +361,7 @@ impl Subscribe for HostFutureTrailers {
}
};
*self = HostFutureTrailers::Done(result);
Ok(())
}
}

@@ -627,10 +629,11 @@ impl HostOutputStream for BodyWriteStream {

#[async_trait::async_trait]
impl Subscribe for BodyWriteStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// Attempt to perform a reservation for a send. If there's capacity in
// the channel or it's already closed then this will return immediately.
// If the channel is full this will block until capacity opens up.
let _ = self.writer.reserve().await;
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
@@ -430,9 +430,10 @@ impl HostFutureIncomingResponse {

#[async_trait::async_trait]
impl Subscribe for HostFutureIncomingResponse {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let Self::Pending(handle) = self {
*self = Self::Ready(handle.await);
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/filesystem.rs
Original file line number Diff line number Diff line change
@@ -285,7 +285,7 @@ impl HostOutputStream for FileOutputStream {

#[async_trait::async_trait]
impl Subscribe for FileOutputStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if let OutputState::Waiting(task) = &mut self.state {
self.state = match task.await {
Ok(nwritten) => {
@@ -297,6 +297,7 @@ impl Subscribe for FileOutputStream {
Err(e) => OutputState::Error(e),
};
}
Ok(())
}
}

3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/host/clocks.rs
Original file line number Diff line number Diff line change
@@ -93,11 +93,12 @@ enum Deadline {

#[async_trait::async_trait]
impl Subscribe for Deadline {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self {
Deadline::Past => {}
Deadline::Instant(instant) => tokio::time::sleep_until(*instant).await,
Deadline::Never => std::future::pending().await,
}
Ok(())
}
}
16 changes: 12 additions & 4 deletions crates/wasi/src/preview2/host/io.rs
Original file line number Diff line number Diff line change
@@ -165,9 +165,17 @@ impl<T: WasiView> streams::HostOutputStream for T {
) -> StreamResult<u64> {
use crate::preview2::Subscribe;

self.table_mut().get_mut(&dest)?.ready().await;
self.table_mut()
.get_mut(&dest)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.table_mut().get_mut(&src)?.ready().await;
self.table_mut()
.get_mut(&src)?
.ready()
.await
.map_err(StreamError::Trap)?;

self.splice(dest, src, len).await
}
@@ -196,7 +204,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<Vec<u8>> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.read(stream, len).await
}
@@ -216,7 +224,7 @@ impl<T: WasiView> streams::HostInputStream for T {
len: u64,
) -> StreamResult<u64> {
if let InputStream::Host(s) = self.table_mut().get_mut(&stream)? {
s.ready().await;
s.ready().await.map_err(StreamError::Trap)?;
}
self.skip(stream, len).await
}
6 changes: 4 additions & 2 deletions crates/wasi/src/preview2/host/udp.rs
Original file line number Diff line number Diff line change
@@ -411,12 +411,13 @@ impl<T: WasiView> udp::HostIncomingDatagramStream for T {

#[async_trait]
impl Subscribe for IncomingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
// FIXME: Add `Interest::ERROR` when we update to tokio 1.32.
self.inner
.ready(Interest::READABLE)
.await
.expect("failed to await UDP socket readiness");
Ok(())
}
}

@@ -545,7 +546,7 @@ impl<T: WasiView> udp::HostOutgoingDatagramStream for T {

#[async_trait]
impl Subscribe for OutgoingDatagramStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
match self.send_state {
SendState::Idle | SendState::Permitted(_) => {}
SendState::Waiting => {
@@ -557,5 +558,6 @@ impl Subscribe for OutgoingDatagramStream {
self.send_state = SendState::Idle;
}
}
Ok(())
}
}
3 changes: 2 additions & 1 deletion crates/wasi/src/preview2/ip_name_lookup.rs
Original file line number Diff line number Diff line change
@@ -80,10 +80,11 @@ impl<T: WasiView> HostResolveAddressStream for T {

#[async_trait::async_trait]
impl Subscribe for ResolveAddressStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> Result<()> {
if let ResolveAddressStream::Waiting(future) = self {
*self = ResolveAddressStream::Done(future.await.map(|v| v.into_iter()));
}
Ok(())
}
}

65 changes: 48 additions & 17 deletions crates/wasi/src/preview2/pipe.rs
Original file line number Diff line number Diff line change
@@ -49,7 +49,9 @@ impl HostInputStream for MemoryInputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryInputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[derive(Debug, Clone)]
@@ -104,7 +106,9 @@ impl HostOutputStream for MemoryOutputPipe {

#[async_trait::async_trait]
impl Subscribe for MemoryOutputPipe {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// Provides a [`HostInputStream`] impl from a [`tokio::io::AsyncRead`] impl
@@ -193,16 +197,17 @@ impl HostInputStream for AsyncReadStream {
}
#[async_trait::async_trait]
impl Subscribe for AsyncReadStream {
async fn ready(&mut self) {
async fn ready(&mut self) -> wasmtime::Result<()> {
if self.buffer.is_some() || self.closed {
return;
return Ok(());
}
match self.receiver.recv().await {
Some(res) => self.buffer = Some(res),
None => {
panic!("no more sender for an open AsyncReadStream - should be impossible")
}
}
Ok(())
}
}

@@ -227,7 +232,9 @@ impl HostOutputStream for SinkOutputStream {

#[async_trait::async_trait]
impl Subscribe for SinkOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// A stream that is ready immediately, but will always report that it's closed.
@@ -243,7 +250,9 @@ impl HostInputStream for ClosedInputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedInputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

/// An output stream that is always closed.
@@ -265,7 +274,9 @@ impl HostOutputStream for ClosedOutputStream {

#[async_trait::async_trait]
impl Subscribe for ClosedOutputStream {
async fn ready(&mut self) {}
async fn ready(&mut self) -> wasmtime::Result<()> {
Ok(())
}
}

#[cfg(test)]
@@ -323,7 +334,9 @@ mod test {
// The reader task hasn't run yet. Call `ready` to await and fill the buffer.
Ok(bs) => {
assert!(bs.is_empty());
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
res => panic!("unexpected: {res:?}"),
@@ -337,7 +350,9 @@ mod test {
let bs = reader.read(10).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(10).unwrap();
assert_eq!(bs.len(), 10);
@@ -367,7 +382,9 @@ mod test {
let bs = reader.read(123).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(123).unwrap();
assert_eq!(bs.len(), 123);
@@ -382,7 +399,9 @@ mod test {
Ok(bs) => {
assert!(bs.is_empty());
// Need to await to give this side time to catch up
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should show closed
assert!(matches!(reader.read(0), Err(StreamError::Closed)));
}
@@ -402,7 +421,9 @@ mod test {
let bs = reader.read(1).unwrap();
if bs.is_empty() {
// Reader task hasn't run yet. Call `ready` to await and fill the buffer.
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");
// Now a read should succeed
let bs = reader.read(1).unwrap();
assert_eq!(*bs, [123u8]);
@@ -426,7 +447,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// read the something else back out:
let bs = reader.read(1).unwrap();
@@ -448,7 +471,9 @@ mod test {

// Wait readiness (yes we could possibly win the race and read it out faster, leaving that
// out of the test for simplicity)
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// empty and now closed:
assert!(matches!(reader.read(1), Err(StreamError::Closed)));
@@ -468,15 +493,19 @@ mod test {
w
});

resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
let bs = reader.read(4097).unwrap();
assert_eq!(bs.len(), 4096);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Again we expect the reader task has sent 4k from the stream to the reader.
// Try to read out one bigger than the buffer available:
@@ -490,7 +519,9 @@ mod test {
drop(w);

// Allow the crank to turn more:
resolves_immediately(reader.ready()).await;
resolves_immediately(reader.ready())
.await
.expect("`ready` should not fail");

// Now we expect the reader to be empty, and the stream closed:
assert!(matches!(reader.read(4097), Err(StreamError::Closed)));
Loading