Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/client-api-messages/src/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -887,7 +887,7 @@ pub fn gzip_compress(bytes: &[u8], out: &mut Vec<u8>) {

pub fn gzip_decompress(bytes: &[u8]) -> Result<Vec<u8>, io::Error> {
let mut decompressed = Vec::new();
let _ = flate2::read::GzDecoder::new(bytes).read(&mut decompressed)?;
let _ = flate2::read::GzDecoder::new(bytes).read_to_end(&mut decompressed)?;
Ok(decompressed)
}

Expand Down
16 changes: 11 additions & 5 deletions crates/core/src/subscription/module_subscription_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use hashbrown::hash_map::OccupiedError;
use hashbrown::{HashMap, HashSet};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use spacetimedb_client_api_messages::websocket::{
BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
BsatnFormat, CompressableQueryUpdate, Compression, FormatSwitch, JsonFormat, QueryId, QueryUpdate, WebsocketFormat,
};
use spacetimedb_data_structures::map::{Entry, IntMap};
use spacetimedb_lib::metrics::ExecutionMetrics;
Expand Down Expand Up @@ -485,13 +485,15 @@ impl SubscriptionManager {
.map(|(hash, plan, mut metrics)| {
let table_id = plan.subscribed_table_id();
let table_name: Box<str> = plan.subscribed_table_name().into();
// Store at most one copy of the serialization to BSATN
// Store at most one copy of the serialization to BSATN x Compression
// and ditto for the "serialization" for JSON.
// Each subscriber gets to pick which of these they want,
// but we only fill `ops_bin` and `ops_json` at most once.
// but we only fill `ops_bin_{compression}` and `ops_json` at most once.
// The former will be `Some(_)` if some subscriber uses `Protocol::Binary`
// and the latter `Some(_)` if some subscriber uses `Protocol::Text`.
let mut ops_bin: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_bin_brotli: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_bin_gzip: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_bin_none: Option<(CompressableQueryUpdate<BsatnFormat>, _, _)> = None;
let mut ops_json: Option<(QueryUpdate<JsonFormat>, _, _)> = None;

fn memo_encode<F: WebsocketFormat>(
Expand Down Expand Up @@ -537,7 +539,11 @@ impl SubscriptionManager {
Protocol::Binary => Bsatn(memo_encode::<BsatnFormat>(
&delta_updates,
client,
&mut ops_bin,
match client.config.compression {
Compression::Brotli => &mut ops_bin_brotli,
Compression::Gzip => &mut ops_bin_gzip,
Compression::None => &mut ops_bin_none,
},
&mut metrics,
)),
Protocol::Text => Json(memo_encode::<JsonFormat>(
Expand Down
1 change: 1 addition & 0 deletions crates/sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub use event::{Event, ReducerEvent, Status};
pub use table::{Table, TableWithPrimaryKey};

pub use spacetime_module::SubscriptionHandle;
pub use spacetimedb_client_api_messages::websocket::Compression;
pub use spacetimedb_lib::{ConnectionId, Identity, ScheduleAt, TimeDuration, Timestamp};
pub use spacetimedb_sats::{i256, u256};

Expand Down
1 change: 1 addition & 0 deletions crates/sdk/tests/test-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ test-counter = { path = "../test-counter" }
tokio.workspace = true
anyhow.workspace = true
env_logger.workspace = true
rand.workspace = true
82 changes: 76 additions & 6 deletions crates/sdk/tests/test-client/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,19 @@ mod module_bindings;

use core::fmt::Display;
use core::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Barrier, Mutex};

use module_bindings::*;

use rand::RngCore;
use spacetimedb_sdk::{
credentials, i256, u256, unstable::CallReducerFlags, ConnectionId, DbConnectionBuilder, DbContext, Event, Identity,
ReducerEvent, Status, SubscriptionHandle, Table, TimeDuration, Timestamp,
credentials, i256, u256, unstable::CallReducerFlags, Compression, ConnectionId, DbConnectionBuilder, DbContext,
Event, Identity, ReducerEvent, Status, SubscriptionHandle, Table, TimeDuration, Timestamp,
};
use test_counter::TestCounter;

mod simple_test_table;
use simple_test_table::{insert_one, on_insert_one};
use simple_test_table::{insert_one, on_insert_one, SimpleTestTable};

mod pk_test_table;
use pk_test_table::{insert_update_delete_one, PkTestTable};
Expand Down Expand Up @@ -122,6 +123,7 @@ fn main() {
"row-deduplication-join-r-and-s" => exec_row_deduplication_join_r_and_s(),
"row-deduplication-r-join-s-and-r-joint" => exec_row_deduplication_r_join_s_and_r_join_t(),
"test-intra-query-bag-semantics-for-join" => test_intra_query_bag_semantics_for_join(),
"two-different-compression-algos" => exec_two_different_compression_algos(),
_ => panic!("Unknown test: {}", test),
}
}
Expand Down Expand Up @@ -1899,12 +1901,13 @@ fn exec_caller_alice_receives_reducer_callback_but_not_bob() {
assert_ne!(conns[0].identity(), conns[1].identity());
}

type ResultRecorder = Box<dyn Send + FnOnce(Result<(), anyhow::Error>)>;

/// [`Option::take`] the `result` function, and invoke it with `res`. Panic if `result` is `None`.
///
/// Used in [`exec_row_deduplication`] to determine that row callbacks are invoked only once,
/// since this will panic if invoked on the same `result` function twice.
#[allow(clippy::type_complexity)]
fn put_result(result: &mut Option<Box<dyn Send + FnOnce(Result<(), anyhow::Error>)>>, res: Result<(), anyhow::Error>) {
fn put_result(result: &mut Option<ResultRecorder>, res: Result<(), anyhow::Error>) {
(result.take().unwrap())(res);
}

Expand Down Expand Up @@ -2157,6 +2160,73 @@ fn test_intra_query_bag_semantics_for_join() {
});
}
});
}

/// Test that several clients subscribing to the same query and using the same protocol (bsatn)
/// can use different compression algorithms than each other.
///
/// This is a regression test.
fn exec_two_different_compression_algos() {
use Compression::*;

// Create 32 KiB of random bytes to make it very likely that compression is used.
// The actual threshold used currently is 1 KiB
// but let's use more than that in case we change it and forget to update here.
let mut rng = rand::thread_rng();
let mut bytes = [0; 1 << 15];
rng.fill_bytes(&mut bytes);
let bytes: Arc<[u8]> = bytes.into();

// Connect with brotli, gzip, and no compression.
// One of them will insert and all of them will subscribe.
// All should get back `bytes`.
fn connect_with_compression(
test_counter: &Arc<TestCounter>,
compression_name: &str,
compression: Compression,
mut recorder: Option<ResultRecorder>,
barrier: &Arc<Barrier>,
expected: &Arc<[u8]>,
) {
let expected1 = expected.clone();
let expected2 = expected1.clone();
let barrier = barrier.clone();
connect_with_then(
test_counter,
compression_name,
|b| b.with_compression(compression),
move |ctx| {
subscribe_these_then(ctx, &["SELECT * FROM vec_u8"], move |ctx| {
VecU8::on_insert(ctx, move |_, actual| {
let actual: &[u8] = actual.n.as_slice();
let res = if actual == &*expected1 {
Ok(())
} else {
Err(anyhow::anyhow!(
"got bad row, expected: {expected1:?}, actual: {actual:?}"
))
};
put_result(&mut recorder, res)
});

// All clients must have subscribed and registered the `on_insert` callback
// before we actually insert the row.
barrier.wait();

if compression == None {
VecU8::insert(ctx, expected2.to_vec());
}
})
},
);
}
let test_counter: Arc<TestCounter> = TestCounter::new();
let barrier = Arc::new(Barrier::new(3));
let got_brotli = Some(test_counter.add_test("got_right_row_brotli"));
let got_gzip = Some(test_counter.add_test("got_right_row_gzip"));
let got_none = Some(test_counter.add_test("got_right_row_none"));
connect_with_compression(&test_counter, "brotli", Brotli, got_brotli, &barrier, &bytes);
connect_with_compression(&test_counter, "gzip", Gzip, got_gzip, &barrier, &bytes);
connect_with_compression(&test_counter, "none", None, got_none, &barrier, &bytes);
test_counter.wait_for_all();
}
5 changes: 5 additions & 0 deletions crates/sdk/tests/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ macro_rules! declare_tests_with_suffix {
fn test_intra_query_bag_semantics_for_join() {
make_test("test-intra-query-bag-semantics-for-join").run()
}

#[test]
fn two_different_compression_algos() {
make_test("two-different-compression-algos").run();
}
}
};
}
Expand Down
Loading