Skip to content

Commit

Permalink
Add more click bench things (#1530)
Browse files Browse the repository at this point in the history
  • Loading branch information
gatesn authored Dec 3, 2024
1 parent 84ac94f commit ce5593d
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 24 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,5 @@ benchmarks/.out

# Zed
.zed/

*.plan
42 changes: 33 additions & 9 deletions bench-vortex/src/bin/clickbench.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#![feature(exit_status_error)]

use std::fs;
use std::path::PathBuf;
use std::process::Command;
use std::time::{Duration, Instant};

use bench_vortex::clickbench::{self, clickbench_queries, HITS_SCHEMA};
use bench_vortex::display::{print_measurements_json, render_table, DisplayFormat};
use bench_vortex::{
execute_query, idempotent, setup_logger, Format, IdempotentPath as _, Measurement,
execute_query, idempotent, physical_plan, setup_logger, Format, IdempotentPath as _,
Measurement,
};
use clap::Parser;
use datafusion::prelude::SessionContext;
Expand All @@ -33,6 +35,8 @@ struct Args {
display_format: DisplayFormat,
#[arg(short, long, value_delimiter = ',')]
queries: Option<Vec<usize>>,
#[arg(long, default_value = "false")]
emit_plan: bool,
}

fn main() {
Expand Down Expand Up @@ -62,9 +66,14 @@ fn main() {
let output_path = basepath.join(format!("hits_{idx}.parquet"));
idempotent(&output_path, |output_path| {
eprintln!("Fixing parquet file {idx}");

// We need to set the home directory because GitHub Actions doesn't set it in a way
// that DuckDB respects.
let home = std::env::var("HOME").unwrap_or_else(|_| "/home/ci-runner".to_string());

let command = format!(
"
SET home_directory='/home/ci-runner/';
SET home_directory='{home}';
INSTALL HTTPFS;
COPY (SELECT * REPLACE
(epoch_ms(EventTime * 1000) AS EventTime, \
Expand All @@ -85,12 +94,18 @@ fn main() {
.unwrap();
});

let formats = [
Format::Parquet,
Format::OnDiskVortex {
let formats = if args.only_vortex {
vec![Format::OnDiskVortex {
enable_compression: true,
},
];
}]
} else {
vec![
Format::Parquet,
Format::OnDiskVortex {
enable_compression: true,
},
]
};

let queries = match args.queries.clone() {
None => clickbench_queries(),
Expand All @@ -104,7 +119,7 @@ fn main() {

let mut all_measurements = Vec::default();

for format in formats {
for format in &formats {
let session_context = SessionContext::new();
let context = session_context.clone();
match format {
Expand Down Expand Up @@ -136,6 +151,15 @@ fn main() {
}

for (query_idx, query) in queries.clone().into_iter() {
if args.emit_plan {
let plan = runtime.block_on(physical_plan(&context, &query)).unwrap();
fs::write(
format!("clickbench_{format}_q{query_idx:02}.plan",),
format!("{:#?}", plan),
)
.expect("Unable to write file");
}

let mut fastest_result = Duration::from_millis(u64::MAX);
for _ in 0..args.iterations {
let exec_duration = runtime.block_on(async {
Expand All @@ -152,7 +176,7 @@ fn main() {
all_measurements.push(Measurement {
query_idx,
time: fastest_result,
format,
format: *format,
dataset: "clickbench".to_string(),
});
}
Expand Down
12 changes: 11 additions & 1 deletion bench-vortex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::time::Duration;

use arrow_array::{RecordBatch, RecordBatchReader};
use datafusion::prelude::SessionContext;
use datafusion_physical_plan::collect;
use datafusion_physical_plan::{collect, ExecutionPlan};
use itertools::Itertools;
use log::LevelFilter;
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
Expand Down Expand Up @@ -255,6 +255,16 @@ pub async fn execute_query(ctx: &SessionContext, query: &str) -> anyhow::Result<
Ok(result)
}

/// Build the optimized physical execution plan for a SQL `query` without
/// executing it, so callers (e.g. the clickbench binary's `--emit-plan`
/// flag) can inspect or dump the plan.
///
/// # Errors
/// Propagates any DataFusion error from parsing/planning the SQL,
/// optimizing the logical plan, or physical planning.
pub async fn physical_plan(
    ctx: &SessionContext,
    query: &str,
) -> anyhow::Result<Arc<dyn ExecutionPlan>> {
    // Parse + logically plan the query, then split the DataFrame into its
    // session state and logical plan so we can drive planning manually.
    let frame = ctx.sql(query).await?;
    let (session_state, logical) = frame.into_parts();
    // Run the logical optimizer explicitly before physical planning.
    // NOTE(review): `create_physical_plan` may optimize internally as well —
    // confirm against the DataFusion version in use; harmless if so.
    let optimized_logical = session_state.optimize(&logical)?;
    let physical = session_state.create_physical_plan(&optimized_logical).await?;
    Ok(physical)
}

#[derive(Clone, Debug)]
pub struct Measurement {
pub query_idx: usize,
Expand Down
3 changes: 1 addition & 2 deletions vortex-sampling-compressor/src/compressors/chunked.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::any::Any;
use std::sync::Arc;

use log::info;
use vortex_array::aliases::hash_set::HashSet;
use vortex_array::array::{ChunkedArray, ChunkedEncoding};
use vortex_array::compress::compute_precompression_stats;
Expand Down Expand Up @@ -109,7 +108,7 @@ impl ChunkedCompressor {
.unwrap_or(false);

if ratio > 1.0 || exceeded_target_ratio {
info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
log::info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous);
let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts();
let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32);

Expand Down
25 changes: 13 additions & 12 deletions vortex-sampling-compressor/src/sampling_compressor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use core::fmt::Formatter;
use std::fmt::Display;

use log::{debug, info, warn};
use rand::rngs::StdRng;
use rand::SeedableRng as _;
use vortex_array::aliases::hash_set::HashSet;
Expand Down Expand Up @@ -147,7 +146,7 @@ impl<'a> SamplingCompressor<'a> {
check_statistics_unchanged(arr, compressed.as_ref());
return Ok(compressed);
} else {
warn!("{} cannot compress {} like {}", self, arr, l);
log::warn!("{} cannot compress {} like {}", self, arr, l);
}
}

Expand Down Expand Up @@ -199,7 +198,7 @@ impl<'a> SamplingCompressor<'a> {
});

if !too_deep.is_empty() {
debug!(
log::debug!(
"{} skipping encodings due to depth/cost: {}",
self,
too_deep
Expand All @@ -210,10 +209,10 @@ impl<'a> SamplingCompressor<'a> {
);
}

debug!("{} candidates for {}: {:?}", self, array, candidates);
log::debug!("{} candidates for {}: {:?}", self, array, candidates);

if candidates.is_empty() {
debug!(
log::debug!(
"{} no compressors for array with dtype: {} and encoding: {}",
self,
array.dtype(),
Expand Down Expand Up @@ -257,9 +256,11 @@ impl<'a> SamplingCompressor<'a> {
let best = find_best_compression(candidates, &sample, self)?
.into_path()
.map(|best_compressor| {
debug!(
log::debug!(
"{} Compressing array {} with {}",
self, array, best_compressor
self,
array,
best_compressor
);
best_compressor.compress_unchecked(array, self)
})
Expand All @@ -282,7 +283,7 @@ pub(crate) fn find_best_compression<'a>(
let mut best_compression_ratio_sample = None;

for compression in candidates {
debug!(
log::debug!(
"{} trying candidate {} for {}",
ctx,
compression.id(),
Expand Down Expand Up @@ -315,7 +316,7 @@ pub(crate) fn find_best_compression<'a>(
best = Some(compressed_sample);
}

debug!(
log::debug!(
"{} with {}: ratio ({}), objective fn value ({}); best so far: ratio ({}), objective fn value ({})",
ctx,
compression.id(),
Expand All @@ -330,14 +331,14 @@ pub(crate) fn find_best_compression<'a>(
if best_compression_ratio < best_objective_ratio && best_compression_ratio_sample.is_some() {
let best_ratio_sample =
best_compression_ratio_sample.vortex_expect("already checked that this Option is Some");
debug!(
log::debug!(
"{} best objective fn value ({}) has ratio {} from {}",
ctx,
best_objective,
best_compression_ratio,
best.array().tree_display()
);
debug!(
log::debug!(
"{} best ratio ({}) has objective fn value {} from {}",
ctx,
best_compression_ratio,
Expand All @@ -346,7 +347,7 @@ pub(crate) fn find_best_compression<'a>(
);
}

info!(
log::debug!(
"{} best compression ({} bytes, {} objective fn value, {} compression ratio",
ctx,
best.nbytes(),
Expand Down

0 comments on commit ce5593d

Please sign in to comment.