
Remove CoalescePartitions insertion from Joins #15570


Merged (19 commits) on Apr 7, 2025
33 changes: 16 additions & 17 deletions datafusion/physical-plan/src/joins/cross_join.rs
@@ -25,7 +25,6 @@ use super::utils::{
BatchTransformer, BuildProbeJoinMetrics, NoopBatchTransformer, OnceAsync, OnceFut,
StatefulStreamResult,
};
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::projection::{
@@ -189,19 +188,11 @@ impl CrossJoinExec {

/// Asynchronously collect the result of the left child
async fn load_left_input(
left: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
stream: SendableRecordBatchStream,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
) -> Result<JoinLeftData> {
// merge all left parts into a single stream
let left_schema = left.schema();
let merge = if left.output_partitioning().partition_count() != 1 {
Arc::new(CoalescePartitionsExec::new(left))
} else {
left
};
let stream = merge.execute(0, context)?;
let left_schema = stream.schema();

// Load all batches and count the rows
let (batches, _metrics, reservation) = stream
@@ -291,6 +282,13 @@ impl ExecutionPlan for CrossJoinExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.left.output_partitioning().partition_count() != 1 {
Review comment (Contributor): love it

return internal_err!(
"Invalid CrossJoinExec, the output partition count of the left child must be 1,\
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);
}

let stream = self.right.execute(partition, Arc::clone(&context))?;

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
@@ -303,14 +301,15 @@
let enforce_batch_size_in_joins =
context.session_config().enforce_batch_size_in_joins();

let left_fut = self.left_fut.once(|| {
load_left_input(
Arc::clone(&self.left),
context,
let left_fut = self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, context)?;

Ok(load_left_input(
left_stream,
join_metrics.clone(),
reservation,
)
});
))
})?;

if enforce_batch_size_in_joins {
Ok(Box::pin(CrossJoinStream {
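With the automatic coalescing gone, whatever builds the physical plan must guarantee that the build (left) side of `CrossJoinExec` has exactly one output partition before `execute` runs. A minimal caller-side sketch, assuming the public `datafusion_physical_plan` paths shown and a hypothetical `plan_cross_join` helper:

```rust
use std::sync::Arc;

use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
use datafusion_physical_plan::joins::CrossJoinExec;
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

/// Hypothetical planner helper: coalesce the build side up front, since the
/// join no longer inserts CoalescePartitionsExec on its own.
fn plan_cross_join(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let left: Arc<dyn ExecutionPlan> =
        if left.output_partitioning().partition_count() != 1 {
            // Merge all left partitions into one, as EnforceDistribution would.
            Arc::new(CoalescePartitionsExec::new(left))
        } else {
            left
        };
    Arc::new(CrossJoinExec::new(left, right))
}
```

Plans produced through the normal optimizer already get this from the EnforceDistribution rule; the explicit wrapping mainly matters for hand-built plans and tests, as the updated hash join tests below show.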
52 changes: 24 additions & 28 deletions datafusion/physical-plan/src/joins/hash_join.rs
@@ -40,7 +40,6 @@ use crate::projection::{
use crate::spill::get_record_batch_memory_size;
use crate::ExecutionPlanProperties;
use crate::{
coalesce_partitions::CoalescePartitionsExec,
common::can_project,
handle_state,
hash_utils::create_hashes,
@@ -792,34 +791,42 @@ impl ExecutionPlan for HashJoinExec {
);
}

if self.mode == PartitionMode::CollectLeft && left_partitions != 1 {
return internal_err!(
"Invalid HashJoinExec, the output partition count of the left child must be 1 in CollectLeft mode,\
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);
}

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
let left_fut = match self.mode {
PartitionMode::CollectLeft => self.left_fut.once(|| {
PartitionMode::CollectLeft => self.left_fut.try_once(|| {
let left_stream = self.left.execute(0, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new("HashJoinInput").register(context.memory_pool());
collect_left_input(
None,

Ok(collect_left_input(
self.random_state.clone(),
Arc::clone(&self.left),
left_stream,
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
}),
))
})?,
PartitionMode::Partitioned => {
let left_stream = self.left.execute(partition, Arc::clone(&context))?;

let reservation =
MemoryConsumer::new(format!("HashJoinInput[{partition}]"))
.register(context.memory_pool());

OnceFut::new(collect_left_input(
Some(partition),
self.random_state.clone(),
Arc::clone(&self.left),
left_stream,
on_left.clone(),
Arc::clone(&context),
join_metrics.clone(),
reservation,
need_produce_result_in_final(self.join_type),
@@ -930,36 +937,22 @@ impl ExecutionPlan for HashJoinExec {

/// Reads the left (build) side of the input, buffering it in memory, to build a
/// hash table (`LeftJoinData`)
#[allow(clippy::too_many_arguments)]
async fn collect_left_input(
partition: Option<usize>,
random_state: RandomState,
left: Arc<dyn ExecutionPlan>,
left_stream: SendableRecordBatchStream,
on_left: Vec<PhysicalExprRef>,
context: Arc<TaskContext>,
metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
with_visited_indices_bitmap: bool,
probe_threads_count: usize,
) -> Result<JoinLeftData> {
let schema = left.schema();

let (left_input, left_input_partition) = if let Some(partition) = partition {
(left, partition)
} else if left.output_partitioning().partition_count() != 1 {
(Arc::new(CoalescePartitionsExec::new(left)) as _, 0)
} else {
(left, 0)
};

// Depending on partition argument load single partition or whole left side in memory
let stream = left_input.execute(left_input_partition, Arc::clone(&context))?;
let schema = left_stream.schema();

// This operation performs 2 steps at once:
// 1. creates a [JoinHashMap] of all batches from the stream
// 2. stores the batches in a vector.
let initial = (Vec::new(), 0, metrics, reservation);
let (batches, num_rows, metrics, mut reservation) = stream
let (batches, num_rows, metrics, mut reservation) = left_stream
.try_fold(initial, |mut acc, batch| async {
let batch_size = get_record_batch_memory_size(&batch);
// Reserve memory for incoming batch
@@ -1655,6 +1648,7 @@ impl EmbeddedProjection for HashJoinExec {
#[cfg(test)]
mod tests {
use super::*;
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::test::TestMemoryExec;
use crate::{
common, expressions::Column, repartition::RepartitionExec, test::build_table_i32,
@@ -2105,6 +2099,7 @@ mod tests {
let left =
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
.unwrap();
let left = Arc::new(CoalescePartitionsExec::new(left));

let right = build_table(
("a1", &vec![1, 2, 3]),
@@ -2177,6 +2172,7 @@ mod tests {
let left =
TestMemoryExec::try_new_exec(&[vec![batch1], vec![batch2]], schema, None)
.unwrap();
let left = Arc::new(CoalescePartitionsExec::new(left));
let right = build_table(
("a2", &vec![20, 30, 10]),
("b2", &vec![5, 6, 4]),
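`collect_left_input` (like `load_left_input` in the other joins) now receives an already-executed `SendableRecordBatchStream` rather than an `ExecutionPlan` plus `TaskContext`, takes the schema from the stream, and buffers the batches with `try_fold`. A simplified, standalone sketch of that buffering step (not the PR's exact code; the crate paths and the `buffer_build_side` name are assumptions):

```rust
use arrow::record_batch::RecordBatch;
use datafusion_common::Result;
use datafusion_physical_plan::SendableRecordBatchStream;
use futures::TryStreamExt;

// Drain the already-executed build-side stream into a vector of batches
// while counting rows, mirroring the fold used by the reworked
// collect_left_input / load_left_input functions.
async fn buffer_build_side(
    stream: SendableRecordBatchStream,
) -> Result<(Vec<RecordBatch>, usize)> {
    stream
        .try_fold((Vec::new(), 0usize), |mut acc, batch| async move {
            acc.1 += batch.num_rows();
            acc.0.push(batch);
            Ok(acc)
        })
        .await
}
```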
32 changes: 16 additions & 16 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -28,7 +28,6 @@ use super::utils::{
need_produce_result_in_final, reorder_output_after_swap, swap_join_projection,
BatchSplitter, BatchTransformer, NoopBatchTransformer, StatefulStreamResult,
};
use crate::coalesce_partitions::CoalescePartitionsExec;
use crate::common::can_project;
use crate::execution_plan::{boundedness_from_children, EmissionType};
use crate::joins::utils::{
@@ -483,23 +482,31 @@ impl ExecutionPlan for NestedLoopJoinExec {
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.left.output_partitioning().partition_count() != 1 {
return internal_err!(
"Invalid NestedLoopJoinExec, the output partition count of the left child must be 1,\
consider using CoalescePartitionsExec or the EnforceDistribution rule"
);
}

let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);

// Initialization reservation for load of inner table
let load_reservation =
MemoryConsumer::new(format!("NestedLoopJoinLoad[{partition}]"))
.register(context.memory_pool());

let inner_table = self.inner_table.once(|| {
collect_left_input(
Arc::clone(&self.left),
Arc::clone(&context),
let inner_table = self.inner_table.try_once(|| {
let stream = self.left.execute(0, Arc::clone(&context))?;

Ok(collect_left_input(
stream,
join_metrics.clone(),
load_reservation,
need_produce_result_in_final(self.join_type),
self.right().output_partitioning().partition_count(),
)
});
))
})?;

let batch_size = context.session_config().batch_size();
let enforce_batch_size_in_joins =
@@ -610,20 +617,13 @@ impl ExecutionPlan for NestedLoopJoinExec {

/// Asynchronously collect input into a single batch, and create `JoinLeftData` from it
async fn collect_left_input(
input: Arc<dyn ExecutionPlan>,
context: Arc<TaskContext>,
stream: SendableRecordBatchStream,
join_metrics: BuildProbeJoinMetrics,
reservation: MemoryReservation,
with_visited_left_side: bool,
probe_threads_count: usize,
) -> Result<JoinLeftData> {
let schema = input.schema();
let merge = if input.output_partitioning().partition_count() != 1 {
Arc::new(CoalescePartitionsExec::new(input))
} else {
input
};
let stream = merge.execute(0, context)?;
let schema = stream.schema();

// Load all batches and count the rows
let (batches, metrics, mut reservation) = stream
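All three operators now enforce the same invariant at execution time instead of repairing the plan: a multi-partition build side is reported as an internal error. A sketch of that check in isolation, assuming `datafusion_common`'s `internal_err!` macro and a hypothetical `check_build_side` helper:

```rust
use std::sync::Arc;

use datafusion_common::{internal_err, Result};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

// Sketch of the invariant the joins now check in execute(): the build (left)
// child must expose exactly one output partition.
fn check_build_side(left: &Arc<dyn ExecutionPlan>) -> Result<()> {
    if left.output_partitioning().partition_count() != 1 {
        return internal_err!(
            "the output partition count of the left child must be 1, \
             consider using CoalescePartitionsExec or the EnforceDistribution rule"
        );
    }
    Ok(())
}
```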
17 changes: 10 additions & 7 deletions datafusion/physical-plan/src/joins/utils.rs
@@ -328,7 +328,7 @@ pub fn build_join_schema(
}

/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
/// [`OnceAsync::once`] return a [`OnceFut`] that resolves to the result of the
/// [`OnceAsync::try_once`] return a [`OnceFut`] that resolves to the result of the
/// same computation.
///
/// This is useful for joins where the results of one child are needed to proceed
@@ -341,7 +341,7 @@ pub fn build_join_schema(
///
/// Each output partition waits on the same `OnceAsync` before proceeding.
pub(crate) struct OnceAsync<T> {
fut: Mutex<Option<OnceFut<T>>>,
fut: Mutex<Option<SharedResult<OnceFut<T>>>>,
}

impl<T> Default for OnceAsync<T> {
@@ -360,19 +360,22 @@ impl<T> Debug for OnceAsync<T> {

impl<T: 'static> OnceAsync<T> {
/// If this is the first call to this function on this object, will invoke
/// `f` to obtain a future and return a [`OnceFut`] referring to this
/// `f` to obtain a future and return a [`OnceFut`] referring to it. `f`
/// may fail, in which case its error is returned.
///
/// If this is not the first call, will return a [`OnceFut`] referring
/// to the same future as was returned by the first call
pub(crate) fn once<F, Fut>(&self, f: F) -> OnceFut<T>
/// to the same future as was returned by the first call - or the same
/// error if the initial call to `f` failed.
pub(crate) fn try_once<F, Fut>(&self, f: F) -> Result<OnceFut<T>>
where
F: FnOnce() -> Fut,
F: FnOnce() -> Result<Fut>,
Fut: Future<Output = Result<T>> + Send + 'static,
{
self.fut
.lock()
.get_or_insert_with(|| OnceFut::new(f()))
.get_or_insert_with(|| f().map(OnceFut::new).map_err(Arc::new))
.clone()
.map_err(DataFusionError::Shared)
}
}

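Because the closure passed to `try_once` is now fallible, `OnceAsync` caches either the shared future or the shared error, so every later caller (one per output partition) observes the same outcome. A minimal, synchronous illustration of that caching behaviour (this is not the DataFusion type; all names here are hypothetical):

```rust
use std::sync::{Arc, Mutex};

// Minimal illustration: the first call runs the fallible closure and caches
// either the value or a shared error; later calls get a clone of whatever
// the first call produced and never re-run the closure.
struct OnceResult<T> {
    slot: Mutex<Option<Result<T, Arc<String>>>>,
}

impl<T: Clone> OnceResult<T> {
    fn new() -> Self {
        Self {
            slot: Mutex::new(None),
        }
    }

    fn try_once<F>(&self, f: F) -> Result<T, Arc<String>>
    where
        F: FnOnce() -> Result<T, String>,
    {
        self.slot
            .lock()
            .unwrap()
            // Run `f` only if nothing is cached yet; wrap its error in Arc so
            // it can be handed out to every subsequent caller.
            .get_or_insert_with(|| f().map_err(Arc::new))
            .clone()
    }
}

fn main() {
    let once = OnceResult::new();
    // The first caller hits an error; it gets cached.
    let first: Result<i32, _> = once.try_once(|| Err("left side failed".to_string()));
    // The second closure never runs: the cached error is returned instead.
    let second = once.try_once(|| Ok(42));
    assert!(first.is_err());
    assert_eq!(second.unwrap_err().as_str(), "left side failed");
}
```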