Skip to content

Commit 9f33b4b

Browse files
alexcrichtondicej
andauthored
[37.0.0] Backport WASIp3-related PRs (#11624)
* make `Destination::as_direct` work for both host and guest readers (#11612) In order to reduce code duplication (and code paths to test) in `wasmtime-wasi` and custom host embeddings, I've made `Destination::as_direct` (formerly known as `as_direct_destination`) work for host readers as well as guest ones. In the process, I noticed and fixed a couple of related issues: - I had forgotten to implement or test host reader support in `DirectSource` 🤦 - The code to support host-to-host pipes failed to account for partial reads I've also simplified the `StreamConsumer` and `StreamProducer` APIs slightly by having them take their `Source` and `Destination` parameters by value rather than by reference, respectively. Note that, per WebAssembly/component-model#561, I've tweaked the documentation for `StreamProducer` to indicate that implementations might reasonably opt to "pretend" they're ready without buffering any items when handling zero-length reads given that buffering has its own hazards. Likewise, I've updated the `wasi-filesystem` and `wasi-cli` implementations to "pretend" instead of buffering. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * wasip3: Refine 0-length behavior in cli/sockets (#11614) Add some comments for issues and `if` blocks in a few places. I'd like to add some tests but I ran into #11611 so I'll defer tests to later. * wasip3: Refactor the readdir stream iterators (#11615) * wasip3: Refactor `BlockingDirectoryStreamProducer` * Generalize this into a `FallibleStreamProducer` structure * Don't read the entire iterator on the first call to `poll_produce` * Do a blocking read of `dir.entries()` in the original function call to avoid handling state in the iterator itself. * wasip3: Refactor `NonblockingDirectoryStreamProducer` * Start the reading task before iteration starts to move the spawn out of the `poll_*` method. * Rely on fusing behavior of mpsc/tasks to avoid extra state structure. * Specifically handle 0-length reads. Mostly try to refactor the state representation to be more struct-like rather than enum like which is a little easier to follow. * wasip3: Port `preview1_fd_readdir` to WASIp3 Have at least one test looking at the readdir behavior. * Fix `finish` handling in `FallibleIteratorProducer` * Fix a typo --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com> Co-authored-by: Joel Dice <joel.dice@fermyon.com>
1 parent 1047b51 commit 9f33b4b

File tree

12 files changed

+833
-473
lines changed

12 files changed

+833
-473
lines changed

crates/misc/component-async-tests/src/util.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ impl<D, T: Send + Sync + Lower + 'static, S: Stream<Item = T> + Send + 'static>
5353
self: Pin<&mut Self>,
5454
cx: &mut Context<'_>,
5555
_: StoreContextMut<D>,
56-
destination: &'a mut Destination<'a, Self::Item, Self::Buffer>,
56+
mut destination: Destination<'a, Self::Item, Self::Buffer>,
5757
finish: bool,
5858
) -> Poll<Result<StreamResult>> {
5959
// SAFETY: This is a standard pin-projection, and we never move
@@ -94,7 +94,7 @@ impl<D, T: Lift + 'static, S: Sink<T, Error: std::error::Error + Send + Sync> +
9494
self: Pin<&mut Self>,
9595
cx: &mut Context<'_>,
9696
store: StoreContextMut<D>,
97-
source: &mut Source<Self::Item>,
97+
mut source: Source<Self::Item>,
9898
finish: bool,
9999
) -> Poll<Result<StreamResult>> {
100100
// SAFETY: This is a standard pin-projection, and we never move

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 132 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -6,18 +6,107 @@ use {
66
util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},
77
},
88
futures::{
9-
SinkExt, StreamExt,
9+
Sink, SinkExt, Stream, StreamExt,
1010
channel::{mpsc, oneshot},
1111
future,
1212
},
13-
std::sync::{Arc, Mutex},
13+
std::{
14+
pin::Pin,
15+
sync::{Arc, Mutex},
16+
task::{Context, Poll},
17+
},
1418
wasmtime::{
15-
Engine, Store,
16-
component::{FutureReader, Linker, ResourceTable, StreamReader},
19+
Engine, Store, StoreContextMut,
20+
component::{
21+
Destination, FutureReader, Linker, ResourceTable, Source, StreamConsumer,
22+
StreamProducer, StreamReader, StreamResult,
23+
},
1724
},
1825
wasmtime_wasi::WasiCtxBuilder,
1926
};
2027

28+
pub struct DirectPipeProducer<S>(S);
29+
30+
impl<D, S: Stream<Item = u8> + Send + 'static> StreamProducer<D> for DirectPipeProducer<S> {
31+
type Item = u8;
32+
type Buffer = Option<u8>;
33+
34+
fn poll_produce<'a>(
35+
self: Pin<&mut Self>,
36+
cx: &mut Context<'_>,
37+
store: StoreContextMut<D>,
38+
destination: Destination<'a, Self::Item, Self::Buffer>,
39+
finish: bool,
40+
) -> Poll<Result<StreamResult>> {
41+
// SAFETY: This is a standard pin-projection, and we never move
42+
// out of `self`.
43+
let stream = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
44+
45+
match stream.poll_next(cx) {
46+
Poll::Pending => {
47+
if finish {
48+
Poll::Ready(Ok(StreamResult::Cancelled))
49+
} else {
50+
Poll::Pending
51+
}
52+
}
53+
Poll::Ready(Some(item)) => {
54+
let mut destination = destination.as_direct(store, 1);
55+
destination.remaining()[0] = item;
56+
destination.mark_written(1);
57+
Poll::Ready(Ok(StreamResult::Completed))
58+
}
59+
Poll::Ready(None) => Poll::Ready(Ok(StreamResult::Dropped)),
60+
}
61+
}
62+
}
63+
64+
pub struct DirectPipeConsumer<S>(S);
65+
66+
impl<D, S: Sink<u8, Error: std::error::Error + Send + Sync> + Send + 'static> StreamConsumer<D>
67+
for DirectPipeConsumer<S>
68+
{
69+
type Item = u8;
70+
71+
fn poll_consume(
72+
self: Pin<&mut Self>,
73+
cx: &mut Context<'_>,
74+
store: StoreContextMut<D>,
75+
source: Source<Self::Item>,
76+
finish: bool,
77+
) -> Poll<Result<StreamResult>> {
78+
// SAFETY: This is a standard pin-projection, and we never move
79+
// out of `self`.
80+
let mut sink = unsafe { self.map_unchecked_mut(|v| &mut v.0) };
81+
82+
let on_pending = || {
83+
if finish {
84+
Poll::Ready(Ok(StreamResult::Cancelled))
85+
} else {
86+
Poll::Pending
87+
}
88+
};
89+
90+
match sink.as_mut().poll_flush(cx) {
91+
Poll::Pending => on_pending(),
92+
Poll::Ready(result) => {
93+
result?;
94+
match sink.as_mut().poll_ready(cx) {
95+
Poll::Pending => on_pending(),
96+
Poll::Ready(result) => {
97+
result?;
98+
let mut source = source.as_direct(store);
99+
let item = source.remaining()[0];
100+
source.mark_read(1);
101+
sink.start_send(item)?;
102+
Poll::Ready(Ok(StreamResult::Completed))
103+
}
104+
}
105+
}
106+
}
107+
}
108+
}
109+
21110
#[tokio::test]
22111
pub async fn async_closed_streams() -> Result<()> {
23112
let engine = Engine::new(&config())?;
@@ -49,35 +138,45 @@ pub async fn async_closed_streams() -> Result<()> {
49138
let value = 42_u8;
50139

51140
// First, test stream host->host
52-
{
53-
let (mut input_tx, input_rx) = mpsc::channel(1);
54-
let (output_tx, mut output_rx) = mpsc::channel(1);
55-
StreamReader::new(instance, &mut store, PipeProducer::new(input_rx))
56-
.pipe(&mut store, PipeConsumer::new(output_tx));
57-
58-
instance
59-
.run_concurrent(&mut store, async |_| {
60-
let (a, b) = future::join(
61-
async {
62-
for &value in &values {
63-
input_tx.send(value).await?;
64-
}
65-
drop(input_tx);
66-
anyhow::Ok(())
67-
},
68-
async {
69-
for &value in &values {
70-
assert_eq!(Some(value), output_rx.next().await);
71-
}
72-
assert!(output_rx.next().await.is_none());
73-
Ok(())
74-
},
75-
)
76-
.await;
77-
78-
a.and(b)
79-
})
80-
.await??;
141+
for direct_producer in [true, false] {
142+
for direct_consumer in [true, false] {
143+
let (mut input_tx, input_rx) = mpsc::channel(1);
144+
let (output_tx, mut output_rx) = mpsc::channel(1);
145+
let reader = if direct_producer {
146+
StreamReader::new(instance, &mut store, DirectPipeProducer(input_rx))
147+
} else {
148+
StreamReader::new(instance, &mut store, PipeProducer::new(input_rx))
149+
};
150+
if direct_consumer {
151+
reader.pipe(&mut store, DirectPipeConsumer(output_tx));
152+
} else {
153+
reader.pipe(&mut store, PipeConsumer::new(output_tx));
154+
}
155+
156+
instance
157+
.run_concurrent(&mut store, async |_| {
158+
let (a, b) = future::join(
159+
async {
160+
for &value in &values {
161+
input_tx.send(value).await?;
162+
}
163+
drop(input_tx);
164+
anyhow::Ok(())
165+
},
166+
async {
167+
for &value in &values {
168+
assert_eq!(Some(value), output_rx.next().await);
169+
}
170+
assert!(output_rx.next().await.is_none());
171+
Ok(())
172+
},
173+
)
174+
.await;
175+
176+
a.and(b)
177+
})
178+
.await??;
179+
}
81180
}
82181

83182
// Next, test futures host->host

crates/misc/component-async-tests/tests/scenario/transmit.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl<D> StreamProducer<D> for ReadinessProducer {
4242
self: Pin<&mut Self>,
4343
cx: &mut Context<'_>,
4444
mut store: StoreContextMut<'a, D>,
45-
destination: &'a mut Destination<'a, Self::Item, Self::Buffer>,
45+
destination: Destination<'a, Self::Item, Self::Buffer>,
4646
finish: bool,
4747
) -> Poll<Result<StreamResult>> {
4848
let me = self.get_mut();
@@ -62,7 +62,7 @@ impl<D> StreamProducer<D> for ReadinessProducer {
6262
Poll::Ready(Ok(StreamResult::Completed))
6363
} else {
6464
assert_eq!(capacity, Some(me.buffer.len()));
65-
let mut destination = destination.as_direct_destination(store).unwrap();
65+
let mut destination = destination.as_direct(store, me.buffer.len());
6666
destination.remaining().copy_from_slice(&me.buffer);
6767
destination.mark_written(me.buffer.len());
6868

@@ -85,7 +85,7 @@ impl<D> StreamConsumer<D> for ReadinessConsumer {
8585
self: Pin<&mut Self>,
8686
cx: &mut Context<'_>,
8787
mut store: StoreContextMut<D>,
88-
source: &mut Source<Self::Item>,
88+
source: Source<Self::Item>,
8989
finish: bool,
9090
) -> Poll<Result<StreamResult>> {
9191
let me = self.get_mut();
@@ -105,7 +105,7 @@ impl<D> StreamConsumer<D> for ReadinessConsumer {
105105
Poll::Ready(Ok(StreamResult::Completed))
106106
} else {
107107
assert_eq!(available, me.expected.len());
108-
let mut source = source.as_direct_source(store);
108+
let mut source = source.as_direct(store);
109109
assert_eq!(&me.expected, source.remaining());
110110
source.mark_read(me.expected.len());
111111

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
use test_programs::p3::wasi;
2+
use test_programs::p3::wasi::filesystem::types::{
3+
Descriptor, DescriptorFlags, DescriptorType, DirectoryEntry, OpenFlags, PathFlags,
4+
};
5+
6+
struct Component;
7+
8+
test_programs::p3::export!(Component);
9+
10+
impl test_programs::p3::exports::wasi::cli::run::Guest for Component {
11+
async fn run() -> Result<(), ()> {
12+
let preopens = wasi::filesystem::preopens::get_directories();
13+
let (dir, _) = &preopens[0];
14+
15+
test_readdir(dir).await;
16+
test_readdir_lots(dir).await;
17+
Ok(())
18+
}
19+
}
20+
21+
fn main() {
22+
unreachable!()
23+
}
24+
25+
async fn read_dir(dir: &Descriptor) -> Vec<DirectoryEntry> {
26+
let (dirs, result) = dir.read_directory().await;
27+
let mut dirs = dirs.collect().await;
28+
result.await.unwrap();
29+
dirs.sort_by_key(|d| d.name.clone());
30+
dirs
31+
}
32+
33+
async fn assert_empty_dir(dir: &Descriptor) {
34+
let dirs = read_dir(dir).await;
35+
assert_eq!(dirs.len(), 0);
36+
}
37+
38+
async fn test_readdir(dir: &Descriptor) {
39+
// Check the behavior in an empty directory
40+
assert_empty_dir(dir).await;
41+
42+
dir.open_at(
43+
PathFlags::empty(),
44+
"file".to_string(),
45+
OpenFlags::CREATE,
46+
DescriptorFlags::READ | DescriptorFlags::WRITE,
47+
)
48+
.await
49+
.unwrap();
50+
51+
dir.create_directory_at("nested".to_string()).await.unwrap();
52+
let nested = dir
53+
.open_at(
54+
PathFlags::empty(),
55+
"nested".to_string(),
56+
OpenFlags::DIRECTORY,
57+
DescriptorFlags::empty(),
58+
)
59+
.await
60+
.unwrap();
61+
62+
let entries = read_dir(dir).await;
63+
assert_eq!(entries.len(), 2);
64+
assert_eq!(entries[0].name, "file");
65+
assert_eq!(entries[0].type_, DescriptorType::RegularFile);
66+
assert_eq!(entries[1].name, "nested");
67+
assert_eq!(entries[1].type_, DescriptorType::Directory);
68+
69+
assert_empty_dir(&nested).await;
70+
drop(nested);
71+
72+
dir.unlink_file_at("file".to_string()).await.unwrap();
73+
dir.remove_directory_at("nested".to_string()).await.unwrap();
74+
}
75+
76+
async fn test_readdir_lots(dir: &Descriptor) {
77+
for count in 0..1000 {
78+
dir.open_at(
79+
PathFlags::empty(),
80+
format!("file.{count}"),
81+
OpenFlags::CREATE,
82+
DescriptorFlags::READ | DescriptorFlags::WRITE,
83+
)
84+
.await
85+
.expect("failed to create file");
86+
}
87+
88+
assert_eq!(read_dir(dir).await.len(), 1000);
89+
90+
for count in 0..1000 {
91+
dir.unlink_file_at(format!("file.{count}"))
92+
.await
93+
.expect("removing a file");
94+
}
95+
}

0 commit comments

Comments
 (0)