Skip to content

Commit 5b734cc

Browse files
authored
wasip3: Attempt cancellation in file ops (#13163)
This commit updates the file stream implementations to at least attempt cancellation of blocking work instead of completely ignoring it. This isn't guaranteed to work but at least enables exploring some possible state spaces in wasi-libc for streams interacting with the host.
1 parent 939f3d0 commit 5b734cc

1 file changed

Lines changed: 41 additions & 15 deletions

File tree

  • crates/wasi/src/p3/filesystem

crates/wasi/src/p3/filesystem/host.rs

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,7 @@ impl<D> StreamProducer<D> for ReadStreamProducer {
167167
cx: &mut Context<'_>,
168168
store: StoreContextMut<'a, D>,
169169
mut dst: Destination<'a, Self::Item, Self::Buffer>,
170-
// Intentionally ignore this as in blocking mode everything is always
171-
// ready and otherwise spawned blocking work can't be cancelled.
172-
_finish: bool,
170+
finish: bool,
173171
) -> Poll<wasmtime::Result<StreamResult>> {
174172
if let Some(file) = self.file.as_blocking_file() {
175173
// Once a blocking file, always a blocking file, so assert as such.
@@ -213,22 +211,37 @@ impl<D> StreamProducer<D> for ReadStreamProducer {
213211
// Await the completion of the read task. Note that this is not a
214212
// cancellable await point because we can't cancel the other task, so
215213
// the `finish` parameter is ignored.
216-
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
214+
let result = match Pin::new(&mut *task).poll(cx) {
215+
// If cancellation is requested, then flag that to Tokio. Note that
216+
// this still waits for the actual completion of the spawned task,
217+
// which won't actually happen if it's already executing.
218+
Poll::Pending if finish => {
219+
task.abort();
220+
ready!(Pin::new(task).poll(cx))
221+
}
222+
other => ready!(other),
223+
};
217224
self.task = None;
218-
match res {
219-
Ok(buf) if buf.is_empty() => {
225+
match result {
226+
Ok(Ok(buf)) if buf.is_empty() => {
220227
self.close(Ok(()));
221228
Poll::Ready(Ok(StreamResult::Dropped))
222229
}
223-
Ok(buf) => {
230+
Ok(Ok(buf)) => {
224231
let n = buf.len();
225232
dst.set_buffer(Cursor::new(buf));
226233
Poll::Ready(Ok(self.complete_read(n)))
227234
}
228-
Err(err) => {
235+
Ok(Err(err)) => {
229236
self.close(Err(err.into()));
230237
Poll::Ready(Ok(StreamResult::Dropped))
231238
}
239+
Err(err) => {
240+
if err.is_cancelled() {
241+
return Poll::Ready(Ok(StreamResult::Cancelled));
242+
}
243+
panic!("I/O task should not panic: {err}")
244+
}
232245
}
233246
}
234247
}
@@ -434,9 +447,7 @@ impl<D> StreamConsumer<D> for WriteStreamConsumer {
434447
cx: &mut Context<'_>,
435448
store: StoreContextMut<D>,
436449
src: Source<Self::Item>,
437-
// Intentionally ignore this as in blocking mode everything is always
438-
// ready and otherwise spawned blocking work can't be cancelled.
439-
_finish: bool,
450+
finish: bool,
440451
) -> Poll<wasmtime::Result<StreamResult>> {
441452
let mut src = src.as_direct(store);
442453
if let Some(file) = self.file.as_blocking_file() {
@@ -462,19 +473,34 @@ impl<D> StreamConsumer<D> for WriteStreamConsumer {
462473
let location = me.location;
463474
spawn_blocking(move || location.write(&file, &buf).map(|n| (buf, n)))
464475
});
465-
let res = ready!(Pin::new(task).poll(cx)).expect("I/O task should not panic");
476+
let result = match Pin::new(&mut *task).poll(cx) {
477+
// If cancellation is requested, then flag that to Tokio. Note that
478+
// this still waits for the actual completion of the spawned task,
479+
// which won't actually happen if it's already executing.
480+
Poll::Pending if finish => {
481+
task.abort();
482+
ready!(Pin::new(task).poll(cx))
483+
}
484+
other => ready!(other),
485+
};
466486
self.task = None;
467-
match res {
468-
Ok((buf, n)) => {
487+
match result {
488+
Ok(Ok((buf, n))) => {
469489
src.mark_read(n);
470490
self.buffer = buf;
471491
self.buffer.clear();
472492
Poll::Ready(Ok(self.complete_write(n)))
473493
}
474-
Err(err) => {
494+
Ok(Err(err)) => {
475495
self.close(Err(err.into()));
476496
Poll::Ready(Ok(StreamResult::Dropped))
477497
}
498+
Err(err) => {
499+
if err.is_cancelled() {
500+
return Poll::Ready(Ok(StreamResult::Cancelled));
501+
}
502+
panic!("I/O task should not panic: {err}")
503+
}
478504
}
479505
}
480506
}

0 commit comments

Comments
 (0)