|
1 | | -use crate::p3::WasiHttpView; |
2 | 1 | use crate::p3::bindings::http::types::{ErrorCode, Trailers}; |
| 2 | +use crate::p3::{WasiHttp, WasiHttpCtxView}; |
3 | 3 | use anyhow::Context as _; |
4 | | -use bytes::{Bytes, BytesMut}; |
5 | | -use core::future::poll_fn; |
6 | | -use core::pin::{Pin, pin}; |
| 4 | +use bytes::Bytes; |
| 5 | +use core::pin::Pin; |
7 | 6 | use core::task::{Context, Poll, ready}; |
| 7 | +use http::HeaderMap; |
8 | 8 | use http_body_util::combinators::BoxBody; |
9 | 9 | use std::sync::Arc; |
10 | 10 | use tokio::sync::{mpsc, oneshot}; |
| 11 | +use tokio_util::sync::PollSender; |
| 12 | +use wasmtime::StoreContextMut; |
11 | 13 | use wasmtime::component::{ |
12 | | - Accessor, AccessorTask, FutureReader, FutureWriter, GuardedFutureReader, GuardedFutureWriter, |
13 | | - GuardedStreamReader, HasData, Resource, StreamReader, |
| 14 | + Accessor, FutureConsumer, FutureReader, Resource, Source, StreamConsumer, StreamReader, |
| 15 | + StreamResult, |
14 | 16 | }; |
15 | 17 |
|
16 | 18 | /// The concrete type behind a `wasi:http/types/body` resource. |
17 | 19 | pub(crate) enum Body { |
18 | 20 | /// Body constructed by the guest |
19 | | - Guest(GuestBodyContext), |
| 21 | + Guest { |
| 22 | + /// The body stream |
| 23 | + contents_rx: Option<StreamReader<u8>>, |
| 24 | + /// Future, on which guest will write result and optional trailers |
| 25 | + trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>, |
| 26 | + /// Channel, on which transmission result will be written |
| 27 | + result_tx: oneshot::Sender<Result<(), ErrorCode>>, |
| 28 | + }, |
20 | 29 | /// Body constructed by the host. |
21 | 30 | Host(BoxBody<Bytes, ErrorCode>), |
22 | 31 | /// Body is consumed. |
23 | 32 | Consumed, |
24 | 33 | } |
25 | 34 |
|
26 | | -/// Context of a body constructed by the guest |
27 | | -pub struct GuestBodyContext { |
28 | | - /// The body stream |
29 | | - pub(crate) contents_rx: Option<StreamReader<u8>>, |
30 | | - /// Future, on which guest will write result and optional trailers |
31 | | - pub(crate) trailers_rx: FutureReader<Result<Option<Resource<Trailers>>, ErrorCode>>, |
32 | | - /// Future, on which transmission result will be written |
33 | | - pub(crate) result_tx: FutureWriter<Result<(), ErrorCode>>, |
| 35 | +pub(crate) struct GuestBodyConsumer { |
| 36 | + pub(crate) tx: PollSender<Bytes>, |
34 | 37 | } |
35 | 38 |
|
36 | | -pub struct GuestBodyTaskContext { |
37 | | - pub(crate) cx: GuestBodyContext, |
38 | | - pub(crate) contents_tx: mpsc::Sender<Bytes>, |
39 | | - pub(crate) trailers_tx: oneshot::Sender<Result<Option<Arc<http::HeaderMap>>, ErrorCode>>, |
40 | | -} |
41 | | - |
42 | | -impl GuestBodyTaskContext { |
43 | | - /// Consume the body given an I/O operation `io`. |
44 | | - /// |
45 | | - /// This function returns a [GuestBodyTask], which implements a [AccessorTask] and |
46 | | - /// must be run using the engine's event loop. |
47 | | - pub fn consume<Fut>(self, io: Fut) -> GuestBodyTask<Fut> |
48 | | - where |
49 | | - Fut: Future<Output = Result<(), ErrorCode>>, |
50 | | - { |
51 | | - GuestBodyTask { cx: self, io } |
52 | | - } |
53 | | -} |
| 39 | +impl<D> StreamConsumer<D> for GuestBodyConsumer { |
| 40 | + type Item = u8; |
54 | 41 |
|
55 | | -pub struct GuestBodyTask<T> { |
56 | | - cx: GuestBodyTaskContext, |
57 | | - io: T, |
58 | | -} |
59 | | - |
60 | | -impl<T, U, Fut> AccessorTask<T, U, wasmtime::Result<()>> for GuestBodyTask<Fut> |
61 | | -where |
62 | | - T: WasiHttpView, |
63 | | - U: HasData, |
64 | | - Fut: Future<Output = Result<(), ErrorCode>> + Send + 'static, |
65 | | -{ |
66 | | - async fn run(self, store: &Accessor<T, U>) -> wasmtime::Result<()> { |
67 | | - let Self { |
68 | | - cx: |
69 | | - GuestBodyTaskContext { |
70 | | - cx: |
71 | | - GuestBodyContext { |
72 | | - contents_rx, |
73 | | - trailers_rx, |
74 | | - result_tx, |
75 | | - }, |
76 | | - contents_tx, |
77 | | - mut trailers_tx, |
78 | | - }, |
79 | | - io, |
80 | | - } = self; |
81 | | - let trailers_rx = GuardedFutureReader::new(store, trailers_rx); |
82 | | - let mut result_tx = GuardedFutureWriter::new(store, result_tx); |
83 | | - if let Some(contents_rx) = contents_rx { |
84 | | - let mut contents_rx = GuardedStreamReader::new(store, contents_rx); |
85 | | - // TODO: use content-length |
86 | | - let mut buf = BytesMut::with_capacity(8192); |
87 | | - while !contents_rx.is_closed() { |
88 | | - let mut tx = pin!(contents_tx.reserve()); |
89 | | - let Some(Ok(tx)) = ({ |
90 | | - let mut contents_tx_dropped = pin!(contents_rx.watch_writer()); |
91 | | - poll_fn(|cx| match contents_tx_dropped.as_mut().poll(cx) { |
92 | | - Poll::Ready(()) => return Poll::Ready(None), |
93 | | - Poll::Pending => tx.as_mut().poll(cx).map(Some), |
94 | | - }) |
95 | | - .await |
96 | | - }) else { |
97 | | - // Either: |
98 | | - // - body receiver has been closed |
99 | | - // - guest writer has been closed |
100 | | - break; |
101 | | - }; |
102 | | - buf = contents_rx.read(buf).await; |
103 | | - if !buf.is_empty() { |
104 | | - tx.send(buf.split().freeze()); |
| 42 | + fn poll_consume( |
| 43 | + mut self: Pin<&mut Self>, |
| 44 | + cx: &mut Context<'_>, |
| 45 | + store: StoreContextMut<D>, |
| 46 | + src: Source<Self::Item>, |
| 47 | + finish: bool, |
| 48 | + ) -> Poll<wasmtime::Result<StreamResult>> { |
| 49 | + match self.tx.poll_reserve(cx) { |
| 50 | + Poll::Ready(Ok(())) => { |
| 51 | + let mut src = src.as_direct(store); |
| 52 | + let buf = Bytes::copy_from_slice(src.remaining()); |
| 53 | + let n = buf.len(); |
| 54 | + match self.tx.send_item(buf) { |
| 55 | + Ok(()) => { |
| 56 | + src.mark_read(n); |
| 57 | + Poll::Ready(Ok(StreamResult::Completed)) |
| 58 | + } |
| 59 | + Err(..) => Poll::Ready(Ok(StreamResult::Dropped)), |
105 | 60 | } |
106 | 61 | } |
| 62 | + Poll::Ready(Err(..)) => Poll::Ready(Ok(StreamResult::Dropped)), |
| 63 | + Poll::Pending if finish => Poll::Ready(Ok(StreamResult::Cancelled)), |
| 64 | + Poll::Pending => Poll::Pending, |
107 | 65 | } |
108 | | - drop(contents_tx); |
109 | | - |
110 | | - let mut rx = pin!(trailers_rx.read()); |
111 | | - match poll_fn(|cx| match trailers_tx.poll_closed(cx) { |
112 | | - Poll::Ready(()) => return Poll::Ready(None), |
113 | | - Poll::Pending => rx.as_mut().poll(cx).map(Some), |
114 | | - }) |
115 | | - .await |
116 | | - { |
117 | | - Some(Some(Ok(Some(trailers)))) => { |
118 | | - let trailers = store.with(|mut store| { |
119 | | - store |
120 | | - .data_mut() |
121 | | - .http() |
122 | | - .table |
123 | | - .delete(trailers) |
124 | | - .context("failed to delete trailers") |
125 | | - })?; |
126 | | - _ = trailers_tx.send(Ok(Some(trailers.into()))); |
127 | | - } |
128 | | - Some(Some(Ok(None))) => { |
129 | | - _ = trailers_tx.send(Ok(None)); |
130 | | - } |
131 | | - Some(Some(Err(err))) => { |
132 | | - _ = trailers_tx.send(Err(err)); |
133 | | - } |
134 | | - Some(None) | None => { |
135 | | - // Either: |
136 | | - // - trailer receiver has been closed |
137 | | - // - guest writer has been closed |
138 | | - drop(trailers_tx); |
139 | | - } |
140 | | - } |
141 | | - |
142 | | - let mut io = pin!(io); |
143 | | - if let Some(res) = { |
144 | | - let mut result_rx_dropped = pin!(result_tx.watch_reader()); |
145 | | - poll_fn(|cx| match result_rx_dropped.as_mut().poll(cx) { |
146 | | - Poll::Ready(()) => return Poll::Ready(None), |
147 | | - Poll::Pending => io.as_mut().poll(cx).map(Some), |
148 | | - }) |
149 | | - .await |
150 | | - } { |
151 | | - result_tx.write(res).await; |
152 | | - } |
153 | | - Ok(()) |
154 | 66 | } |
155 | 67 | } |
156 | 68 |
|
@@ -234,3 +146,38 @@ impl http_body::Body for ConsumedBody { |
234 | 146 | http_body::SizeHint::with_exact(0) |
235 | 147 | } |
236 | 148 | } |
| 149 | + |
| 150 | +pub(crate) struct GuestTrailerConsumer<T> { |
| 151 | + pub(crate) tx: oneshot::Sender<Result<Option<Arc<HeaderMap>>, ErrorCode>>, |
| 152 | + pub(crate) getter: for<'a> fn(&'a mut T) -> WasiHttpCtxView<'a>, |
| 153 | +} |
| 154 | + |
| 155 | +impl<D> FutureConsumer<D> for GuestTrailerConsumer<D> |
| 156 | +where |
| 157 | + D: 'static, |
| 158 | +{ |
| 159 | + type Item = Result<Option<Resource<Trailers>>, ErrorCode>; |
| 160 | + |
| 161 | + async fn consume(self, store: &Accessor<D>, res: Self::Item) -> wasmtime::Result<()> { |
| 162 | + match res { |
| 163 | + Ok(Some(trailers)) => store |
| 164 | + .with_getter::<WasiHttp>(self.getter) |
| 165 | + .with(|mut store| { |
| 166 | + let WasiHttpCtxView { table, .. } = store.get(); |
| 167 | + let trailers = table |
| 168 | + .delete(trailers) |
| 169 | + .context("failed to delete trailers")?; |
| 170 | + _ = self.tx.send(Ok(Some(Arc::from(trailers)))); |
| 171 | + Ok(()) |
| 172 | + }), |
| 173 | + Ok(None) => { |
| 174 | + _ = self.tx.send(Ok(None)); |
| 175 | + Ok(()) |
| 176 | + } |
| 177 | + Err(err) => { |
| 178 | + _ = self.tx.send(Err(err)); |
| 179 | + Ok(()) |
| 180 | + } |
| 181 | + } |
| 182 | + } |
| 183 | +} |
0 commit comments