Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 13 additions & 1 deletion benchmarks/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,25 @@ To run for specific query, for example Q21
./bench.sh run tpch10 21
```

## Select join algorithm
## Benchmark with modified configurations
### Select join algorithm
The benchmark runs with `prefer_hash_join == true` by default, which enforces HASH join algorithm.
To run TPCH benchmarks with join other than HASH:
```shell
PREFER_HASH_JOIN=false ./bench.sh run tpch
```

### Configure with environment variables
Any [datafusion options](https://datafusion.apache.org/user-guide/configs.html) that are provided environment variables are
also considered by the benchmarks.
The following configuration runs the TPCH benchmark with datafusion configured to *not* repartition join keys.
```shell
DATAFUSION_OPTIMIZER_REPARTITION_JOINS=false ./bench.sh run tpch
```
You might want to adjust the results location to avoid overwriting previous results.
Environment configuration that was picked up by datafusion is logged at `info` level.
To verify that datafusion picked up your configuration, run the benchmarks with `RUST_LOG=info` or higher.

## Comparing performance of main and a branch

```shell
Expand Down
1 change: 1 addition & 0 deletions benchmarks/bench.sh
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ DATAFUSION_DIR directory to use (default $DATAFUSION_DIR)
RESULTS_NAME folder where the benchmark files are stored
PREFER_HASH_JOIN Prefer hash join algorithm (default true)
VENV_PATH Python venv to use for compare and venv commands (default ./venv, override by <your-venv>/bin/activate)
DATAFUSION_* Set the given datafusion configuration
"
exit 1
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl ExternalAggrConfig {
) -> Result<Vec<QueryResult>> {
let query_name =
format!("Q{query_id}({})", human_readable_size(mem_limit as usize));
let config = self.common.config();
let config = self.common.config()?;
let memory_pool: Arc<dyn MemoryPool> = match mem_pool_type {
"fair" => Arc::new(FairSpillPool::new(mem_limit as usize)),
"greedy" => Arc::new(GreedyMemoryPool::new(mem_limit as usize)),
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/clickbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl RunOpt {
};

// configure parquet options
let mut config = self.common.config();
let mut config = self.common.config()?;
{
let parquet_options = &mut config.options_mut().execution.parquet;
// The hits_partitioned dataset specifies string columns
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/h2o.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ impl RunOpt {
None => queries.min_query_id()..=queries.max_query_id(),
};

let config = self.common.config();
let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let ctx = SessionContext::new_with_config_rt(config, rt_builder.build_arc()?);

Expand Down
6 changes: 3 additions & 3 deletions benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ impl RunOpt {
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let mut config = self
.common
.config()
.config()?
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
let rt_builder = self.common.runtime_env_builder()?;
Expand Down Expand Up @@ -514,7 +514,7 @@ mod tests {
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
Expand Down Expand Up @@ -550,7 +550,7 @@ mod tests {
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ impl RunOpt {

/// Benchmark query `query_id` in `SORT_QUERIES`
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let config = self.common.config();
let config = self.common.config()?;
let rt_builder = self.common.runtime_env_builder()?;
let state = SessionStateBuilder::new()
.with_config(config)
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl RunOpt {
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
let mut config = self
.common
.config()
.config()?
.with_collect_statistics(!self.disable_statistics);
config.options_mut().optimizer.prefer_hash_join = self.prefer_hash_join;
let rt_builder = self.common.runtime_env_builder()?;
Expand Down Expand Up @@ -355,7 +355,7 @@ mod tests {
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
Expand Down Expand Up @@ -392,7 +392,7 @@ mod tests {
let common = CommonOpt {
iterations: 1,
partitions: Some(2),
batch_size: 8192,
batch_size: Some(8192),
mem_pool_type: "fair".to_string(),
memory_limit: None,
sort_spill_reservation_bytes: None,
Expand Down
26 changes: 15 additions & 11 deletions benchmarks/src/util/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use datafusion::{
},
prelude::SessionConfig,
};
use datafusion_common::{utils::get_available_parallelism, DataFusionError, Result};
use datafusion_common::{DataFusionError, Result};
use structopt::StructOpt;

// Common benchmark options (don't use doc comments otherwise this doc
Expand All @@ -41,8 +41,8 @@ pub struct CommonOpt {
pub partitions: Option<usize>,

/// Batch size when reading CSV or Parquet files
#[structopt(short = "s", long = "batch-size", default_value = "8192")]
pub batch_size: usize,
#[structopt(short = "s", long = "batch-size")]
pub batch_size: Option<usize>,

/// The memory pool type to use, should be one of "fair" or "greedy"
#[structopt(long = "mem-pool-type", default_value = "fair")]
Expand All @@ -65,21 +65,25 @@ pub struct CommonOpt {

impl CommonOpt {
/// Return an appropriately configured `SessionConfig`
pub fn config(&self) -> SessionConfig {
self.update_config(SessionConfig::new())
pub fn config(&self) -> Result<SessionConfig> {
SessionConfig::from_env().map(|config| self.update_config(config))
}

/// Modify the existing config appropriately
pub fn update_config(&self, config: SessionConfig) -> SessionConfig {
let mut config = config
.with_target_partitions(
self.partitions.unwrap_or(get_available_parallelism()),
)
.with_batch_size(self.batch_size);
pub fn update_config(&self, mut config: SessionConfig) -> SessionConfig {
if let Some(batch_size) = self.batch_size {
config = config.with_batch_size(batch_size)
}

if let Some(partitions) = self.partitions {
config = config.with_target_partitions(partitions)
}

if let Some(sort_spill_reservation_bytes) = self.sort_spill_reservation_bytes {
config =
config.with_sort_spill_reservation_bytes(sort_spill_reservation_bytes);
}

config
}

Expand Down
2 changes: 1 addition & 1 deletion datafusion/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ log = { workspace = true }
object_store = { workspace = true, optional = true }
parquet = { workspace = true, optional = true, default-features = true }
paste = "1.0.15"
pyo3 = { version = "0.24.0", optional = true }
pyo3 = { version = "0.24.2", optional = true }
recursive = { workspace = true, optional = true }
sqlparser = { workspace = true }
tokio = { workspace = true }
Expand Down
4 changes: 3 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,9 @@ impl ConfigOptions {
for key in keys.0 {
let env = key.to_uppercase().replace('.', "_");
if let Some(var) = std::env::var_os(env) {
ret.set(&key, var.to_string_lossy().as_ref())?;
let value = var.to_string_lossy();
log::info!("Set {key} to {value} from the environment variable");
ret.set(&key, value.as_ref())?;
}
}

Expand Down
Loading
Loading