Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 68 additions & 20 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use bytes::Bytes;
use bytestring::ByteString;
use derive_more::From;
use futures::prelude::*;
use prometheus::{Histogram, IntCounter};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe,
UnsubscribeMulti, WebsocketFormat,
Expand All @@ -32,6 +33,13 @@ pub enum Protocol {
}

impl Protocol {
pub fn as_str(self) -> &'static str {
match self {
Protocol::Text => "text",
Protocol::Binary => "binary",
}
}

pub(crate) fn assert_matches_format_switch<B, J>(self, fs: &FormatSwitch<B, J>) {
match (self, fs) {
(Protocol::Text, FormatSwitch::Json(_)) | (Protocol::Binary, FormatSwitch::Bsatn(_)) => {}
Expand Down Expand Up @@ -69,6 +77,30 @@ pub struct ClientConnectionSender {
sendtx: mpsc::Sender<SerializableMessage>,
abort_handle: AbortHandle,
cancelled: AtomicBool,
pub(crate) metrics: ClientConnectionMetrics,
}

#[derive(Debug)]
pub struct ClientConnectionMetrics {
pub websocket_request_msg_size: Histogram,
pub websocket_requests: IntCounter,
}

impl ClientConnectionMetrics {
fn new(replica_id: u64, protocol: Protocol) -> Self {
let message_kind = protocol.as_str();
let websocket_request_msg_size = WORKER_METRICS
.websocket_request_msg_size
.with_label_values(&replica_id, message_kind);
let websocket_requests = WORKER_METRICS
.websocket_requests
.with_label_values(&replica_id, message_kind);

Self {
websocket_request_msg_size,
websocket_requests,
}
}
}

#[derive(Debug, thiserror::Error)]
Expand All @@ -80,23 +112,35 @@ pub enum ClientSendError {
}

impl ClientConnectionSender {
fn new(
replica_id: u64,
id: ClientActorId,
config: ClientConfig,
sendtx: mpsc::Sender<SerializableMessage>,
abort_handle: AbortHandle,
cancelled: AtomicBool,
) -> Self {
let metrics = ClientConnectionMetrics::new(replica_id, config.protocol);
Self {
id,
config,
sendtx,
abort_handle,
cancelled,
metrics,
}
}

pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, mpsc::Receiver<SerializableMessage>) {
let (sendtx, rx) = mpsc::channel(1);
// just make something up, it doesn't need to be attached to a real task
let abort_handle = match tokio::runtime::Handle::try_current() {
Ok(h) => h.spawn(async {}).abort_handle(),
Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(),
};
(
Self {
id,
config,
sendtx,
abort_handle,
cancelled: AtomicBool::new(false),
},
rx,
)
let cancelled = AtomicBool::new(false);
let sender = Self::new(0, id, config, sendtx, abort_handle, cancelled);
(sender, rx)
}

pub fn dummy(id: ClientActorId, config: ClientConfig) -> Self {
Expand Down Expand Up @@ -201,28 +245,28 @@ impl ClientConnection {

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

let db = module.info().database_identity;

let (fut_tx, fut_rx) = oneshot::channel::<Fut>();
// weird dance so that we can get an abort_handle into ClientConnection
let module_info = module.info.clone();
let abort_handle = tokio::spawn(async move {
let Ok(fut) = fut_rx.await else { return };

let _gauge_guard = WORKER_METRICS.connected_clients.with_label_values(&db).inc_scope();
WORKER_METRICS.ws_clients_spawned.with_label_values(&db).inc();
scopeguard::defer!(WORKER_METRICS.ws_clients_aborted.with_label_values(&db).inc());
let _gauge_guard = module_info.metrics.connected_clients.inc_scope();
module_info.metrics.ws_clients_spawned.inc();
scopeguard::defer!(module_info.metrics.ws_clients_aborted.inc());

fut.await
})
.abort_handle();

let sender = Arc::new(ClientConnectionSender {
let sender = Arc::new(ClientConnectionSender::new(
replica_id,
id,
config,
sendtx,
abort_handle,
cancelled: AtomicBool::new(false),
});
AtomicBool::new(false),
));
let this = Self {
sender,
replica_id,
Expand Down Expand Up @@ -303,7 +347,11 @@ impl ClientConnection {
.await
}

pub async fn subscribe_single(&self, subscription: SubscribeSingle, timer: Instant) -> Result<(), DBError> {
pub async fn subscribe_single(
&self,
subscription: SubscribeSingle,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module
Expand All @@ -313,7 +361,7 @@ impl ClientConnection {
.await
}

pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<(), DBError> {
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
asyncify(move || {
me.module
Expand Down
91 changes: 34 additions & 57 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::host::{ReducerArgs, ReducerId};
use crate::identity::Identity;
use crate::messages::websocket::{CallReducer, ClientMessage, OneOffQuery};
use crate::subscription::record_exec_metrics;
use crate::worker_metrics::WORKER_METRICS;
use spacetimedb_lib::de::serde::DeserializeWrapper;
use spacetimedb_lib::identity::RequestId;
Expand All @@ -29,20 +28,8 @@ pub enum MessageHandleError {
}

pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Instant) -> Result<(), MessageHandleError> {
let message_kind = match message {
DataMessage::Text(_) => "text",
DataMessage::Binary(_) => "binary",
};

WORKER_METRICS
.websocket_request_msg_size
.with_label_values(&client.replica_id, message_kind)
.observe(message.len() as f64);

WORKER_METRICS
.websocket_requests
.with_label_values(&client.replica_id, message_kind)
.inc();
client.metrics.websocket_request_msg_size.observe(message.len() as f64);
client.metrics.websocket_requests.inc();

let message = match message {
DataMessage::Text(text) => {
Expand All @@ -60,7 +47,20 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
.map_args(|b| ReducerArgs::Bsatn(message_buf.slice_ref(b))),
};

let database_identity = client.module.info().database_identity;
let mod_info = client.module.info();
let mod_metrics = &mod_info.metrics;
let database_identity = mod_info.database_identity;
let db = &client.module.replica_ctx().relational_db;
let record_metrics = |wl| {
move |metrics| {
if let Some(metrics) = metrics {
db.exec_counters_for(wl).record(&metrics);
}
}
};
let sub_metrics = record_metrics(WorkloadType::Subscribe);
let unsub_metrics = record_metrics(WorkloadType::Unsubscribe);

let res = match message {
ClientMessage::CallReducer(CallReducer {
ref reducer,
Expand All @@ -76,65 +76,43 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
res.map(drop).map_err(|e| {
(
Some(reducer),
client
.module
.info()
.module_def
.reducer_full(&**reducer)
.map(|(id, _)| id),
mod_info.module_def.reducer_full(&**reducer).map(|(id, _)| id),
e.into(),
)
})
}
ClientMessage::SubscribeMulti(subscription) => {
let res = client.subscribe_multi(subscription, timer).await.map(|metrics| {
if let Some(metrics) = metrics {
record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics)
}
});

WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
let res = client.subscribe_multi(subscription, timer).await.map(sub_metrics);
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::UnsubscribeMulti(request) => {
let res = client.unsubscribe_multi(request, timer).await.map(|metrics| {
if let Some(metrics) = metrics {
record_exec_metrics(&WorkloadType::Unsubscribe, &database_identity, metrics)
}
});
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &database_identity, "")
let res = client.unsubscribe_multi(request, timer).await.map(unsub_metrics);
mod_metrics
.request_round_trip_unsubscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::SubscribeSingle(subscription) => {
let res = client.subscribe_single(subscription, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
let res = client.subscribe_single(subscription, timer).await.map(sub_metrics);
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Unsubscribe(request) => {
let res = client.unsubscribe(request, timer).await;
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &database_identity, "")
let res = client.unsubscribe(request, timer).await.map(unsub_metrics);
mod_metrics
.request_round_trip_unsubscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Subscribe(subscription) => {
let res = client
.subscribe(subscription, timer)
.await
.map(|metrics| record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics));
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
let res = client.subscribe(subscription, timer).await.map(Some).map(sub_metrics);
mod_metrics
.request_round_trip_subscribe
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
Expand All @@ -146,9 +124,8 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
Protocol::Binary => client.one_off_query_bsatn(&query, &message_id, timer),
Protocol::Text => client.one_off_query_json(&query, &message_id, timer),
};
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Sql, &database_identity, "")
mod_metrics
.request_round_trip_sql
.observe(timer.elapsed().as_secs_f64());
res.map_err(|err| (None, None, err))
}
Expand Down
Loading
Loading