
Commit a6b9659

Fix async bug(s) in MGS serial console forwarding
Our `select!` loop was pulling enqueued data out of in-memory buffers, but if the future that data was handed to was then cancelled by the `select!`, the data was lost. This splits the forwarding up into three separate tokio tasks, and leaves the `select!` loop polling only cancel-safe futures that don't lose any data if dropped.
1 parent: d565469
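To make the failure mode concrete, here is a minimal standalone sketch of both patterns, using plain tokio `mpsc` channels in place of the websocket sink and SP handle. All names here (`lossy_forward`, `lossless_forward`, `from_sp`, `to_client`) are hypothetical, and the code assumes the `tokio` (features = ["full"]) and `futures` crates; it illustrates the hazard and the fix, not the MGS code itself.

// sketch.rs: hypothetical, standalone illustration; not MGS code.
use std::collections::VecDeque;

use futures::future::{Fuse, FutureExt};
use tokio::sync::mpsc;

// The old pattern: a frame is moved out of `queued` to build the send
// future. If the other `select!` arm wins the race, `send` is dropped
// and the frame it owns is silently lost.
#[allow(dead_code)]
async fn lossy_forward(
    mut from_sp: mpsc::Receiver<Vec<u8>>,
    to_client: mpsc::Sender<Vec<u8>>,
) {
    let mut queued: VecDeque<Vec<u8>> = VecDeque::new();
    loop {
        let send = if let Some(frame) = queued.pop_front() {
            to_client.send(frame).fuse() // `send` now owns `frame`
        } else {
            Fuse::terminated() // nothing queued; this arm stays pending
        };
        tokio::select! {
            _ = send => {} // delivered (or receiver gone)
            recv = from_sp.recv() => match recv {
                // If this arm wins while `send` holds a frame, dropping
                // `send` drops the frame: the bug this commit fixes.
                Some(frame) => queued.push_back(frame),
                None => return,
            },
        }
    }
}

// The new pattern: sending lives in its own spawned task, so in-flight
// data is owned by a future that no `select!` ever drops. The loop polls
// only cancel-safe futures (`recv` and `&mut JoinHandle`).
async fn lossless_forward(
    mut from_sp: mpsc::Receiver<Vec<u8>>,
    to_client: mpsc::Sender<Vec<u8>>,
) {
    let (tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();
    let mut sink_task = tokio::spawn(async move {
        while let Some(frame) = rx.recv().await {
            if to_client.send(frame).await.is_err() {
                return; // client went away
            }
        }
    });
    loop {
        tokio::select! {
            _ = &mut sink_task => return, // sink task exited (client gone)
            recv = from_sp.recv() => match recv {
                Some(frame) => {
                    // Enqueueing never blocks and never loses the frame.
                    let _ = tx.send(frame);
                }
                // Returning drops `tx`; the sink task drains what's left
                // and exits on its own.
                None => return,
            },
        }
    }
}

#[tokio::main]
async fn main() {
    let (sp_tx, sp_rx) = mpsc::channel(8);
    let (client_tx, mut client_rx) = mpsc::channel(8);
    tokio::spawn(async move {
        for i in 0u8..3 {
            let _ = sp_tx.send(vec![i]).await;
        }
    });
    tokio::spawn(lossless_forward(sp_rx, client_tx));
    while let Some(frame) = client_rx.recv().await {
        println!("client got {:?}", frame);
    }
}

The unbounded channel mirrors the TODO-cleanup note in the diff below: enqueueing never blocks, at the cost of buffering an arbitrary amount of data if the client is slow.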

1 file changed: +78 −67 lines changed

gateway/src/serial_console.rs

Lines changed: 78 additions & 67 deletions
@@ -5,8 +5,8 @@
 // Copyright 2022 Oxide Computer Company
 
 use crate::error::Error;
-use futures::future::Fuse;
-use futures::FutureExt;
+use futures::stream::SplitSink;
+use futures::stream::SplitStream;
 use futures::SinkExt;
 use futures::StreamExt;
 use gateway_messages::SpComponent;
@@ -23,9 +23,8 @@ use slog::error;
 use slog::info;
 use slog::Logger;
 use std::borrow::Cow;
-use std::collections::VecDeque;
-use std::mem;
 use std::ops::Deref;
+use tokio::sync::mpsc;
 use tokio_tungstenite::tungstenite::handshake;
 use tokio_tungstenite::tungstenite::protocol::frame::coding::CloseCode;
 use tokio_tungstenite::tungstenite::protocol::CloseFrame;
@@ -143,41 +142,45 @@ struct SerialConsoleTask {
 
 impl SerialConsoleTask {
     async fn run(self, log: &Logger) -> Result<(), SerialTaskError> {
-        let (mut ws_sink, mut ws_stream) = self.ws_stream.split();
+        let (ws_sink, ws_stream) = self.ws_stream.split();
+
+        // Spawn a task to send any messages received from the SP to the client
+        // websocket.
+        //
+        // TODO-cleanup We have no way to apply backpressure to the SP, and are
+        // willing to buffer up an arbitrary amount of data in memory. We should
+        // apply some form of backpressure (which the SP could only handle by
+        // discarding data).
+        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
+        let mut ws_sink_handle =
+            tokio::spawn(Self::ws_sink_task(ws_sink, ws_sink_rx));
+
+        // Spawn a task to send any messages received from the client websocket
+        // to the SP.
         let (console_tx, mut console_rx) = self.console.split();
         let console_tx = DetachOnDrop::new(console_tx);
-
-        // TODO Currently we do not apply any backpressure on the SP and are
-        // willing to buffer up an arbitrary amount of data in memory. Is it
-        // reasonable to apply backpressure to the SP over UDP? Should we have
-        // caps on memory and start discarding data if we exceed them? We _do_
-        // apply backpressure to the websocket, delaying reading from it if we
-        // still have data waiting to be sent to the SP.
-        let mut data_from_sp: VecDeque<Vec<u8>> = VecDeque::new();
-        let mut data_to_sp: Vec<u8> = Vec::new();
+        let mut ws_recv_handle = tokio::spawn(Self::ws_recv_task(
+            ws_stream,
+            console_tx,
+            log.clone(),
+        ));
 
         loop {
-            let ws_send = if let Some(data) = data_from_sp.pop_front() {
-                ws_sink.send(Message::Binary(data)).fuse()
-            } else {
-                Fuse::terminated()
-            };
-
-            let (ws_recv, sp_send) = if data_to_sp.is_empty() {
-                (ws_stream.next().fuse(), Fuse::terminated())
-            } else {
-                // Steal `data_to_sp` and create a future to send it to the SP.
-                let mut to_send = Vec::new();
-                mem::swap(&mut to_send, &mut data_to_sp);
-                (Fuse::terminated(), console_tx.write(to_send).fuse())
-            };
-
             tokio::select! {
-                // Finished (or failed) sending data to the SP.
-                send_success = sp_send => {
-                    send_success
-                        .map_err(gateway_sp_comms::error::Error::from)
-                        .map_err(Error::from)?;
+                // Our ws_sink task completed; this is only possible if it
+                // fails, since it loops until we drop ws_sink_tx (which doesn't
+                // happen until we return!).
+                join_result = &mut ws_sink_handle => {
+                    let result = join_result.expect("ws sink task panicked");
+                    return result;
+                }
+
+                // Our ws_recv task completed; this is possible if the websocket
+                // connection fails or is closed by the client. In either case,
+                // we're also done.
+                join_result = &mut ws_recv_handle => {
+                    let result = join_result.expect("ws recv task panicked");
+                    return result;
                 }
 
                 // Receive a UDP packet from the SP.
@@ -188,7 +191,7 @@ impl SerialConsoleTask {
                                 log, "received serial console data from SP";
                                 "length" => data.len(),
                             );
-                            data_from_sp.push_back(data);
+                            let _ = ws_sink_tx.send(Message::Binary(data));
                         }
                         None => {
                             // Sender is closed; i.e., we've been detached.
@@ -198,47 +201,55 @@ impl SerialConsoleTask {
                                 code: CloseCode::Policy,
                                 reason: Cow::Borrowed("serial console was detached"),
                             };
-                            ws_sink.send(Message::Close(Some(close))).await?;
+                            let _ = ws_sink_tx.send(Message::Close(Some(close)));
                             return Ok(());
                         }
                     }
                 }
+            }
+        }
+    }
 
-            // Send a previously-received UDP packet of data to the websocket
-            // client
-            write_success = ws_send => {
-                write_success?;
-            }
+    async fn ws_sink_task(
+        mut ws_sink: SplitSink<WebSocketStream<Upgraded>, Message>,
+        mut messages: mpsc::UnboundedReceiver<Message>,
+    ) -> Result<(), SerialTaskError> {
+        while let Some(message) = messages.recv().await {
+            ws_sink.send(message).await?;
+        }
+        Ok(())
+    }
 
-            // Receive from the websocket to send to the SP.
-            msg = ws_recv => {
-                match msg {
-                    Some(Ok(Message::Binary(mut data))) => {
-                        // we only populate ws_recv when we have no data
-                        // currently queued up; sanity check that here
-                        assert!(data_to_sp.is_empty());
-                        data_to_sp.append(&mut data);
-                    }
-                    Some(Ok(Message::Close(_))) | None => {
-                        info!(
-                            log,
-                            "remote end closed websocket; terminating task",
-                        );
-                        return Ok(());
-                    }
-                    Some(other) => {
-                        let wrong_message = other?;
-                        error!(
-                            log,
-                            "bogus websocket message; terminating task";
-                            "message" => ?wrong_message,
-                        );
-                        return Ok(());
-                    }
-                }
+    async fn ws_recv_task(
+        mut ws_stream: SplitStream<WebSocketStream<Upgraded>>,
+        console_tx: DetachOnDrop,
+        log: Logger,
+    ) -> Result<(), SerialTaskError> {
+        while let Some(message) = ws_stream.next().await {
+            match message {
+                Ok(Message::Binary(data)) => {
+                    console_tx
+                        .write(data)
+                        .await
+                        .map_err(gateway_sp_comms::error::Error::from)
+                        .map_err(Error::from)?;
+                }
+                Ok(Message::Close(_)) => {
+                    break;
+                }
+                Ok(other) => {
+                    error!(
+                        log,
+                        "bogus websocket message; terminating task";
+                        "message" => ?other,
+                    );
+                    return Ok(());
                 }
+                Err(err) => return Err(err.into()),
             }
         }
+        info!(log, "remote end closed websocket; terminating task",);
+        Ok(())
     }
 }
 
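A note on why the remaining `select!` arms are cancel-safe: `run` polls each `JoinHandle` through `&mut`, so when a different arm wins the race only the borrow is dropped; the spawned task keeps running and the handle can be polled again on the next iteration. A small standalone sketch (hypothetical task body; assumes `tokio` with features = ["full"]):

use std::time::Duration;

#[tokio::main]
async fn main() {
    // Stand-in for ws_sink_task / ws_recv_task: finishes after 3 seconds.
    let mut handle = tokio::spawn(async {
        tokio::time::sleep(Duration::from_secs(3)).await;
        "task result"
    });

    loop {
        tokio::select! {
            // Polling `&mut handle` is cancel-safe: when the other arm wins,
            // only the borrow is dropped. The spawned task keeps running and
            // the handle can be polled again next iteration, losing nothing.
            res = &mut handle => {
                println!("finished: {}", res.expect("task panicked"));
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(1)) => {
                println!("tick; the spawned task is unaffected");
            }
        }
    }
}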
