1 change: 0 additions & 1 deletion crates/bench/benches/subscription.rs
@@ -138,7 +138,6 @@ fn eval(c: &mut Criterion) {
&plans,
table_id,
table_name.clone(),
Compression::None,
&tx,
TableUpdateType::Subscribe,
)))
8 changes: 6 additions & 2 deletions crates/core/src/host/module_host.rs
@@ -128,13 +128,17 @@ impl UpdatesRelValue<'_> {
!(self.deletes.is_empty() && self.inserts.is_empty())
}

pub fn encode<F: WebsocketFormat>(&self, compression: Compression) -> (F::QueryUpdate, u64, usize) {
pub fn encode<F: WebsocketFormat>(&self) -> (F::QueryUpdate, u64, usize) {
let (deletes, nr_del) = F::encode_list(self.deletes.iter());
let (inserts, nr_ins) = F::encode_list(self.inserts.iter());
let num_rows = nr_del + nr_ins;
let num_bytes = deletes.num_bytes() + inserts.num_bytes();
let qu = QueryUpdate { deletes, inserts };
let cqu = F::into_query_update(qu, compression);
// We don't compress individual table updates.
// Previously we did, but the benefits, if any, were unclear.
// Note, each message is still compressed before being sent to clients,
// but we no longer have to hold a tx lock when doing so.
let cqu = F::into_query_update(qu, Compression::None);
(cqu, num_rows, num_bytes)
}
}
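
To make the comment above concrete, here is a minimal standalone sketch of the intended flow. The helper names and the use of flate2 are illustrative assumptions, not this crate's API; the point is that rows are serialized uncompressed while the tx lock is held, and the assembled outer message is compressed once, after the lock is released.

// Illustrative sketch only; encode_inner and compress_outer are hypothetical names.
use flate2::{write::GzEncoder, Compression};
use std::io::Write;

// Runs while the tx lock is held: serialize rows, but do not compress.
fn encode_inner(deletes: &[Vec<u8>], inserts: &[Vec<u8>]) -> Vec<u8> {
    let mut buf = Vec::new();
    for row in deletes.iter().chain(inserts) {
        buf.extend_from_slice(row); // plain concatenation stands in for BSATN encoding
    }
    buf
}

// Runs after the tx lock is released: compress the whole outer message once.
fn compress_outer(message: &[u8]) -> Vec<u8> {
    let mut enc = GzEncoder::new(Vec::new(), Compression::default());
    enc.write_all(message).expect("writing to a Vec cannot fail");
    enc.finish().expect("finishing an in-memory encoder cannot fail")
}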
9 changes: 5 additions & 4 deletions crates/core/src/subscription/mod.rs
@@ -115,7 +115,6 @@ pub fn collect_table_update<Tx, F>(
plan_fragments: &[PipelinedProject],
table_id: TableId,
table_name: Box<str>,
comp: Compression,
tx: &Tx,
update_type: TableUpdateType,
) -> Result<(TableUpdate<F>, ExecutionMetrics)>
@@ -135,15 +134,17 @@ where
inserts: empty,
},
};
let update = F::into_query_update(qu, comp);
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table update too.
let update = F::into_query_update(qu, Compression::None);
(TableUpdate::new(table_id, table_name, (update, num_rows)), metrics)
})
}

/// Execute a collection of subscription queries in parallel
pub fn execute_plans<Tx, F>(
plans: &[Arc<Plan>],
comp: Compression,
tx: &Tx,
update_type: TableUpdateType,
) -> Result<(DatabaseUpdate<F>, ExecutionMetrics), DBError>
@@ -160,7 +161,7 @@ where
.clone()
.optimize()
.map(|plan| (sql, PipelinedProject::from(plan)))
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), comp, tx, update_type))
.and_then(|(_, plan)| collect_table_update(&[plan], table_id, table_name.into(), tx, update_type))
.map_err(|err| DBError::WithSql {
sql: sql.into(),
error: Box::new(DBError::Other(err)),
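
The doc comment on execute_plans describes parallel execution; as a rough sketch of that shape (simplified signatures, assuming each plan can be run independently, not the actual function), rayon makes the fan-out a one-liner:

// Hedged sketch: evaluate independent query plans on the rayon thread pool.
use rayon::prelude::*;

fn run_all<F, T, E>(plans: &[F]) -> Result<Vec<T>, E>
where
    F: Fn() -> Result<T, E> + Sync,
    T: Send,
    E: Send,
{
    // Collecting into Result short-circuits on the first failed plan.
    plans.par_iter().map(|run| run()).collect()
}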
134 changes: 65 additions & 69 deletions crates/core/src/subscription/module_subscription_actor.rs
@@ -23,8 +23,8 @@ use crate::worker_metrics::WORKER_METRICS;
use parking_lot::RwLock;
use prometheus::IntGauge;
use spacetimedb_client_api_messages::websocket::{
self as ws, BsatnFormat, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate,
Unsubscribe, UnsubscribeMulti,
self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe,
UnsubscribeMulti,
};
use spacetimedb_execution::pipelined::PipelinedProject;
use spacetimedb_expr::check::parse_and_type_sub;
@@ -186,34 +186,10 @@ impl ModuleSubscriptions {
let tx = DeltaTx::from(tx);

Ok(match sender.config.protocol {
Protocol::Binary => {
collect_table_update(
&plans,
table_id,
table_name.into(),
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table update too.
Compression::None,
&tx,
update_type,
)
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))
}
Protocol::Text => {
collect_table_update(
&plans,
table_id,
table_name.into(),
// We will compress the outer server message,
// after we release the tx lock,
// There's no need to compress the inner table update too.
Compression::None,
&tx,
update_type,
)
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))
}
Protocol::Binary => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics)),
Protocol::Text => collect_table_update(&plans, table_id, table_name.into(), &tx, update_type)
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics)),
}?)
}
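
For context on the match above, the FormatSwitch dispatch collapses to roughly the following shape. This is a simplified sketch with stand-in types and encodings, not the real definitions from spacetimedb_client_api_messages:

// Simplified stand-ins for the real FormatSwitch and Protocol types.
enum FormatSwitch<B, J> {
    Bsatn(B),
    Json(J),
}

enum Protocol {
    Binary,
    Text,
}

// One encoding path per wire protocol; the result is tagged with its format.
fn encode_rows(protocol: Protocol, rows: &[u64]) -> FormatSwitch<Vec<u8>, String> {
    match protocol {
        // Little-endian bytes stand in for the BSATN row encoding.
        Protocol::Binary => FormatSwitch::Bsatn(rows.iter().flat_map(|r| r.to_le_bytes()).collect()),
        Protocol::Text => FormatSwitch::Json(format!("{rows:?}")),
    }
}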

@@ -240,27 +216,11 @@ impl ModuleSubscriptions {
let tx = DeltaTx::from(tx);
match sender.config.protocol {
Protocol::Binary => {
let (update, metrics) = execute_plans(
queries,
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table updates too.
Compression::None,
&tx,
update_type,
)?;
let (update, metrics) = execute_plans(queries, &tx, update_type)?;
Ok((FormatSwitch::Bsatn(update), metrics))
}
Protocol::Text => {
let (update, metrics) = execute_plans(
queries,
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table updates too.
Compression::None,
&tx,
update_type,
)?;
let (update, metrics) = execute_plans(queries, &tx, update_type)?;
Ok((FormatSwitch::Json(update), metrics))
}
}
@@ -650,26 +610,10 @@ impl ModuleSubscriptions {

let tx = DeltaTx::from(&*tx);
let (database_update, metrics) = match sender.config.protocol {
Protocol::Binary => execute_plans(
&queries,
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table updates too.
Compression::None,
&tx,
TableUpdateType::Subscribe,
)
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
Protocol::Text => execute_plans(
&queries,
// We will compress the outer server message,
// after we release the tx lock.
// There's no need to compress the inner table updates too.
Compression::None,
&tx,
TableUpdateType::Subscribe,
)
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
Protocol::Binary => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
.map(|(table_update, metrics)| (FormatSwitch::Bsatn(table_update), metrics))?,
Protocol::Text => execute_plans(&queries, &tx, TableUpdateType::Subscribe)
.map(|(table_update, metrics)| (FormatSwitch::Json(table_update), metrics))?,
};

record_exec_metrics(
@@ -1539,7 +1483,9 @@ mod tests {
Ok(())
}

/// Test that we do not compress the results of an initial subscribe call
/// Test that we do not compress within a [SubscriptionMessage].
/// The message itself is compressed before being sent over the wire,
/// but we don't care about that for this test.
#[tokio::test]
async fn test_no_compression_for_subscribe() -> anyhow::Result<()> {
// Establish a client connection with compression
@@ -1583,6 +1529,56 @@
Ok(())
}

/// Test that we do not compress within a [TransactionUpdateMessage].
/// The message itself is compressed before being sent over the wire,
/// but we don't care about that for this test.
#[tokio::test]
async fn test_no_compression_for_update() -> anyhow::Result<()> {
// Establish a client connection with compression
let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli);

let db = relational_db()?;
let subs = module_subscriptions(db.clone());

let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?;

let mut inserts = vec![];

for i in 0..16_000u64 {
inserts.push((table_id, product![i]));
}

// Subscribe to the entire table
subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?;

// Wait to receive the initial subscription message
assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))));

// Insert a lot of rows into `t`.
// We want to insert enough to cross any threshold there might be for compression.
commit_tx(&db, &subs, [], inserts)?;

// Assert the table updates within this message are all uncompressed
match rx.recv().await {
Some(SerializableMessage::TxUpdate(TransactionUpdateMessage {
database_update:
SubscriptionUpdateMessage {
database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }),
..
},
..
})) => {
assert!(tables.iter().all(|TableUpdate { updates, .. }| updates
.iter()
.all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_)))));
}
Some(_) => panic!("unexpected message from subscription"),
None => panic!("channel unexpectedly closed"),
};

Ok(())
}

/// In this test we subscribe to a join query, update the lhs table,
/// and assert that the server sends the correct delta to the client.
#[tokio::test]
Expand Down
37 changes: 21 additions & 16 deletions crates/core/src/subscription/module_subscription_manager.rs
@@ -16,7 +16,7 @@ use hashbrown::{HashMap, HashSet};
use itertools::Itertools;
use rayon::iter::{IntoParallelIterator, ParallelIterator};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CompressableQueryUpdate, Compression, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
};
use spacetimedb_data_structures::map::{Entry, IntMap};
use spacetimedb_lib::metrics::ExecutionMetrics;
@@ -817,26 +817,37 @@ impl SubscriptionManager {
.fold(FoldState::default, |mut acc, (qstate, plan)| {
let table_id = plan.subscribed_table_id();
let table_name = plan.subscribed_table_name();
// Store at most one copy of the serialization to BSATN x Compression
// and ditto for the "serialization" for JSON.
// Store at most one copy for both the serialization to BSATN and JSON.
// Each subscriber gets to pick which of these they want,
// but we only fill `ops_bin_{compression}` and `ops_json` at most once.
// but we only fill `ops_bin_uncompressed` and `ops_json` at most once.
// The former will be `Some(_)` if some subscriber uses `Protocol::Binary`
// and the latter `Some(_)` if some subscriber uses `Protocol::Text`.
let mut ops_bin_brotli: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_bin_gzip: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_bin_none: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
//
// Previously we were compressing each `QueryUpdate` within a `TransactionUpdate`.
// The reason was simple - many clients can subscribe to the same query.
// If we compress `TransactionUpdate`s independently for each client,
// we could be doing a lot of redundant compression.
//
// However, the risks associated with this approach include:
// 1. We have to hold the tx lock when compressing
// 2. A potentially worse compression ratio
// 3. Extra decompression overhead on the client
//
// Because transaction processing is currently single-threaded,
// the risks of holding the tx lock for longer than necessary,
// as well as the additional message processing overhead on the client,
// outweighed the benefit of reduced CPU usage with the former approach.
let mut ops_bin_uncompressed: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_json: Option<(QueryUpdate<JsonFormat>, _, _)> = None;

fn memo_encode<F: WebsocketFormat>(
updates: &UpdatesRelValue<'_>,
client: &ClientConnectionSender,
memory: &mut Option<(F::QueryUpdate, u64, usize)>,
metrics: &mut ExecutionMetrics,
) -> (F::QueryUpdate, u64) {
let (update, num_rows, num_bytes) = memory
.get_or_insert_with(|| {
let encoded = updates.encode::<F>(client.config.compression);
let encoded = updates.encode::<F>();
// The first time we insert into this map, we call encode.
// This is when we serialize the rows to BSATN/JSON.
// Hence this is where we increment `bytes_scanned`.
@@ -884,17 +895,11 @@
let update = match client.config.protocol {
Protocol::Binary => Bsatn(memo_encode::<BsatnFormat>(
&delta_updates,
client,
match client.config.compression {
Compression::Brotli => &mut ops_bin_brotli,
Compression::Gzip => &mut ops_bin_gzip,
Compression::None => &mut ops_bin_none,
},
&mut ops_bin_uncompressed,
&mut acc.metrics,
)),
Protocol::Text => Json(memo_encode::<JsonFormat>(
&delta_updates,
client,
&mut ops_json,
&mut acc.metrics,
)),
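
The memoization described in the comment above reduces to a small pattern. A self-contained sketch (simplified types, not the crate's memo_encode signature): the first subscriber on a given wire format pays for serialization, and every later subscriber on that format reuses the cached bytes.

fn memo<T: Clone>(slot: &mut Option<T>, encode: impl FnOnce() -> T) -> T {
    slot.get_or_insert_with(encode).clone()
}

fn main() {
    // One cache per wire format, shared across all subscribers of a query.
    let mut bsatn_cache: Option<Vec<u8>> = None;
    let first = memo(&mut bsatn_cache, || b"encoded rows".to_vec()); // encodes once
    let second = memo(&mut bsatn_cache, || unreachable!()); // cache hit, no re-encode
    assert_eq!(first, second);
}

Note the collapse from three binary caches (one per compression level) to the single ops_bin_uncompressed: since inner updates are no longer compressed, subscribers on the same protocol no longer need per-compression copies.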