Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 107 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ thin-vec = "0.2.13"
thiserror = "1.0.37"
tokio = { version = "1.37", features = ["full"] }
tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] }
tokio-tungstenite = { version = "0.21", features = ["native-tls"] }
tokio-tungstenite = { version = "0.26.2", features = ["native-tls"] }
tokio-util = { version = "0.7.4", features = ["time"] }
toml = "0.8"
toml_edit = "0.22.22"
Expand Down
34 changes: 22 additions & 12 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use axum::extract::{Path, Query, State};
use axum::response::IntoResponse;
use axum::Extension;
use axum_extra::TypedHeader;
use bytes::Bytes;
use bytestring::ByteString;
use futures::future::MaybeDone;
use futures::{Future, FutureExt, SinkExt, StreamExt};
use http::{HeaderValue, StatusCode};
Expand All @@ -22,6 +24,7 @@ use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
use spacetimedb_lib::connection_id::{ConnectionId, ConnectionIdForUrl};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Utf8Bytes;

use crate::auth::SpacetimeAuth;
use crate::util::websocket::{
Expand Down Expand Up @@ -122,12 +125,10 @@ where
name: ctx.client_actor_index().next_client_name(),
};

let ws_config = WebSocketConfig {
max_message_size: Some(0x2000000),
max_frame_size: None,
accept_unmasked_frames: false,
..Default::default()
};
let ws_config = WebSocketConfig::default()
.max_message_size(Some(0x2000000))
.max_frame_size(None)
.accept_unmasked_frames(false);

tokio::spawn(async move {
let ws = match ws_upgrade.upgrade(ws_config).await {
Expand Down Expand Up @@ -340,7 +341,7 @@ async fn ws_client_actor_inner(
if mem::take(&mut got_pong) {
// Send a ping message while continuing to poll the `handle_queue`,
// to avoid deadlocks or delays due to enqueued futures holding resources.
if let Err(e) = also_poll(ws.send(WsMessage::Ping(Vec::new())), make_progress(&mut current_message)).await {
if let Err(e) = also_poll(ws.send(WsMessage::Ping(Bytes::new())), make_progress(&mut current_message)).await {
log::warn!("error sending ping: {e:#}");
}
continue;
Expand Down Expand Up @@ -416,14 +417,14 @@ async fn ws_client_actor_inner(

enum ClientMessage {
Message(DataMessage),
Ping(Vec<u8>),
Pong(Vec<u8>),
Close(Option<CloseFrame<'static>>),
Ping(Bytes),
Pong(Bytes),
Close(Option<CloseFrame>),
}
impl ClientMessage {
fn from_message(msg: WsMessage) -> Self {
match msg {
WsMessage::Text(s) => Self::Message(DataMessage::Text(s)),
WsMessage::Text(s) => Self::Message(DataMessage::Text(utf8bytes_to_bytestring(s))),
WsMessage::Binary(b) => Self::Message(DataMessage::Binary(b)),
WsMessage::Ping(b) => Self::Ping(b),
WsMessage::Pong(b) => Self::Pong(b),
Expand All @@ -436,7 +437,16 @@ impl ClientMessage {

fn datamsg_to_wsmsg(msg: DataMessage) -> WsMessage {
match msg {
DataMessage::Text(text) => WsMessage::Text(text),
DataMessage::Text(text) => WsMessage::Text(bytestring_to_utf8bytes(text)),
DataMessage::Binary(bin) => WsMessage::Binary(bin),
}
}

fn utf8bytes_to_bytestring(s: Utf8Bytes) -> ByteString {
// SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
unsafe { ByteString::from_bytes_unchecked(Bytes::from(s)) }
}
fn bytestring_to_utf8bytes(s: ByteString) -> Utf8Bytes {
// SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
unsafe { Utf8Bytes::from_bytes_unchecked(s.into_bytes()) }
}
Loading
Loading