@@ -7,6 +7,8 @@ use axum::extract::{Path, Query, State};
77use axum:: response:: IntoResponse ;
88use axum:: Extension ;
99use axum_extra:: TypedHeader ;
10+ use bytes:: Bytes ;
11+ use bytestring:: ByteString ;
1012use futures:: future:: MaybeDone ;
1113use futures:: { Future , FutureExt , SinkExt , StreamExt } ;
1214use http:: { HeaderValue , StatusCode } ;
@@ -22,6 +24,7 @@ use spacetimedb_client_api_messages::websocket::{self as ws_api, Compression};
2224use spacetimedb_lib:: connection_id:: { ConnectionId , ConnectionIdForUrl } ;
2325use std:: time:: Instant ;
2426use tokio:: sync:: mpsc;
27+ use tokio_tungstenite:: tungstenite:: Utf8Bytes ;
2528
2629use crate :: auth:: SpacetimeAuth ;
2730use crate :: util:: websocket:: {
@@ -122,12 +125,10 @@ where
122125 name : ctx. client_actor_index ( ) . next_client_name ( ) ,
123126 } ;
124127
125- let ws_config = WebSocketConfig {
126- max_message_size : Some ( 0x2000000 ) ,
127- max_frame_size : None ,
128- accept_unmasked_frames : false ,
129- ..Default :: default ( )
130- } ;
128+ let ws_config = WebSocketConfig :: default ( )
129+ . max_message_size ( Some ( 0x2000000 ) )
130+ . max_frame_size ( None )
131+ . accept_unmasked_frames ( false ) ;
131132
132133 tokio:: spawn ( async move {
133134 let ws = match ws_upgrade. upgrade ( ws_config) . await {
@@ -340,7 +341,7 @@ async fn ws_client_actor_inner(
340341 if mem:: take( & mut got_pong) {
341342 // Send a ping message while continuing to poll the `handle_queue`,
342343 // to avoid deadlocks or delays due to enqueued futures holding resources.
343- if let Err ( e) = also_poll( ws. send( WsMessage :: Ping ( Vec :: new( ) ) ) , make_progress( & mut current_message) ) . await {
344+ if let Err ( e) = also_poll( ws. send( WsMessage :: Ping ( Bytes :: new( ) ) ) , make_progress( & mut current_message) ) . await {
344345 log:: warn!( "error sending ping: {e:#}" ) ;
345346 }
346347 continue ;
@@ -416,14 +417,14 @@ async fn ws_client_actor_inner(
416417
417418enum ClientMessage {
418419 Message ( DataMessage ) ,
419- Ping ( Vec < u8 > ) ,
420- Pong ( Vec < u8 > ) ,
421- Close ( Option < CloseFrame < ' static > > ) ,
420+ Ping ( Bytes ) ,
421+ Pong ( Bytes ) ,
422+ Close ( Option < CloseFrame > ) ,
422423}
423424impl ClientMessage {
424425 fn from_message ( msg : WsMessage ) -> Self {
425426 match msg {
426- WsMessage :: Text ( s) => Self :: Message ( DataMessage :: Text ( s ) ) ,
427+ WsMessage :: Text ( s) => Self :: Message ( DataMessage :: Text ( utf8bytes_to_bytestring ( s ) ) ) ,
427428 WsMessage :: Binary ( b) => Self :: Message ( DataMessage :: Binary ( b) ) ,
428429 WsMessage :: Ping ( b) => Self :: Ping ( b) ,
429430 WsMessage :: Pong ( b) => Self :: Pong ( b) ,
@@ -436,7 +437,16 @@ impl ClientMessage {
436437
437438fn datamsg_to_wsmsg ( msg : DataMessage ) -> WsMessage {
438439 match msg {
439- DataMessage :: Text ( text) => WsMessage :: Text ( text) ,
440+ DataMessage :: Text ( text) => WsMessage :: Text ( bytestring_to_utf8bytes ( text) ) ,
440441 DataMessage :: Binary ( bin) => WsMessage :: Binary ( bin) ,
441442 }
442443}
444+
445+ fn utf8bytes_to_bytestring ( s : Utf8Bytes ) -> ByteString {
446+ // SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
447+ unsafe { ByteString :: from_bytes_unchecked ( Bytes :: from ( s) ) }
448+ }
449+ fn bytestring_to_utf8bytes ( s : ByteString ) -> Utf8Bytes {
450+ // SAFETY: `Utf8Bytes` and `ByteString` have the same invariant of UTF-8 validity
451+ unsafe { Utf8Bytes :: from_bytes_unchecked ( s. into_bytes ( ) ) }
452+ }
0 commit comments