From ce5593de698d53c14668632d3e3ab7f742832265 Mon Sep 17 00:00:00 2001 From: Nicholas Gates Date: Tue, 3 Dec 2024 11:46:45 -0500 Subject: [PATCH] Add more click bench things (#1530) --- .gitignore | 2 + bench-vortex/src/bin/clickbench.rs | 42 +++++++++++++++---- bench-vortex/src/lib.rs | 12 +++++- .../src/compressors/chunked.rs | 3 +- .../src/sampling_compressor.rs | 25 +++++------ 5 files changed, 60 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 3cf0da99ec..8a22636953 100644 --- a/.gitignore +++ b/.gitignore @@ -203,3 +203,5 @@ benchmarks/.out # Zed .zed/ + +*.plan \ No newline at end of file diff --git a/bench-vortex/src/bin/clickbench.rs b/bench-vortex/src/bin/clickbench.rs index a0c79b2e98..e4bab68a73 100644 --- a/bench-vortex/src/bin/clickbench.rs +++ b/bench-vortex/src/bin/clickbench.rs @@ -1,5 +1,6 @@ #![feature(exit_status_error)] +use std::fs; use std::path::PathBuf; use std::process::Command; use std::time::{Duration, Instant}; @@ -7,7 +8,8 @@ use std::time::{Duration, Instant}; use bench_vortex::clickbench::{self, clickbench_queries, HITS_SCHEMA}; use bench_vortex::display::{print_measurements_json, render_table, DisplayFormat}; use bench_vortex::{ - execute_query, idempotent, setup_logger, Format, IdempotentPath as _, Measurement, + execute_query, idempotent, physical_plan, setup_logger, Format, IdempotentPath as _, + Measurement, }; use clap::Parser; use datafusion::prelude::SessionContext; @@ -33,6 +35,8 @@ struct Args { display_format: DisplayFormat, #[arg(short, long, value_delimiter = ',')] queries: Option>, + #[arg(long, default_value = "false")] + emit_plan: bool, } fn main() { @@ -62,9 +66,14 @@ fn main() { let output_path = basepath.join(format!("hits_{idx}.parquet")); idempotent(&output_path, |output_path| { eprintln!("Fixing parquet file {idx}"); + + // We need to set the home directory because GitHub Actions doesn't set it in a way + // that DuckDB respects. + let home = std::env::var("HOME").unwrap_or_else(|_| "/home/ci-runner".to_string()); + let command = format!( " - SET home_directory='/home/ci-runner/'; + SET home_directory='{home}'; INSTALL HTTPFS; COPY (SELECT * REPLACE (epoch_ms(EventTime * 1000) AS EventTime, \ @@ -85,12 +94,18 @@ fn main() { .unwrap(); }); - let formats = [ - Format::Parquet, - Format::OnDiskVortex { + let formats = if args.only_vortex { + vec![Format::OnDiskVortex { enable_compression: true, - }, - ]; + }] + } else { + vec![ + Format::Parquet, + Format::OnDiskVortex { + enable_compression: true, + }, + ] + }; let queries = match args.queries.clone() { None => clickbench_queries(), @@ -104,7 +119,7 @@ fn main() { let mut all_measurements = Vec::default(); - for format in formats { + for format in &formats { let session_context = SessionContext::new(); let context = session_context.clone(); match format { @@ -136,6 +151,15 @@ fn main() { } for (query_idx, query) in queries.clone().into_iter() { + if args.emit_plan { + let plan = runtime.block_on(physical_plan(&context, &query)).unwrap(); + fs::write( + format!("clickbench_{format}_q{query_idx:02}.plan",), + format!("{:#?}", plan), + ) + .expect("Unable to write file"); + } + let mut fastest_result = Duration::from_millis(u64::MAX); for _ in 0..args.iterations { let exec_duration = runtime.block_on(async { @@ -152,7 +176,7 @@ fn main() { all_measurements.push(Measurement { query_idx, time: fastest_result, - format, + format: *format, dataset: "clickbench".to_string(), }); } diff --git a/bench-vortex/src/lib.rs b/bench-vortex/src/lib.rs index 177dd82289..34eecea777 100644 --- a/bench-vortex/src/lib.rs +++ b/bench-vortex/src/lib.rs @@ -9,7 +9,7 @@ use std::time::Duration; use arrow_array::{RecordBatch, RecordBatchReader}; use datafusion::prelude::SessionContext; -use datafusion_physical_plan::collect; +use datafusion_physical_plan::{collect, ExecutionPlan}; use itertools::Itertools; use log::LevelFilter; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; @@ -255,6 +255,16 @@ pub async fn execute_query(ctx: &SessionContext, query: &str) -> anyhow::Result< Ok(result) } +pub async fn physical_plan( + ctx: &SessionContext, + query: &str, +) -> anyhow::Result> { + let plan = ctx.sql(query).await?; + let (state, plan) = plan.into_parts(); + let optimized = state.optimize(&plan)?; + Ok(state.create_physical_plan(&optimized).await?) +} + #[derive(Clone, Debug)] pub struct Measurement { pub query_idx: usize, diff --git a/vortex-sampling-compressor/src/compressors/chunked.rs b/vortex-sampling-compressor/src/compressors/chunked.rs index 7ccb11ef3a..3c38e94d07 100644 --- a/vortex-sampling-compressor/src/compressors/chunked.rs +++ b/vortex-sampling-compressor/src/compressors/chunked.rs @@ -1,7 +1,6 @@ use std::any::Any; use std::sync::Arc; -use log::info; use vortex_array::aliases::hash_set::HashSet; use vortex_array::array::{ChunkedArray, ChunkedEncoding}; use vortex_array::compress::compute_precompression_stats; @@ -109,7 +108,7 @@ impl ChunkedCompressor { .unwrap_or(false); if ratio > 1.0 || exceeded_target_ratio { - info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous); + log::info!("unsatisfactory ratio {}, previous: {:?}", ratio, previous); let (compressed_chunk, tree) = ctx.compress_array(&chunk)?.into_parts(); let new_ratio = (compressed_chunk.nbytes() as f32) / (chunk.nbytes() as f32); diff --git a/vortex-sampling-compressor/src/sampling_compressor.rs b/vortex-sampling-compressor/src/sampling_compressor.rs index 3e9ba6071e..84fbc0fa50 100644 --- a/vortex-sampling-compressor/src/sampling_compressor.rs +++ b/vortex-sampling-compressor/src/sampling_compressor.rs @@ -1,7 +1,6 @@ use core::fmt::Formatter; use std::fmt::Display; -use log::{debug, info, warn}; use rand::rngs::StdRng; use rand::SeedableRng as _; use vortex_array::aliases::hash_set::HashSet; @@ -147,7 +146,7 @@ impl<'a> SamplingCompressor<'a> { check_statistics_unchanged(arr, compressed.as_ref()); return Ok(compressed); } else { - warn!("{} cannot compress {} like {}", self, arr, l); + log::warn!("{} cannot compress {} like {}", self, arr, l); } } @@ -199,7 +198,7 @@ impl<'a> SamplingCompressor<'a> { }); if !too_deep.is_empty() { - debug!( + log::debug!( "{} skipping encodings due to depth/cost: {}", self, too_deep @@ -210,10 +209,10 @@ impl<'a> SamplingCompressor<'a> { ); } - debug!("{} candidates for {}: {:?}", self, array, candidates); + log::debug!("{} candidates for {}: {:?}", self, array, candidates); if candidates.is_empty() { - debug!( + log::debug!( "{} no compressors for array with dtype: {} and encoding: {}", self, array.dtype(), @@ -257,9 +256,11 @@ impl<'a> SamplingCompressor<'a> { let best = find_best_compression(candidates, &sample, self)? .into_path() .map(|best_compressor| { - debug!( + log::debug!( "{} Compressing array {} with {}", - self, array, best_compressor + self, + array, + best_compressor ); best_compressor.compress_unchecked(array, self) }) @@ -282,7 +283,7 @@ pub(crate) fn find_best_compression<'a>( let mut best_compression_ratio_sample = None; for compression in candidates { - debug!( + log::debug!( "{} trying candidate {} for {}", ctx, compression.id(), @@ -315,7 +316,7 @@ pub(crate) fn find_best_compression<'a>( best = Some(compressed_sample); } - debug!( + log::debug!( "{} with {}: ratio ({}), objective fn value ({}); best so far: ratio ({}), objective fn value ({})", ctx, compression.id(), @@ -330,14 +331,14 @@ pub(crate) fn find_best_compression<'a>( if best_compression_ratio < best_objective_ratio && best_compression_ratio_sample.is_some() { let best_ratio_sample = best_compression_ratio_sample.vortex_expect("already checked that this Option is Some"); - debug!( + log::debug!( "{} best objective fn value ({}) has ratio {} from {}", ctx, best_objective, best_compression_ratio, best.array().tree_display() ); - debug!( + log::debug!( "{} best ratio ({}) has objective fn value {} from {}", ctx, best_compression_ratio, @@ -346,7 +347,7 @@ pub(crate) fn find_best_compression<'a>( ); } - info!( + log::debug!( "{} best compression ({} bytes, {} objective fn value, {} compression ratio", ctx, best.nbytes(),