Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions bench-vortex/src/bench_run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,42 @@ where

fastest_result
}

/// Run a benchmark for a specified time limit, collecting all run durations.
///
/// At least one run is always guaranteed, even if it exceeds the time limit.
pub fn run_timed_with_setup<I, O, S, R, F>(
runtime: &Runtime,
time_limit_secs: u64,
mut setup: S,
mut routine: R,
) -> Vec<Duration>
where
S: FnMut() -> I,
R: FnMut(I) -> F,
F: Future<Output = O>,
{
let time_limit = Duration::from_secs(time_limit_secs);
let overall_start = Instant::now();
let mut runs = Vec::new();

// Looping like this ensures at least one run happens.
loop {
let state = black_box(setup());
let elapsed = runtime.block_on(async {
let start = Instant::now();
let output = routine(state).await;
let elapsed = start.elapsed();
drop(black_box(output));
Comment on lines +65 to +70
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you explain the use of black_box here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no idea... I'm going to test this locally first and I'll probably rewrite stuff

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok its just because the function above uses it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We want bb in cause the compiler kills the call to routine() which does seem very unlikely

elapsed
});
runs.push(elapsed);

// Check if we should continue.
if overall_start.elapsed() >= time_limit {
break;
}
}

runs
}
99 changes: 58 additions & 41 deletions bench-vortex/src/bin/random_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fs::File;
use std::future::Future;
use std::io::{Write, stdout};
use std::path::PathBuf;

use bench_vortex::bench_run::run_with_setup;
use bench_vortex::bench_run::run_timed_with_setup;
use bench_vortex::datasets::taxi_data::*;
use bench_vortex::display::{DisplayFormat, print_measurements_json, render_table};
use bench_vortex::measurements::TimingMeasurement;
Expand Down Expand Up @@ -34,8 +35,9 @@ struct Args {
default_values_t = vec![Format::Parquet, Format::OnDiskVortex]
)]
formats: Vec<Format>,
#[arg(short, long, default_value_t = 10)]
iterations: usize,
/// Time limit in seconds for each benchmark target (e.g., 10 for 10 seconds).
#[arg(long, default_value_t = 10)]
time_limit: u64,
#[arg(short, long)]
threads: Option<usize>,
#[arg(short, long)]
Expand All @@ -61,41 +63,48 @@ fn main() -> anyhow::Result<()> {
random_access(
args.formats,
runtime,
args.iterations,
args.time_limit,
args.display_format,
indices,
&args.output_path,
)
}

/// Given a benchmark future, runs it and returns a [`TimingMeasurement`].
fn create_timing_measurement<O, B, F>(
benchmark: B,
/// Configuration for timing measurements
struct TimingConfig<'a> {
name: String,
storage: String,
runtime: &Runtime,
indices: &Buffer<u64>,
iterations: usize,
runtime: &'a Runtime,
indices: &'a Buffer<u64>,
time_limit: u64,
target: Target,
) -> TimingMeasurement
}

/// Given a benchmark future, runs it and returns a [`TimingMeasurement`].
fn create_timing_measurement<O, B, F>(benchmark: B, config: TimingConfig) -> TimingMeasurement
where
B: FnMut(Buffer<u64>) -> F,
F: Future<Output = O>,
{
let benchmark_duration = run_with_setup(runtime, iterations, || indices.clone(), benchmark);
let runs = run_timed_with_setup(
config.runtime,
config.time_limit,
|| config.indices.clone(),
benchmark,
);

TimingMeasurement {
name,
storage,
target,
time: benchmark_duration,
name: config.name,
storage: config.storage,
target: config.target,
runs,
}
}

fn random_access(
formats: Vec<Format>,
runtime: Runtime,
iterations: usize,
time_limit: u64,
display_format: DisplayFormat,
indices: Buffer<u64>,
output_path: &Option<PathBuf>,
Expand Down Expand Up @@ -123,12 +132,14 @@ fn random_access(
|indices| async {
take_vortex_tokio(&taxi_vortex, indices, validate_vortex_array).await
},
"random-access/vortex-tokio-local-disk".to_string(),
STORAGE_NVME.to_owned(),
&runtime,
&indices,
iterations,
target,
TimingConfig {
name: "random-access/vortex-tokio-local-disk".to_string(),
storage: STORAGE_NVME.to_owned(),
runtime: &runtime,
indices: &indices,
time_limit,
target,
},
)
}
Format::VortexCompact => {
Expand All @@ -139,25 +150,29 @@ fn random_access(
take_vortex_tokio(&taxi_vortex_compact, indices, validate_vortex_array)
.await
},
"random-access/vortex-compact-tokio-local-disk".to_string(),
STORAGE_NVME.to_owned(),
&runtime,
&indices,
iterations,
target,
TimingConfig {
name: "random-access/vortex-compact-tokio-local-disk".to_string(),
storage: STORAGE_NVME.to_owned(),
runtime: &runtime,
indices: &indices,
time_limit,
target,
},
)
}
Format::Parquet => {
let taxi_parquet = runtime.block_on(taxi_data_parquet())?;

create_timing_measurement(
|indices| async { take_parquet(&taxi_parquet, indices).await },
"random-access/parquet-tokio-local-disk".to_string(),
STORAGE_NVME.to_owned(),
&runtime,
&indices,
iterations,
target,
TimingConfig {
name: "random-access/parquet-tokio-local-disk".to_string(),
storage: STORAGE_NVME.to_owned(),
runtime: &runtime,
indices: &indices,
time_limit,
target,
},
)
}
#[cfg(feature = "lance")]
Expand All @@ -166,12 +181,14 @@ fn random_access(

create_timing_measurement(
|indices| async { take_lance(&taxi_lance, indices).await },
"random-access/lance-tokio-local-disk".to_string(),
STORAGE_NVME.to_owned(),
&runtime,
&indices,
iterations,
target,
TimingConfig {
name: "random-access/lance-tokio-local-disk".to_string(),
storage: STORAGE_NVME.to_owned(),
runtime: &runtime,
indices: &indices,
time_limit,
target,
},
)
}
Format::Csv | Format::Arrow | Format::OnDiskDuckDB => unimplemented!(),
Expand Down
22 changes: 19 additions & 3 deletions bench-vortex/src/measurements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,23 @@ pub struct TimingMeasurement {
pub name: String,
pub target: Target,
pub storage: String,
pub time: Duration,
pub runs: Vec<Duration>,
}

impl TimingMeasurement {
pub fn mean_time(&self) -> Duration {
let len = self.runs.len();
if len == 0 {
vortex_panic!("cannot have no runs");
}

let total_nanos: u128 = self.runs.iter().map(|d| d.as_nanos()).sum();
let mean_nanos = total_nanos / len as u128;
Duration::new(
u64::try_from(mean_nanos / 1_000_000_000).vortex_unwrap(),
u32::try_from(mean_nanos % 1_000_000_000).vortex_unwrap(),
)
}
Comment on lines +167 to +180
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do median here

}

impl ToTable for TimingMeasurement {
Expand All @@ -171,7 +187,7 @@ impl ToTable for TimingMeasurement {
name: self.name.clone(),
target: self.target,
unit: Cow::from("μs"),
value: MeasurementValue::Int(self.time.as_micros()),
value: MeasurementValue::Int(self.mean_time().as_micros()),
}
}
}
Expand All @@ -182,7 +198,7 @@ impl ToJson for TimingMeasurement {
name: self.name.clone(),
storage: Some(self.storage.clone()),
unit: Some(Cow::from("ns")),
value: MeasurementValue::Int(self.time.as_nanos()),
value: MeasurementValue::Int(self.mean_time().as_nanos()),
bytes: None,
time: None,
commit_id: Cow::from(GIT_COMMIT_ID.as_str()),
Expand Down
Loading