Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,8 @@ async fn ws_client_actor_inner(
}
Item::Message(ClientMessage::Ping(_message)) => {
log::trace!("Received ping from client {}", client.id);
// TODO: should we respond with a `Pong`?
// No need to explicitly respond with a `Pong`, as tungstenite handles this automatically.
// See [https://github.com/snapview/tokio-tungstenite/issues/88].
}
Item::Message(ClientMessage::Pong(_message)) => {
log::trace!("Received heartbeat from client {}", client.id);
Expand Down
93 changes: 83 additions & 10 deletions crates/sdk/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
//!
//! This module is internal, and may incompatibly change without warning.

use std::mem;
use std::sync::Arc;
use std::time::Duration;

use bytes::Bytes;
use futures::{SinkExt, StreamExt as _, TryStreamExt};
Expand All @@ -16,6 +18,7 @@ use spacetimedb_client_api_messages::websocket::{ClientMessage, ServerMessage};
use spacetimedb_lib::{bsatn, ConnectionId};
use thiserror::Error;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio::{net::TcpStream, runtime};
use tokio_tungstenite::{
connect_async_with_config,
Expand Down Expand Up @@ -287,18 +290,57 @@ impl WsConnection {
.observe(msg_size as f64);
};

// There is a small but plausible chance that a client's socket will not
// be notified that the remote end has closed the connection, e.g.
// because of the remote machine being power cycled, or middleboxes
// misbehaving.
//
// Unless the client uses dynamic subscriptions, it will only ever try
// to read from the socket, and thus not notice the connection closure.
//
// For certain types of clients it is crucial to eventually time out
// such connections, and attempt to reconnect. We don't, however, want
// to flood the server with `Ping` frames unnecessarily.
//
// Instead, we:
//
// * Check every `IDLE_TIMEOUT` whether some data has arrived.
//
// - If not, send a `Ping` frame.
//
// * Check after another `IDLE_TIMEOUT` whether data has arrived.
//
// - If not, and we were expecting a `Pong` response, consider the
// connection bad and exit the loop, thereby closing the socket.
//
// Note that the server also initiates `Ping`s, currently at `2 * IDLE_TIMEOUT`.
// If both ends cannot communicate, we assume the server has already
// timed out the client, and so don't bother sending a `Close` frame.
const IDLE_TIMEOUT: Duration = Duration::from_secs(30);
let mut idle_timeout_interval = tokio::time::interval_at(Instant::now() + IDLE_TIMEOUT, IDLE_TIMEOUT);

let mut idle = true;
let mut want_pong = false;

let mut outgoing_messages = Some(outgoing_messages);
loop {
tokio::select! {
incoming = self.sock.try_next() => match incoming {
Err(tokio_tungstenite::tungstenite::error::Error::ConnectionClosed) | Ok(None) => break,
Err(tokio_tungstenite::tungstenite::error::Error::ConnectionClosed) | Ok(None) => {
log::info!("Connection closed");
break;
},

Err(e) => Self::maybe_log_error::<(), _>(
"Error reading message from read WebSocket stream",
Err(e),
),
Err(e) => {
Self::maybe_log_error::<(), _>(
"Error reading message from read WebSocket stream",
Err(e),
);
break;
},

Ok(Some(WebSocketMessage::Binary(bytes))) => {
idle = false;
record_metrics(bytes.len());
match Self::parse_response(&bytes) {
Err(e) => Self::maybe_log_error::<(), _>(
Expand All @@ -313,23 +355,54 @@ impl WsConnection {
}

Ok(Some(WebSocketMessage::Ping(payload))) => {
log::trace!("received ping");
idle = false;
record_metrics(payload.len());
}
// No need to explicitly respond with a `Pong`,
// as tungstenite handles this automatically.
// See [https://github.com/snapview/tokio-tungstenite/issues/88].
},

Ok(Some(WebSocketMessage::Pong(payload))) => {
log::trace!("received pong");
idle = false;
want_pong = false;
record_metrics(payload.len());
},

Ok(Some(other)) => {
log::warn!("Unexpected WebSocket message {:?}", other);
idle = false;
record_metrics(other.len());
},
},

_ = idle_timeout_interval.tick() => {
if mem::replace(&mut idle, true) {
if want_pong {
// Nothing received while we were waiting for a pong.
log::warn!("Connection timed out");
break;
}

log::trace!("sending client ping");
let ping = WebSocketMessage::Ping(vec![]);
if let Err(e) = self.sock.send(ping).await {
log::warn!("Error sending ping: {e:?}");
break;
}
want_pong = true;
}
},

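// Workaround: we want to handle the channel close *once*, and then disable this branch.
// `outgoing_messages` is wrapped in an `Option`; when it is `None`, the `?` below makes the
// async block yield `None`, the `Some(outgoing)` pattern fails to match, and `select!`
// skips this branch.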
Some(outgoing) = async { Some(outgoing_messages.as_mut()?.next().await) } => match outgoing {
Some(outgoing) => {
let msg = Self::encode_message(outgoing);
Self::maybe_log_error(
"Error sending outgoing message",
self.sock.send(msg).await,
);
if let Err(e) = self.sock.send(msg).await {
log::warn!("Error sending outgoing message: {e:?}");
break;
}
}
None => {
Self::maybe_log_error("Error sending close frame", SinkExt::close(&mut self.sock).await);
Expand Down
Loading