Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use spacetimedb_client_api_messages::websocket::{
UnsubscribeMulti, WebsocketFormat,
};
use spacetimedb_lib::identity::RequestId;
use spacetimedb_lib::metrics::ExecutionMetrics;
use tokio::sync::{mpsc, oneshot, watch};
use tokio::task::AbortHandle;

Expand Down Expand Up @@ -321,7 +322,11 @@ impl ClientConnection {
.unwrap() // TODO: is unwrapping right here?
}

pub async fn subscribe_multi(&self, request: SubscribeMulti, timer: Instant) -> Result<(), DBError> {
pub async fn subscribe_multi(
&self,
request: SubscribeMulti,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
Expand All @@ -332,7 +337,11 @@ impl ClientConnection {
.unwrap() // TODO: is unwrapping right here?
}

pub async fn unsubscribe_multi(&self, request: UnsubscribeMulti, timer: Instant) -> Result<(), DBError> {
pub async fn unsubscribe_multi(
&self,
request: UnsubscribeMulti,
timer: Instant,
) -> Result<Option<ExecutionMetrics>, DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
Expand All @@ -343,7 +352,7 @@ impl ClientConnection {
.unwrap() // TODO: is unwrapping right here?
}

pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<(), DBError> {
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
let me = self.clone();
tokio::task::spawn_blocking(move || {
me.module
Expand Down
19 changes: 16 additions & 3 deletions crates/core/src/client/message_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::host::module_host::{EventStatus, ModuleEvent, ModuleFunctionCall};
use crate::host::{ReducerArgs, ReducerId};
use crate::identity::Identity;
use crate::messages::websocket::{CallReducer, ClientMessage, OneOffQuery};
use crate::subscription::record_exec_metrics;
use crate::worker_metrics::WORKER_METRICS;
use spacetimedb_lib::de::serde::DeserializeWrapper;
use spacetimedb_lib::identity::RequestId;
Expand Down Expand Up @@ -86,15 +87,24 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
})
}
ClientMessage::SubscribeMulti(subscription) => {
let res = client.subscribe_multi(subscription, timer).await;
let res = client.subscribe_multi(subscription, timer).await.map(|metrics| {
if let Some(metrics) = metrics {
record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics)
}
});

WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
.observe(timer.elapsed().as_secs_f64());
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::UnsubscribeMulti(request) => {
let res = client.unsubscribe_multi(request, timer).await;
let res = client.unsubscribe_multi(request, timer).await.map(|metrics| {
if let Some(metrics) = metrics {
record_exec_metrics(&WorkloadType::Unsubscribe, &database_identity, metrics)
}
});
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Unsubscribe, &database_identity, "")
Expand All @@ -118,7 +128,10 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
res.map_err(|e| (None, None, e.into()))
}
ClientMessage::Subscribe(subscription) => {
let res = client.subscribe(subscription, timer).await;
let res = client
.subscribe(subscription, timer)
.await
.map(|metrics| record_exec_metrics(&WorkloadType::Subscribe, &database_identity, metrics));
WORKER_METRICS
.request_round_trip
.with_label_values(&WorkloadType::Subscribe, &database_identity, "")
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/subscription/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ where
plans
.par_iter()
.flat_map_iter(|plan| plan.plans_fragments().map(|fragment| (plan.sql(), fragment)))
.filter(|(_, plan)| {
// Since subscriptions only support selects and inner joins,
// we filter out any plans that read from an empty table.
plan.table_ids()
.all(|table_id| tx.table(table_id).is_some_and(|t| t.row_count > 0))
})
.map(|(sql, plan)| (sql, plan, plan.subscribed_table_id(), plan.subscribed_table_name()))
.map(|(sql, plan, table_id, table_name)| {
plan.physical_plan()
Expand Down
139 changes: 83 additions & 56 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,13 @@ type FullSubscriptionUpdate = FormatSwitch<ws::DatabaseUpdate<BsatnFormat>, ws::

/// A utility for sending an error message to a client and returning early
macro_rules! return_on_err {
($expr:expr, $handler:expr) => {
($expr:expr, $handler:expr, $metrics:expr) => {
match $expr {
Ok(val) => val,
Err(e) => {
// TODO: Handle errors sending messages.
let _ = $handler(e.to_string().into());
return Ok(());
return Ok($metrics);
}
}
};
Expand Down Expand Up @@ -398,7 +398,7 @@ impl ModuleSubscriptions {
sender: Arc<ClientConnectionSender>,
request: UnsubscribeMulti,
timer: Instant,
) -> Result<(), DBError> {
) -> Result<Option<ExecutionMetrics>, DBError> {
// Send an error message to the client
let send_err_msg = |message| {
sender.send_message(SubscriptionMessage {
Expand All @@ -420,38 +420,23 @@ impl ModuleSubscriptions {
let removed_queries = {
let mut subscriptions = self.subscriptions.write();

match subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id) {
Ok(queries) => queries,
Err(error) => {
// Apparently we ignore errors sending messages.
let _ = send_err_msg(error.to_string().into());
return Ok(());
}
}
};

let auth = AuthCtx::new(self.owner_identity, sender.id.identity);
let eval_result = self.evaluate_queries(
sender.clone(),
&removed_queries,
&tx,
&auth,
TableUpdateType::Unsubscribe,
);
// If execution error, send to client
let (update, metrics) = match eval_result {
Ok(ok) => ok,
Err(e) => {
// Apparently we ignore errors sending messages.
let _ = send_err_msg(e.to_string().into());
return Ok(());
}
return_on_err!(
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id),
send_err_msg,
None
)
};

record_exec_metrics(
&WorkloadType::Unsubscribe,
&self.relational_db.database_identity(),
metrics,
let (update, metrics) = return_on_err!(
self.evaluate_queries(
sender.clone(),
&removed_queries,
&tx,
&AuthCtx::new(self.owner_identity, sender.id.identity),
TableUpdateType::Unsubscribe,
),
send_err_msg,
None
);

let _ = sender.send_message(SubscriptionMessage {
Expand All @@ -460,7 +445,8 @@ impl ModuleSubscriptions {
timer: Some(timer),
result: SubscriptionResult::UnsubscribeMulti(SubscriptionData { data: update }),
});
Ok(())

Ok(Some(metrics))
}

/// Compiles the queries in a [Subscribe] or [SubscribeMulti] message.
Expand Down Expand Up @@ -538,7 +524,7 @@ impl ModuleSubscriptions {
request: SubscribeMulti,
timer: Instant,
_assert: Option<AssertTxFn>,
) -> Result<(), DBError> {
) -> Result<Option<ExecutionMetrics>, DBError> {
// Send an error message to the client
let send_err_msg = |message| {
let _ = sender.send_message(SubscriptionMessage {
Expand All @@ -555,7 +541,8 @@ impl ModuleSubscriptions {
let num_queries = request.query_strings.len();
let (queries, auth, tx) = return_on_err!(
self.compile_queries(sender.id.identity, request.query_strings, num_queries),
send_err_msg
send_err_msg,
None
);
let tx = scopeguard::guard(tx, |tx| {
self.relational_db.release_tx(tx);
Expand All @@ -578,15 +565,9 @@ impl ModuleSubscriptions {
let mut subscriptions = self.subscriptions.write();
subscriptions.remove_subscription((sender.id.identity, sender.id.connection_id), request.query_id)?;
send_err_msg("Internal error evaluating queries".into());
return Ok(());
return Ok(None);
};

record_exec_metrics(
&WorkloadType::Subscribe,
&self.relational_db.database_identity(),
metrics,
);

#[cfg(test)]
if let Some(assert) = _assert {
assert(&tx);
Expand All @@ -602,7 +583,8 @@ impl ModuleSubscriptions {
timer: Some(timer),
result: SubscriptionResult::SubscribeMulti(SubscriptionData { data: update }),
});
Ok(())

Ok(Some(metrics))
}

/// Add a subscriber to the module. NOTE: this function is blocking.
Expand All @@ -614,7 +596,7 @@ impl ModuleSubscriptions {
subscription: Subscribe,
timer: Instant,
_assert: Option<AssertTxFn>,
) -> Result<(), DBError> {
) -> Result<ExecutionMetrics, DBError> {
let num_queries = subscription.query_strings.len();
let (queries, auth, tx) = self.compile_queries(sender.id.identity, subscription.query_strings, num_queries)?;
let tx = scopeguard::guard(tx, |tx| {
Expand All @@ -641,12 +623,6 @@ impl ModuleSubscriptions {
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
};

record_exec_metrics(
&WorkloadType::Subscribe,
&self.relational_db.database_identity(),
metrics,
);

// It acquires the subscription lock after `eval`, allowing `add_subscription` to run concurrently.
// This also makes it possible for `broadcast_event` to get scheduled before the subsequent part here
// but that should not pose an issue.
Expand All @@ -667,7 +643,8 @@ impl ModuleSubscriptions {
request_id: Some(subscription.request_id),
timer: Some(timer),
});
Ok(())

Ok(metrics)
}

pub fn remove_subscriber(&self, client_id: ClientActorId) {
Expand Down Expand Up @@ -764,6 +741,7 @@ mod tests {
use hashbrown::HashMap;
use itertools::Itertools;
use parking_lot::RwLock;
use pretty_assertions::assert_matches;
use spacetimedb_client_api_messages::energy::EnergyQuanta;
use spacetimedb_client_api_messages::websocket::{
CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle,
Expand Down Expand Up @@ -794,7 +772,8 @@ mod tests {
query_strings: [sql.into()].into(),
request_id: 0,
};
module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)
module_subscriptions.add_legacy_subscriber(sender, subscribe, Instant::now(), assert)?;
Ok(())
}

/// An in-memory `RelationalDB` for testing
Expand Down Expand Up @@ -945,10 +924,12 @@ mod tests {
queries: &[&'static str],
sender: Arc<ClientConnectionSender>,
counter: &mut u32,
) -> anyhow::Result<()> {
) -> anyhow::Result<ExecutionMetrics> {
*counter += 1;
subs.add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)?;
Ok(())
let metrics = subs
.add_multi_subscription(sender, multi_subscribe(queries, *counter), Instant::now(), None)
.map(|metrics| metrics.unwrap_or_default())?;
Ok(metrics)
}

/// Unsubscribe from a single query
Expand Down Expand Up @@ -1237,6 +1218,8 @@ mod tests {
})
})?;

commit_tx(&db, &subs, [], [(table_id, product![0_u8])])?;

let mut query_id = 0;

// Subscribe to `t`
Expand Down Expand Up @@ -1858,6 +1841,50 @@ mod tests {
Ok(())
}

/// Test that we do not evaluate queries that return trivially empty results
#[tokio::test]
async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> {
// Establish a client connection
let (tx, mut rx) = client_connection(client_id_from_u8(1));

let db = relational_db()?;
let subs = module_subscriptions(db.clone());

let schema = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)];
let indices = &[0.into()];
// Create tables `t` and `s` with `(i: u64, a: u64)`.
db.create_table_for_test("t", schema, indices)?;
let s_id = db.create_table_for_test("s", schema, indices)?;

// Insert one row into `s`, but leave `t` empty.
commit_tx(&db, &subs, [], [(s_id, product![0u64, 0u64])])?;

// Subscribe to queries that return empty results
let metrics = subscribe_multi(
&subs,
&[
"select t.* from t where a = 0",
"select t.* from t join s on t.id = s.id where s.a = 0",
"select s.* from t join s on t.id = s.id where t.a = 0",
],
tx,
&mut 0,
)?;

assert_matches!(
rx.recv().await,
Some(SerializableMessage::Subscription(SubscriptionMessage {
result: SubscriptionResult::SubscribeMulti(_),
..
}))
);

assert_eq!(metrics.rows_scanned, 0);
assert_eq!(metrics.index_seeks, 0);

Ok(())
}

/// Asserts that a subscription holds a tx handle for the entire length of its evaluation.
#[test]
fn test_tx_subscription_ordering() -> ResultTest<()> {
Expand Down
Loading