Skip to content

Commit efd56f6

Browse files
authored
fix a couple of partial read/write bugs (#11981)
* reset read/write state back to `Open` on event delivery If one end of a stream does a partial read or write, we leave the other end in a `GuestReady` state, allowing further reads or writes to proceed until the buffer has been drained or filled, respectively. However, once we've delivered the event regarding the partial operation, we need to set the state back to `Open`, since we'll have released the buffer back to the guest at that point. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * delay returning `Dropped` until producer buffer drained If the `StreamProducer` calls `Destination::set_buffer`, we need to make sure all the items in that buffer have been delivered to the receiver (or the receiver closes its end) before telling it the write end has been dropped. Signed-off-by: Joel Dice <joel.dice@fermyon.com> * add short reads tests These cover a couple of scenarios where the guest and/or host read owned resource items one-at-a-time from writes of more than one item, forcing the writer to re-take ownership of the unwritten items between writes. This also covers the case where the host's `StreamProducer` returns `StreamResult::Dropped` after calling `Destination::set_buffer`, in which case Wasmtime must delay telling the other end about the dropped stream until that buffer has been drained. Signed-off-by: Joel Dice <joel.dice@fermyon.com> --------- Signed-off-by: Joel Dice <joel.dice@fermyon.com>
1 parent 498542c commit efd56f6

File tree

6 files changed

+415
-126
lines changed

6 files changed

+415
-126
lines changed

crates/misc/component-async-tests/tests/scenario/streams.rs

Lines changed: 176 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,23 @@ use {
66
util::{OneshotConsumer, OneshotProducer, PipeConsumer, PipeProducer},
77
},
88
futures::{
9-
Sink, SinkExt, Stream, StreamExt,
9+
FutureExt, Sink, SinkExt, Stream, StreamExt,
1010
channel::{mpsc, oneshot},
1111
future,
1212
},
1313
std::{
14+
mem,
15+
ops::DerefMut,
1416
pin::Pin,
1517
sync::{Arc, Mutex},
16-
task::{Context, Poll},
18+
task::{self, Context, Poll},
19+
time::Duration,
1720
},
1821
wasmtime::{
1922
Engine, Store, StoreContextMut,
2023
component::{
21-
Destination, FutureReader, Linker, ResourceTable, Source, StreamConsumer,
22-
StreamProducer, StreamReader, StreamResult,
24+
Destination, FutureReader, Lift, Linker, ResourceTable, Source, StreamConsumer,
25+
StreamProducer, StreamReader, StreamResult, VecBuffer,
2326
},
2427
},
2528
wasmtime_wasi::WasiCtxBuilder,
@@ -296,3 +299,172 @@ pub async fn async_closed_stream() -> Result<()> {
296299
})
297300
.await?
298301
}
302+
303+
struct VecProducer<T> {
304+
source: Vec<T>,
305+
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
306+
}
307+
308+
impl<T> VecProducer<T> {
309+
fn new(source: Vec<T>, delay: bool) -> Self {
310+
Self {
311+
source,
312+
sleep: if delay {
313+
tokio::time::sleep(Duration::from_millis(10)).boxed()
314+
} else {
315+
async {}.boxed()
316+
},
317+
}
318+
}
319+
}
320+
321+
impl<D, T: Lift + Unpin + 'static> StreamProducer<D> for VecProducer<T> {
322+
type Item = T;
323+
type Buffer = VecBuffer<T>;
324+
325+
fn poll_produce(
326+
mut self: Pin<&mut Self>,
327+
cx: &mut Context<'_>,
328+
_: StoreContextMut<D>,
329+
mut destination: Destination<Self::Item, Self::Buffer>,
330+
_: bool,
331+
) -> Poll<Result<StreamResult>> {
332+
let sleep = &mut self.as_mut().get_mut().sleep;
333+
task::ready!(sleep.as_mut().poll(cx));
334+
*sleep = async {}.boxed();
335+
336+
destination.set_buffer(mem::take(&mut self.get_mut().source).into());
337+
Poll::Ready(Ok(StreamResult::Dropped))
338+
}
339+
}
340+
341+
struct OneAtATime<T> {
342+
destination: Arc<Mutex<Vec<T>>>,
343+
sleep: Pin<Box<dyn Future<Output = ()> + Send>>,
344+
}
345+
346+
impl<T> OneAtATime<T> {
347+
fn new(destination: Arc<Mutex<Vec<T>>>, delay: bool) -> Self {
348+
Self {
349+
destination,
350+
sleep: if delay {
351+
tokio::time::sleep(Duration::from_millis(10)).boxed()
352+
} else {
353+
async {}.boxed()
354+
},
355+
}
356+
}
357+
}
358+
359+
impl<D, T: Lift + 'static> StreamConsumer<D> for OneAtATime<T> {
360+
type Item = T;
361+
362+
fn poll_consume(
363+
mut self: Pin<&mut Self>,
364+
cx: &mut Context<'_>,
365+
store: StoreContextMut<D>,
366+
mut source: Source<Self::Item>,
367+
_: bool,
368+
) -> Poll<Result<StreamResult>> {
369+
let sleep = &mut self.as_mut().get_mut().sleep;
370+
task::ready!(sleep.as_mut().poll(cx));
371+
*sleep = async {}.boxed();
372+
373+
let value = &mut None;
374+
source.read(store, value)?;
375+
self.destination.lock().unwrap().push(value.take().unwrap());
376+
Poll::Ready(Ok(StreamResult::Completed))
377+
}
378+
}
379+
380+
mod short_reads {
381+
wasmtime::component::bindgen!({
382+
path: "wit",
383+
world: "short-reads-guest",
384+
exports: { default: async | task_exit },
385+
});
386+
}
387+
388+
#[tokio::test]
389+
pub async fn async_short_reads() -> Result<()> {
390+
test_async_short_reads(false).await
391+
}
392+
393+
#[tokio::test]
394+
async fn async_short_reads_with_delay() -> Result<()> {
395+
test_async_short_reads(true).await
396+
}
397+
398+
async fn test_async_short_reads(delay: bool) -> Result<()> {
399+
use short_reads::exports::local::local::short_reads::Thing;
400+
401+
let engine = Engine::new(&config())?;
402+
403+
let component = make_component(
404+
&engine,
405+
&[test_programs_artifacts::ASYNC_SHORT_READS_COMPONENT],
406+
)
407+
.await?;
408+
409+
let mut linker = Linker::new(&engine);
410+
411+
wasmtime_wasi::p2::add_to_linker_async(&mut linker)?;
412+
413+
let mut store = Store::new(
414+
&engine,
415+
Ctx {
416+
wasi: WasiCtxBuilder::new().inherit_stdio().build(),
417+
table: ResourceTable::default(),
418+
continue_: false,
419+
wakers: Arc::new(Mutex::new(None)),
420+
},
421+
);
422+
423+
let guest =
424+
short_reads::ShortReadsGuest::instantiate_async(&mut store, &component, &linker).await?;
425+
let thing = guest.local_local_short_reads().thing();
426+
427+
let strings = ["a", "b", "c", "d", "e"];
428+
let mut things = Vec::with_capacity(strings.len());
429+
for string in strings {
430+
things.push(thing.call_constructor(&mut store, string).await?);
431+
}
432+
433+
store
434+
.run_concurrent(async |store| {
435+
let count = things.len();
436+
let stream =
437+
store.with(|store| StreamReader::new(store, VecProducer::new(things, delay)));
438+
439+
let (stream, task) = guest
440+
.local_local_short_reads()
441+
.call_short_reads(store, stream)
442+
.await?;
443+
444+
let received_things = Arc::new(Mutex::new(Vec::<Thing>::with_capacity(count)));
445+
// Read just one item at a time from the guest, forcing it to
446+
// re-take ownership of any unwritten items.
447+
store.with(|store| stream.pipe(store, OneAtATime::new(received_things.clone(), delay)));
448+
449+
task.block(store).await;
450+
451+
assert_eq!(count, received_things.lock().unwrap().len());
452+
453+
let mut received_strings = Vec::with_capacity(strings.len());
454+
let received_things = mem::take(received_things.lock().unwrap().deref_mut());
455+
for it in received_things {
456+
received_strings.push(thing.call_get(store, it).await?.0);
457+
}
458+
459+
assert_eq!(
460+
&strings[..],
461+
&received_strings
462+
.iter()
463+
.map(|s| s.as_str())
464+
.collect::<Vec<_>>()
465+
);
466+
467+
anyhow::Ok(())
468+
})
469+
.await?
470+
}

crates/misc/component-async-tests/tests/test_all.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use scenario::round_trip_many::{
3030
async_round_trip_many_stackful, async_round_trip_many_stackless,
3131
async_round_trip_many_synchronous, async_round_trip_many_wait,
3232
};
33-
use scenario::streams::{async_closed_stream, async_closed_streams};
33+
use scenario::streams::{async_closed_stream, async_closed_streams, async_short_reads};
3434
use scenario::transmit::{
3535
async_cancel_callee, async_cancel_caller, async_cancel_transmit, async_intertask_communication,
3636
async_poll_stackless, async_poll_synchronous, async_readiness, async_synchronous_transmit,

crates/misc/component-async-tests/wit/test.wit

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ interface closed-stream {
180180
get: func() -> stream<u8>;
181181
}
182182

183+
interface short-reads {
184+
resource thing {
185+
constructor(s: string);
186+
get: async func() -> string;
187+
}
188+
189+
short-reads: async func(s: stream<thing>) -> stream<thing>;
190+
}
191+
183192
world yield-caller {
184193
import continue;
185194
import ready;
@@ -348,3 +357,7 @@ world sleep-post-return-caller {
348357
world closed-stream-guest {
349358
export closed-stream;
350359
}
360+
361+
world short-reads-guest {
362+
export short-reads;
363+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
use {
2+
bindings::{
3+
exports::local::local::short_reads::{self, Guest, GuestThing},
4+
wit_stream,
5+
},
6+
wit_bindgen::{StreamReader, StreamResult, rt::async_support},
7+
};
8+
9+
mod bindings {
10+
wit_bindgen::generate!({
11+
path: "../misc/component-async-tests/wit",
12+
world: "short-reads-guest",
13+
});
14+
15+
use super::Component;
16+
export!(Component);
17+
}
18+
19+
struct Thing {
20+
value: String,
21+
}
22+
23+
impl GuestThing for Thing {
24+
fn new(value: String) -> Self {
25+
Self { value }
26+
}
27+
28+
async fn get(&self) -> String {
29+
self.value.clone()
30+
}
31+
}
32+
33+
struct Component;
34+
35+
impl Guest for Component {
36+
type Thing = Thing;
37+
38+
async fn short_reads(
39+
mut stream: StreamReader<short_reads::Thing>,
40+
) -> StreamReader<short_reads::Thing> {
41+
let (mut tx, rx) = wit_stream::new();
42+
43+
async_support::spawn(async move {
44+
// Read the things one at a time, forcing the host to re-take
45+
// ownership of any unwritten items between writes.
46+
let mut things = Vec::new();
47+
loop {
48+
let (status, buffer) = stream.read(Vec::with_capacity(1)).await;
49+
match status {
50+
StreamResult::Complete(_) => {
51+
things.extend(buffer);
52+
}
53+
StreamResult::Dropped => break,
54+
StreamResult::Cancelled => unreachable!(),
55+
}
56+
}
57+
// Write the things all at once. The host will read them only one
58+
// at a time, forcing us to re-take ownership of any unwritten
59+
// items between writes.
60+
things = tx.write_all(things).await;
61+
assert!(things.is_empty());
62+
});
63+
64+
rx
65+
}
66+
}
67+
68+
// Unused function; required since this file is built as a `bin`:
69+
fn main() {}

crates/wasmtime/src/runtime/component/concurrent.rs

Lines changed: 2 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ use crate::component::func::{self, Func, Options};
5454
use crate::component::{HasData, HasSelf, Instance, Resource, ResourceTable, ResourceTableError};
5555
use crate::fiber::{self, StoreFiber, StoreFiberYield};
5656
use crate::store::{Store, StoreId, StoreInner, StoreOpaque, StoreToken};
57-
use crate::vm::component::{
58-
CallContext, ComponentInstance, InstanceFlags, ResourceTables, TransmitLocalState,
59-
};
57+
use crate::vm::component::{CallContext, ComponentInstance, InstanceFlags, ResourceTables};
6058
use crate::vm::{AlwaysMut, SendSyncPtr, VMFuncRef, VMMemoryDefinition, VMStore};
6159
use crate::{AsContext, AsContextMut, FuncType, StoreContext, StoreContextMut, ValRaw, ValType};
6260
use anyhow::{Context as _, Result, anyhow, bail};
@@ -1568,7 +1566,7 @@ impl Instance {
15681566
"deliver event {event:?} to {guest_task:?} for {waitable:?} (handle {handle}); set {set:?}"
15691567
);
15701568

1571-
waitable.on_delivery(self.id().get_mut(store), event);
1569+
waitable.on_delivery(store, self, event);
15721570

15731571
Some((event, Some((waitable, handle))))
15741572
} else {
@@ -4356,55 +4354,6 @@ impl Waitable {
43564354
Ok(())
43574355
}
43584356

4359-
/// Handle the imminent delivery of the specified event, e.g. by updating
4360-
/// the state of the stream or future.
4361-
fn on_delivery(&self, instance: Pin<&mut ComponentInstance>, event: Event) {
4362-
match event {
4363-
Event::FutureRead {
4364-
pending: Some((ty, handle)),
4365-
..
4366-
}
4367-
| Event::FutureWrite {
4368-
pending: Some((ty, handle)),
4369-
..
4370-
} => {
4371-
let runtime_instance = instance.component().types()[ty].instance;
4372-
let (rep, state) = instance.guest_tables().0[runtime_instance]
4373-
.future_rep(ty, handle)
4374-
.unwrap();
4375-
assert_eq!(rep, self.rep());
4376-
assert_eq!(*state, TransmitLocalState::Busy);
4377-
*state = match event {
4378-
Event::FutureRead { .. } => TransmitLocalState::Read { done: false },
4379-
Event::FutureWrite { .. } => TransmitLocalState::Write { done: false },
4380-
_ => unreachable!(),
4381-
};
4382-
}
4383-
Event::StreamRead {
4384-
pending: Some((ty, handle)),
4385-
code,
4386-
}
4387-
| Event::StreamWrite {
4388-
pending: Some((ty, handle)),
4389-
code,
4390-
} => {
4391-
let runtime_instance = instance.component().types()[ty].instance;
4392-
let (rep, state) = instance.guest_tables().0[runtime_instance]
4393-
.stream_rep(ty, handle)
4394-
.unwrap();
4395-
assert_eq!(rep, self.rep());
4396-
assert_eq!(*state, TransmitLocalState::Busy);
4397-
let done = matches!(code, ReturnCode::Dropped(_));
4398-
*state = match event {
4399-
Event::StreamRead { .. } => TransmitLocalState::Read { done },
4400-
Event::StreamWrite { .. } => TransmitLocalState::Write { done },
4401-
_ => unreachable!(),
4402-
};
4403-
}
4404-
_ => {}
4405-
}
4406-
}
4407-
44084357
/// Remove this waitable from the instance's rep table.
44094358
fn delete_from(&self, state: &mut ConcurrentState) -> Result<()> {
44104359
match self {

0 commit comments

Comments
 (0)