129 changes: 46 additions & 83 deletions crates/bench/benches/special.rs
@@ -1,16 +1,16 @@
use criterion::async_executor::AsyncExecutor;
use criterion::{criterion_group, criterion_main, Criterion};
use mimalloc::MiMalloc;
use sats::bsatn;
use spacetimedb::db::{Config, Storage};
use spacetimedb_bench::{
database::BenchDatabase,
schemas::{create_sequential, u32_u64_str, u32_u64_u64, u64_u64_u32, BenchTable, RandomTable},
spacetime_module::BENCHMARKS_MODULE,
spacetime_module::SpacetimeModule,
};
use spacetimedb_lib::{bsatn::ToBsatn as _, sats, ProductValue};
use spacetimedb_lib::sats::{self, bsatn};
use spacetimedb_lib::{bsatn::ToBsatn as _, ProductValue};
use spacetimedb_schema::schema::TableSchema;
use spacetimedb_testing::modules::start_runtime;
use std::sync::Arc;
use std::sync::OnceLock;
use std::{hint::black_box, sync::Arc};

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
@@ -20,91 +20,70 @@ fn criterion_benchmark(c: &mut Criterion) {
serialize_benchmarks::<u32_u64_u64>(c);
serialize_benchmarks::<u64_u64_u32>(c);

custom_module_benchmarks(c);
custom_db_benchmarks(c);
}
let db = SpacetimeModule::build(true, true).unwrap();

fn custom_module_benchmarks(c: &mut Criterion) {
let runtime = start_runtime();
custom_module_benchmarks(&db, c);
custom_db_benchmarks(&db, c);
}

let config = Config {
storage: Storage::Memory,
};
let module = runtime.block_on(async { BENCHMARKS_MODULE.load_module(config, None).await });
fn custom_module_benchmarks(m: &SpacetimeModule, c: &mut Criterion) {
let mut group = c.benchmark_group("special/stdb_module");

let args = sats::product!["0".repeat(65536).into_boxed_str()];
c.bench_function("special/stdb_module/large_arguments/64KiB", |b| {
b.iter_batched(
|| args.clone(),
|args| runtime.block_on(async { module.call_reducer_binary("fn_with_1_args", args).await.unwrap() }),
criterion::BatchSize::PerIteration,
)
group.bench_function("large_arguments/64KiB", |b| {
b.to_async(m)
.iter(|| async { m.module.call_reducer_binary("fn_with_1_args", &args).await.unwrap() })
});

for n in [1u32, 100, 1000] {
let args = sats::product![n];
c.bench_function(&format!("special/stdb_module/print_bulk/lines={n}"), |b| {
b.iter_batched(
|| args.clone(),
|args| runtime.block_on(async { module.call_reducer_binary("print_many_things", args).await.unwrap() }),
criterion::BatchSize::PerIteration,
)
group.bench_function(format!("print_bulk/lines={n}"), |b| {
b.to_async(m)
.iter(|| async { m.module.call_reducer_binary("print_many_things", &args).await.unwrap() })
});
}
}

fn custom_db_benchmarks(c: &mut Criterion) {
let runtime = start_runtime();

let config = Config {
storage: Storage::Memory,
};
let module = runtime.block_on(async { BENCHMARKS_MODULE.load_module(config, None).await });
let mut group = c.benchmark_group("special");
fn custom_db_benchmarks(m: &SpacetimeModule, c: &mut Criterion) {
let mut group = c.benchmark_group("special/db_game");
// This bench takes long, so adjust for it
group.sample_size(10);

let init_db: OnceLock<()> = OnceLock::new();
for n in [10, 100] {
let args = sats::product![n];
group.bench_function(&format!("db_game/circles/load={n}"), |b| {
group.bench_function(format!("circles/load={n}"), |b| {
// Initialize outside the benchmark so the db is seeded once, to avoid enlarging the db
init_db.get_or_init(|| {
runtime.block_on(async {
module
.call_reducer_binary("init_game_circles", sats::product![100])
m.block_on(async {
m.module
.call_reducer_binary("init_game_circles", &sats::product![100])
.await
.unwrap()
});
});

b.iter(|| {
black_box(
runtime.block_on(async { module.call_reducer_binary("run_game_circles", args.clone()).await.ok() }),
);
})
b.to_async(m)
.iter(|| async { m.module.call_reducer_binary("run_game_circles", &args).await.unwrap() })
});
}

let init_db: OnceLock<()> = OnceLock::new();
for n in [500, 5_000] {
let args = sats::product![n];
group.bench_function(&format!("db_game/ia_loop/load={n}"), |b| {
group.bench_function(format!("ia_loop/load={n}"), |b| {
// Initialize outside the benchmark so the db is seeded once, to avoid `unique` constraint violations
init_db.get_or_init(|| {
runtime.block_on(async {
module
.call_reducer_binary("init_game_ia_loop", sats::product![5_000])
m.block_on(async {
m.module
.call_reducer_binary("init_game_ia_loop", &sats::product![5_000])
.await
.unwrap();
})
});

b.iter(|| {
black_box(
runtime.block_on(async { module.call_reducer_binary("run_game_ia_loop", args.clone()).await.ok() }),
);
})
b.to_async(m)
.iter(|| async { m.module.call_reducer_binary("run_game_ia_loop", &args).await.unwrap() })
});
}
}
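Note: the seeding above relies on `OnceLock::get_or_init` so the database is populated exactly once, even though Criterion re-enters the setup code for every load size and sample. A minimal sketch of that pattern, with a hypothetical `seed_database` standing in for the real `init_game_*` reducer calls:

```rust
use std::sync::OnceLock;

fn seed_database() {
    println!("seeding once");
}

fn main() {
    // One OnceLock per benchmark family, declared outside the loop over load sizes.
    let init_db: OnceLock<()> = OnceLock::new();
    for n in [10, 100] {
        // The closure runs only on the first call; later load sizes and samples
        // reuse the already-seeded state instead of growing the database.
        init_db.get_or_init(seed_database);
        println!("run benchmark with load={n}");
    }
}
```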
@@ -116,37 +95,29 @@ fn serialize_benchmarks<
) {
let name = T::name();
let count = 100;
let mut group = c.benchmark_group("special/serde/serialize");
let mut group = c.benchmark_group(format!("special/serde/serialize/{name}"));
group.throughput(criterion::Throughput::Elements(count));

let data = create_sequential::<T>(0xdeadbeef, count as u32, 100);

group.bench_function(&format!("{name}/product_value/count={count}"), |b| {
group.bench_function(format!("product_value/count={count}"), |b| {
b.iter_batched(
|| data.clone(),
|data| data.into_iter().map(|row| row.into_product_value()).collect::<Vec<_>>(),
criterion::BatchSize::PerIteration,
);
});
// this measures serialization from a ProductValue, not directly (as in, from generated code in the Rust SDK).
let data_pv = data
let data_pv = &data
.into_iter()
.map(|row| spacetimedb_lib::AlgebraicValue::Product(row.into_product_value()))
.collect::<ProductValue>();

group.bench_function(&format!("{name}/bsatn/count={count}"), |b| {
b.iter_batched_ref(
|| data_pv.clone(),
|data_pv| sats::bsatn::to_vec(data_pv).unwrap(),
criterion::BatchSize::PerIteration,
);
group.bench_function(format!("bsatn/count={count}"), |b| {
b.iter(|| sats::bsatn::to_vec(data_pv).unwrap());
});
group.bench_function(&format!("{name}/json/count={count}"), |b| {
b.iter_batched_ref(
|| data_pv.clone(),
|data_pv| serde_json::to_string(data_pv).unwrap(),
criterion::BatchSize::PerIteration,
);
group.bench_function(format!("json/count={count}"), |b| {
b.iter(|| serde_json::to_string(data_pv).unwrap());
});

let mut table_schema = TableSchema::from_product_type(T::product_type());
@@ -172,7 +143,7 @@
.into_iter()
.map(|ptr| table.get_row_ref(&blob_store, ptr).unwrap())
.collect::<Vec<_>>();
group.bench_function(&format!("{name}/bflatn_to_bsatn_slow_path/count={count}"), |b| {
group.bench_function(format!("bflatn_to_bsatn_slow_path/count={count}"), |b| {
b.iter(|| {
let mut buf = Vec::new();
for row_ref in &refs {
@@ -181,7 +152,7 @@
buf
})
});
group.bench_function(&format!("{name}/bflatn_to_bsatn_fast_path/count={count}"), |b| {
group.bench_function(format!("bflatn_to_bsatn_fast_path/count={count}"), |b| {
b.iter(|| {
let mut buf = Vec::new();
for row_ref in &refs {
@@ -193,25 +164,17 @@

group.finish();

let mut group = c.benchmark_group("special/serde/deserialize");
let mut group = c.benchmark_group(format!("special/serde/deserialize/{name}"));
group.throughput(criterion::Throughput::Elements(count));

let data_bin = sats::bsatn::to_vec(&data_pv).unwrap();
let data_json = serde_json::to_string(&data_pv).unwrap();

group.bench_function(&format!("{name}/bsatn/count={count}"), |b| {
b.iter_batched_ref(
|| data_bin.clone(),
|data_bin| bsatn::from_slice::<Vec<T>>(data_bin).unwrap(),
criterion::BatchSize::PerIteration,
);
group.bench_function(format!("bsatn/count={count}"), |b| {
b.iter(|| bsatn::from_slice::<Vec<T>>(&data_bin).unwrap());
});
group.bench_function(&format!("{name}/json/count={count}"), |b| {
b.iter_batched_ref(
|| data_json.clone(),
|data_json| serde_json::from_str::<Vec<T>>(data_json).unwrap(),
criterion::BatchSize::PerIteration,
);
group.bench_function(format!("json/count={count}"), |b| {
b.iter(|| serde_json::from_str::<Vec<T>>(&data_json).unwrap());
});
// TODO: deserialize benches (needs a typespace)
}
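The main change in this file is replacing per-call `runtime.block_on` plumbing with Criterion's async bencher: `SpacetimeModule` now implements `criterion::async_executor::AsyncExecutor`, so benchmarks can write `b.to_async(m).iter(...)`. A minimal sketch of the same wiring, with a hypothetical `Harness` standing in for the module handle (assumes `criterion` with its async feature and `tokio` with the multi-thread runtime):

```rust
use criterion::async_executor::AsyncExecutor;
use criterion::Criterion;
use tokio::runtime::Runtime;

struct Harness {
    runtime: Runtime,
}

impl AsyncExecutor for &Harness {
    fn block_on<T>(&self, future: impl std::future::Future<Output = T>) -> T {
        self.runtime.block_on(future)
    }
}

fn bench(c: &mut Criterion, h: &Harness) {
    c.bench_function("async_call", |b| {
        // `to_async` drives the async closure on the harness's runtime,
        // replacing manual `iter_batched` + `runtime.block_on` plumbing.
        b.to_async(h).iter(|| async { /* await the reducer call here */ });
    });
}

fn main() {
    let harness = Harness {
        runtime: Runtime::new().unwrap(),
    };
    let mut c = Criterion::default();
    bench(&mut c, &harness);
}
```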
44 changes: 17 additions & 27 deletions crates/bench/src/spacetime_module.rs
@@ -14,6 +14,7 @@ use crate::{
schemas::{table_name, BenchTable},
ResultBench,
};
use criterion::async_executor::AsyncExecutor;

lazy_static::lazy_static! {
pub static ref BENCHMARKS_MODULE: CompiledModule = {
@@ -41,22 +42,21 @@
/// See the doc comment there for information on the formatting expected for
/// table and reducer names.
pub struct SpacetimeModule {
// Module must be dropped BEFORE the runtime, otherwise there is a deadlock!
// In Rust, struct fields are guaranteed to drop in declaration order, so don't reorder this field.
pub module: ModuleHandle,
runtime: Runtime,
/// This is here due to Drop shenanigans.
/// It should always be Some when the module is not being dropped.
module: Option<ModuleHandle>,
}

impl Drop for SpacetimeModule {
fn drop(&mut self) {
// Module must be dropped BEFORE runtime,
// otherwise there is a deadlock!
drop(self.module.take());
// Note: we use block_on for the methods here. It adds about 70ns of overhead.
// This isn't currently a problem. Overhead to call an empty reducer is currently 20_000 ns.

impl AsyncExecutor for &SpacetimeModule {
fn block_on<T>(&self, future: impl std::future::Future<Output = T>) -> T {
self.runtime.block_on(future)
}
}

// Note: we use block_on for the methods here. It adds about 70ns of overhead.
// This isn't currently a problem. Overhead to call an empty reducer is currently 20_000 ns.
// It's easier to do it this way because async traits are a mess.
impl BenchDatabase for SpacetimeModule {
fn name() -> &'static str {
@@ -88,10 +88,7 @@ impl BenchDatabase for SpacetimeModule {
for reducer in module.client.module.info.module_def.reducers() {
log::trace!("SPACETIME_MODULE: LOADED REDUCER: {:?}", reducer);
}
Ok(SpacetimeModule {
runtime,
module: Some(module),
})
Ok(SpacetimeModule { runtime, module })
}

fn create_table<T: BenchTable>(
@@ -107,7 +104,6 @@

fn clear_table(&mut self, table_id: &Self::TableId) -> ResultBench<()> {
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();
runtime.block_on(async move {
// FIXME: this doesn't work. delete is unimplemented!!
/*
@@ -125,11 +121,10 @@
// This implementation will not work if other people are concurrently interacting with our module.
fn count_table(&mut self, table_id: &Self::TableId) -> ResultBench<u32> {
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();

let count = runtime.block_on(async move {
let name = format!("count_{}", table_id.snake_case);
module.call_reducer_binary(&name, [].into()).await?;
module.call_reducer_binary(&name, &[].into()).await?;
let logs = module.read_log(Some(1)).await;
let message = serde_json::from_str::<LoggerRecord>(&logs)?;
if !message.message.starts_with("COUNT: ") {
@@ -144,10 +139,9 @@

fn empty_transaction(&mut self) -> ResultBench<()> {
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();

runtime.block_on(async move {
module.call_reducer_binary("empty", [].into()).await?;
module.call_reducer_binary("empty", &[].into()).await?;
Ok(())
})
}
@@ -156,34 +150,31 @@
let rows = rows.into_iter().map(|row| row.into_product_value()).collect();
let args = product![ArrayValue::Product(rows)];
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();
let reducer_name = format!("insert_bulk_{}", table_id.snake_case);

runtime.block_on(async move {
module.call_reducer_binary(&reducer_name, args).await?;
module.call_reducer_binary(&reducer_name, &args).await?;
Ok(())
})
}

fn update_bulk<T: BenchTable>(&mut self, table_id: &Self::TableId, row_count: u32) -> ResultBench<()> {
let args = product![row_count];
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();
let reducer_name = format!("update_bulk_{}", table_id.snake_case);

runtime.block_on(async move {
module.call_reducer_binary(&reducer_name, args).await?;
module.call_reducer_binary(&reducer_name, &args).await?;
Ok(())
})
}

fn iterate(&mut self, table_id: &Self::TableId) -> ResultBench<()> {
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();
let reducer_name = format!("iterate_{}", table_id.snake_case);

runtime.block_on(async move {
module.call_reducer_binary(&reducer_name, [].into()).await?;
module.call_reducer_binary(&reducer_name, &[].into()).await?;
Ok(())
})
}
@@ -195,14 +186,13 @@
value: AlgebraicValue,
) -> ResultBench<()> {
let SpacetimeModule { runtime, module } = self;
let module = module.as_mut().unwrap();

let product_type = T::product_type();
let column_name = product_type.elements[col_id.into().idx()].name.as_ref().unwrap();
let reducer_name = format!("filter_{}_by_{}", table_id.snake_case, column_name);

runtime.block_on(async move {
module.call_reducer_binary(&reducer_name, [value].into()).await?;
module.call_reducer_binary(&reducer_name, &[value].into()).await?;
Ok(())
})
}
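The `Option<ModuleHandle>` plus explicit `Drop` impl is removed because the ordering it enforced is already guaranteed by the language: struct fields are dropped in declaration order. A minimal illustration of that guarantee (names here are illustrative, not the real types):

```rust
struct Loud(&'static str);

impl Drop for Loud {
    fn drop(&mut self) {
        println!("dropping {}", self.0);
    }
}

struct Module {
    // Declared first, so it is dropped before `runtime` below.
    module: Loud,
    runtime: Loud,
}

fn main() {
    let _m = Module {
        module: Loud("module"),
        runtime: Loud("runtime"),
    };
    // Prints "dropping module" then "dropping runtime".
}
```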
4 changes: 2 additions & 2 deletions crates/testing/src/modules.rs
@@ -59,14 +59,14 @@ impl ModuleHandle {
})
}

pub async fn call_reducer_json(&self, reducer: &str, args: sats::ProductValue) -> anyhow::Result<()> {
pub async fn call_reducer_json(&self, reducer: &str, args: &sats::ProductValue) -> anyhow::Result<()> {
let args = serde_json::to_string(&args).unwrap();
let message = Self::call_reducer_msg(reducer, args);
self.send(serde_json::to_string(&SerializeWrapper::new(message)).unwrap())
.await
}

pub async fn call_reducer_binary(&self, reducer: &str, args: sats::ProductValue) -> anyhow::Result<()> {
pub async fn call_reducer_binary(&self, reducer: &str, args: &sats::ProductValue) -> anyhow::Result<()> {
let args = bsatn::to_vec(&args).unwrap();
let message = Self::call_reducer_msg(reducer, args);
self.send(bsatn::to_vec(&message).unwrap()).await
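`call_reducer_json` and `call_reducer_binary` now borrow their arguments, so benchmark call sites can reuse one prebuilt `ProductValue` per iteration instead of cloning it in an `iter_batched` setup closure. A rough sketch of the effect at a call site, with a hypothetical `encode_args` standing in for the BSATN encoding done inside the real method:

```rust
// Borrowed-argument sketch: the caller keeps ownership of `args` and can
// hand out `&args` on every iteration without a per-call clone.
fn encode_args(args: &[u32]) -> Vec<u8> {
    args.iter().flat_map(|v| v.to_le_bytes()).collect()
}

fn main() {
    let args = vec![1u32, 2, 3];
    for _ in 0..3 {
        let _bytes = encode_args(&args);
    }
    // A by-value signature would instead force `args.clone()` per call,
    // which is what the old `iter_batched(|| args.clone(), ...)` setup did.
    println!("still own args: {:?}", args);
}
```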