Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
a6e01e7
Split channel resolver in two
gabotechs Dec 22, 2025
fc9bfc8
Simplify WorkerResolverExtension and ChannelResolverExtension
gabotechs Dec 22, 2025
9e15f2b
Add default builder to ArrowFlightEndpoint
gabotechs Dec 22, 2025
34cf529
Add some docs
gabotechs Dec 22, 2025
312901d
Listen to clippy
gabotechs Dec 23, 2025
f026e41
Split get_flight_client_for_url in two
gabotechs Dec 23, 2025
2508e48
Fix conflicts
gabotechs Dec 24, 2025
f7218b0
Remove unnecessary channel resolver
gabotechs Dec 25, 2025
b49289a
Improve WorkerResolver docs
gabotechs Dec 26, 2025
793f898
Use one ChannelResolver per runtime
gabotechs Dec 26, 2025
eaad60f
Improve error reporting on client connection failure
gabotechs Dec 26, 2025
ea4e09a
Add a from_session_builder method for constructing an InMemoryChannel…
gabotechs Dec 26, 2025
33b0cc7
Add ChannelResolver and WorkerResolver default implementations for Arcs
gabotechs Dec 26, 2025
1aeb719
Make TPC-DS tests use DataFusion test dataset
gabotechs Dec 24, 2025
e377698
Remove non-working in-memory option from benchmarks
gabotechs Dec 24, 2025
7a0b296
Remove unnecessary utils folder
gabotechs Dec 24, 2025
41f90a1
Refactor benchmark folder
gabotechs Dec 24, 2025
c88058e
Rename to prepare_tpch.rs
gabotechs Dec 24, 2025
b3bdd2b
Adapt benchmarks for TPC-DS
gabotechs Dec 24, 2025
05a30cc
Update benchmarks README.md
gabotechs Dec 24, 2025
0c736fd
Fix conflicts
gabotechs Dec 24, 2025
f9f4439
Use default session state builder
gabotechs Dec 26, 2025
e893d51
Add ChildrenIsolatorUnionExec
gabotechs Dec 24, 2025
e02b787
Add proto serde for ChildrenIsolatorUnionExec
gabotechs Dec 24, 2025
dddcd21
Wire up ChildrenIsolatorUnionExec to planner
gabotechs Dec 24, 2025
8eb177b
Add integration tests for distributed UNIONs
gabotechs Dec 24, 2025
6e625cf
Skip query 72 in TPC-DS benchmarks
gabotechs Dec 26, 2025
f4f3a2b
Allow setting children isolator unions
gabotechs Dec 26, 2025
9352b6b
Allow passing multiple queries in benchmarks
gabotechs Dec 26, 2025
59c6dfb
Merge branch 'main' into gabrielmusat/distribute-unions
gabotechs Dec 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions benchmarks/get-tpcds.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env bash
#
# Prepares the TPC-DS benchmark dataset by running the `prepare-tpcds`
# subcommand through cargo.
#
# Optional environment variables:
#   SCALE_FACTOR   Scale factor used to name the output directory (default: 1)
#   PARTITIONS     Value forwarded as --partitions (default: 16)
#   DATA_DIR       Parent directory for generated data (default: <script dir>/data)
#   CARGO_COMMAND  Command used to launch the binary (default: "cargo run --release")
#
# NOTE(review): SCALE_FACTOR only affects the directory name and the log
# messages; it is not forwarded to `prepare-tpcds`. Confirm that is intended.

# Fail on errors, on use of unset variables, and on failures inside pipelines.
set -euo pipefail

SCALE_FACTOR=${SCALE_FACTOR:-1}
PARTITIONS=${PARTITIONS:-16}

echo "Generating TPC-DS dataset with SCALE_FACTOR=${SCALE_FACTOR} and PARTITIONS=${PARTITIONS}"

# https://stackoverflow.com/questions/59895/how-do-i-get-the-directory-where-a-bash-script-is-located-from-within-the-script
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
DATA_DIR=${DATA_DIR:-$SCRIPT_DIR/data}
CARGO_COMMAND=${CARGO_COMMAND:-"cargo run --release"}
TPCDS_DIR="${DATA_DIR}/tpcds_sf${SCALE_FACTOR}"

echo "Creating tpcds dataset at Scale Factor ${SCALE_FACTOR} in ${TPCDS_DIR}..."

# Ensure the target data directory exists
mkdir -p "${TPCDS_DIR}"

# Split CARGO_COMMAND on whitespace into an array so it can be expanded quoted:
# same word splitting as the unquoted form, but without pathname expansion.
read -r -a CARGO_ARGS <<< "${CARGO_COMMAND}"

"${CARGO_ARGS[@]}" -- prepare-tpcds --output "${TPCDS_DIR}" --partitions "${PARTITIONS}"
15 changes: 11 additions & 4 deletions benchmarks/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ use tonic::transport::Server;
#[structopt(verbatim_doc_comment)]
pub struct RunOpt {
/// Query numbers, comma-separated (e.g. `-q 1,5,12`). If not specified, runs all queries
#[structopt(short, long)]
pub query: Option<usize>,
#[structopt(short, long, use_delimiter = true)]
pub query: Vec<usize>,

/// Path to data files
#[structopt(parse(from_os_str), short = "p", long = "path")]
Expand Down Expand Up @@ -89,6 +89,10 @@ pub struct RunOpt {
#[structopt(long)]
cardinality_task_sf: Option<f64>,

/// Use children isolator UNIONs for distributing UNION operations.
#[structopt(long)]
children_isolator_unions: bool,

/// Collects metrics across network boundaries
#[structopt(long)]
collect_metrics: bool,
Expand Down Expand Up @@ -143,7 +147,9 @@ impl Dataset {
Dataset::Tpch => (1..22 + 1)
.map(|i| Ok((i as usize, tpch::get_test_tpch_query(i)?)))
.collect(),
Dataset::Tpcds => (1..99 + 1)
Dataset::Tpcds => (1..72)
// skip query 72, it's ridiculously slow
.chain(73..99 + 1)
.map(|i| Ok((i, tpcds::get_test_tpcds_query(i)?)))
.collect(),
Dataset::Clickbench => (0..42 + 1)
Expand Down Expand Up @@ -203,6 +209,7 @@ impl RunOpt {
.with_distributed_cardinality_effect_task_scale_factor(
self.cardinality_task_sf.unwrap_or(1.0),
)?
.with_distributed_children_isolator_unions(self.children_isolator_unions)?
.with_distributed_metrics_collection(self.collect_metrics)?
.build();
let ctx = SessionContext::new_with_state(state);
Expand All @@ -219,7 +226,7 @@ impl RunOpt {
let dataset = Dataset::infer_from_data_path(path.clone())?;

for (id, sql) in dataset.queries()? {
if self.query.is_some_and(|v| v != id) {
if !self.query.is_empty() && !self.query.contains(&id) {
continue;
}
let query_id = format!("{dataset:?} {id}");
Expand Down
57 changes: 57 additions & 0 deletions src/distributed_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,35 @@ pub trait DistributedExt: Sized {

/// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;

/// Enables children isolator unions for distributing UNION operations across as many tasks as
/// the sum of all the tasks required for each child.
///
/// For example, if there is a UNION with 3 children, requiring one task each, it will result
/// in a plan with 3 tasks where each task runs one child:
///
/// ```text
/// ┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
/// │           Task 1            ││           Task 2            ││           Task 3            │
/// │┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
/// ││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
/// │└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
/// │    │                        ││              │              ││                        │    │
/// │┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
/// ││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
/// │└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
/// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
/// ```
///
/// `enabled` turns the behavior on (`true`) or off (`false`). This method takes `self` by
/// value and hands it back inside `Ok` so that calls can be chained; a [DataFusionError] is
/// returned if the setting could not be applied.
fn with_distributed_children_isolator_unions(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;

/// Same as [DistributedExt::with_distributed_children_isolator_unions] but with an in-place mutation.
///
/// Instead of consuming `self` and returning it, this variant takes `&mut self` and returns
/// `Ok(())` on success, or a [DataFusionError] if the setting could not be applied.
fn set_distributed_children_isolator_unions(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
}

impl DistributedExt for SessionConfig {
Expand Down Expand Up @@ -489,6 +518,15 @@ impl DistributedExt for SessionConfig {
Ok(())
}

/// Stores `enabled` in the `children_isolator_unions` field of the [DistributedConfig]
/// obtained from this config's mutable options.
///
/// Any error produced while looking up the [DistributedConfig] is propagated to the caller.
fn set_distributed_children_isolator_unions(
    &mut self,
    enabled: bool,
) -> Result<(), DataFusionError> {
    DistributedConfig::from_config_options_mut(self.options_mut())?
        .children_isolator_unions = enabled;
    Ok(())
}

delegate! {
to self {
#[call(set_distributed_option_extension)]
Expand Down Expand Up @@ -530,6 +568,10 @@ impl DistributedExt for SessionConfig {
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;

#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -586,6 +628,11 @@ impl DistributedExt for SessionStateBuilder {
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -642,6 +689,11 @@ impl DistributedExt for SessionState {
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
Expand Down Expand Up @@ -698,6 +750,11 @@ impl DistributedExt for SessionContext {
#[call(set_distributed_metrics_collection)]
#[expr($?;Ok(self))]
fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;

fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
#[call(set_distributed_children_isolator_unions)]
#[expr($?;Ok(self))]
fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;
}
}
}
8 changes: 8 additions & 0 deletions src/distributed_planner/distributed_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ extensions_options! {
/// batches over the wire.
/// If set to 0, batch coalescing is disabled on network shuffle operations.
pub shuffle_batch_size: usize, default = 8192
/// When encountering a UNION operation, isolate its children depending on the task context.
/// For example, on a UNION operation with 3 children running in 3 distributed tasks,
/// instead of executing all 3 children in each of the 3 tasks with a DistributedTaskContext
/// of 1/3, 2/3, and 3/3 respectively, execute:
/// - The first child in the first task with a DistributedTaskContext of 1/1
/// - The second child in the second task with a DistributedTaskContext of 1/1
/// - The third child in the third task with a DistributedTaskContext of 1/1
pub children_isolator_unions: bool, default = true
/// Propagate collected metrics from all nodes in the plan across network boundaries
/// so that they can be reconstructed on the head node of the plan.
pub collect_metrics: bool, default = true
Expand Down
Loading