Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make benefits_from_input_partitioning Default in SHJ #8801

Merged
merged 3 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions datafusion/core/src/physical_optimizer/join_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,12 @@ use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::ExecutionPlan;

use arrow_schema::Schema;
use datafusion_common::internal_err;
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::{internal_err, JoinSide};
use datafusion_common::{DataFusionError, JoinType};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::sort_properties::SortProperties;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};

/// The [`JoinSelection`] rule tries to modify a given plan so that it can
/// accommodate infinite sources and optimize joins in the plan according to
Expand Down Expand Up @@ -425,31 +426,106 @@ pub type PipelineFixerSubrule = dyn Fn(
&ConfigOptions,
) -> Option<Result<PipelineStatePropagator>>;

/// This subrule checks if we can replace a hash join with a symmetric hash
/// join when we are dealing with infinite inputs on both sides. This change
/// avoids pipeline breaking and preserves query runnability. If possible,
/// this subrule makes this replacement; otherwise, it has no effect.
/// Converts a hash join to a symmetric hash join in the case of infinite inputs on both sides.
///
/// This subrule checks if a hash join can be replaced with a symmetric hash join when dealing
/// with unbounded (infinite) inputs on both sides. This replacement avoids pipeline breaking and
/// preserves query runnability. If the replacement is applicable, this subrule makes this change;
/// otherwise, it leaves the input unchanged.
///
/// # Arguments
/// * `input` - The current state of the pipeline, including the execution plan.
/// * `config_options` - Configuration options that might affect the transformation logic.
///
/// # Returns
/// An `Option` that contains the `Result` of the transformation. If the transformation is not applicable,
/// it returns `None`. If applicable, it returns `Some(Ok(...))` with the modified pipeline state,
/// or `Some(Err(...))` if an error occurs during the transformation.
fn hash_join_convert_symmetric_subrule(
mut input: PipelineStatePropagator,
config_options: &ConfigOptions,
) -> Option<Result<PipelineStatePropagator>> {
// Check if the current plan node is a HashJoinExec.
if let Some(hash_join) = input.plan.as_any().downcast_ref::<HashJoinExec>() {
// Determine if left and right children are unbounded.
let ub_flags = input.children_unbounded();
let (left_unbounded, right_unbounded) = (ub_flags[0], ub_flags[1]);
// Update the unbounded flag of the input.
input.unbounded = left_unbounded || right_unbounded;
// Process only if both left and right sides are unbounded.
let result = if left_unbounded && right_unbounded {
// Determine the partition mode based on configuration.
let mode = if config_options.optimizer.repartition_joins {
StreamJoinPartitionMode::Partitioned
} else {
StreamJoinPartitionMode::SinglePartition
};
// A closure to determine the required sort order for each side of the join in the SymmetricHashJoinExec.
// This function checks if the columns involved in the filter have any specific ordering requirements.
// If the child nodes (left or right side of the join) already have a defined order and the columns used in the
// filter predicate are ordered, this function captures that ordering requirement. The identified order is then
// used in the SymmetricHashJoinExec to maintain bounded memory during join operations.
// However, if the child nodes do not have an inherent order, or if the filter columns are unordered,
// the function concludes that no specific order is required for the SymmetricHashJoinExec. This approach
// ensures that the symmetric hash join operation only imposes ordering constraints when necessary,
// based on the properties of the child nodes and the filter condition.
let determine_order = |side: JoinSide| -> Option<Vec<PhysicalSortExpr>> {
hash_join
.filter()
.map(|filter| {
filter.column_indices().iter().any(
|ColumnIndex {
index,
side: column_side,
}| {
// Skip if column side does not match the join side.
if *column_side != side {
return false;
}
// Retrieve equivalence properties and schema based on the side.
let (equivalence, schema) = match side {
JoinSide::Left => (
hash_join.left().equivalence_properties(),
hash_join.left().schema(),
),
JoinSide::Right => (
hash_join.right().equivalence_properties(),
hash_join.right().schema(),
),
};

let name = schema.field(*index).name();
let col = Arc::new(Column::new(name, *index)) as _;
// Check if the column is ordered.
equivalence.get_expr_ordering(col).state
!= SortProperties::Unordered
},
)
})
.unwrap_or(false)
.then(|| {
match side {
JoinSide::Left => hash_join.left().output_ordering(),
JoinSide::Right => hash_join.right().output_ordering(),
}
.map(|p| p.to_vec())
})
.flatten()
};

// Determine the sort order for both left and right sides.
let left_order = determine_order(JoinSide::Left);
let right_order = determine_order(JoinSide::Right);

SymmetricHashJoinExec::try_new(
hash_join.left().clone(),
hash_join.right().clone(),
hash_join.on().to_vec(),
hash_join.filter().cloned(),
hash_join.join_type(),
hash_join.null_equals_null(),
left_order,
right_order,
mode,
)
.map(|exec| {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,8 @@ fn try_swapping_with_sym_hash_join(
new_filter,
sym_join.join_type(),
sym_join.null_equals_null(),
sym_join.right().output_ordering().map(|p| p.to_vec()),
sym_join.left().output_ordering().map(|p| p.to_vec()),
sym_join.partition_mode(),
)?)))
}
Expand Down Expand Up @@ -2048,6 +2050,8 @@ mod tests {
)),
&JoinType::Inner,
true,
None,
None,
StreamJoinPartitionMode::SinglePartition,
)?);
let projection: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
Expand Down
68 changes: 68 additions & 0 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,74 @@ async fn join_change_in_planner() -> Result<()> {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a1@0 AS Int64) > CAST(a1@1 AS Int64) + 3 AND CAST(a1@0 AS Int64) < CAST(a1@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8, preserve_order=true, sort_exprs=a1@0 ASC NULLS LAST",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/right.csv]]}, projection=[a1, a2], has_header=false"
]
};
let mut actual: Vec<&str> = formatted.trim().lines().collect();
// Remove CSV lines
actual.remove(4);
actual.remove(7);

assert_eq!(
expected,
actual[..],
"\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
);
Ok(())
}

#[tokio::test]
async fn join_no_order_on_filter() -> Result<()> {
let config = SessionConfig::new().with_target_partitions(8);
let ctx = SessionContext::new_with_config(config);
let tmp_dir = TempDir::new().unwrap();
let left_file_path = tmp_dir.path().join("left.csv");
File::create(left_file_path.clone()).unwrap();
// Create schema
let schema = Arc::new(Schema::new(vec![
Field::new("a1", DataType::UInt32, false),
Field::new("a2", DataType::UInt32, false),
Field::new("a3", DataType::UInt32, false),
]));
// Specify the ordering:
let file_sort_order = vec![[datafusion_expr::col("a1")]
.into_iter()
.map(|e| {
let ascending = true;
let nulls_first = false;
e.sort(ascending, nulls_first)
})
.collect::<Vec<_>>()];
register_unbounded_file_with_ordering(
&ctx,
schema.clone(),
&left_file_path,
"left",
file_sort_order.clone(),
)?;
let right_file_path = tmp_dir.path().join("right.csv");
File::create(right_file_path.clone()).unwrap();
register_unbounded_file_with_ordering(
&ctx,
schema,
&right_file_path,
"right",
file_sort_order,
)?;
let sql = "SELECT * FROM left as t1 FULL JOIN right as t2 ON t1.a2 = t2.a2 AND t1.a3 > t2.a3 + 3 AND t1.a3 < t2.a3 + 10";
let dataframe = ctx.sql(sql).await?;
let physical_plan = dataframe.create_physical_plan().await?;
let formatted = displayable(physical_plan.as_ref()).indent(true).to_string();
let expected = {
[
"SymmetricHashJoinExec: mode=Partitioned, join_type=Full, on=[(a2@1, a2@1)], filter=CAST(a3@0 AS Int64) > CAST(a3@1 AS Int64) + 3 AND CAST(a3@0 AS Int64) < CAST(a3@1 AS Int64) + 10",
" CoalesceBatchesExec: target_batch_size=8192",
" RepartitionExec: partitioning=Hash([a2@1], 8), input_partitions=8",
" RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1",
// " CsvExec: file_groups={1 group: [[tempdir/left.csv]]}, projection=[a1, a2], has_header=false",
Expand Down
70 changes: 48 additions & 22 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use datafusion_physical_expr::equivalence::join_equivalence_properties;
use datafusion_physical_expr::intervals::cp_solver::ExprIntervalGraph;

use ahash::RandomState;
use datafusion_physical_expr::PhysicalSortRequirement;
use futures::Stream;
use hashbrown::HashSet;
use parking_lot::Mutex;
Expand Down Expand Up @@ -181,6 +182,10 @@ pub struct SymmetricHashJoinExec {
column_indices: Vec<ColumnIndex>,
/// If null_equals_null is true, null == null else null != null
pub(crate) null_equals_null: bool,
/// Left side sort expression(s)
pub(crate) left_sort_exprs: Option<Vec<PhysicalSortExpr>>,
/// Right side sort expression(s)
pub(crate) right_sort_exprs: Option<Vec<PhysicalSortExpr>>,
/// Partition Mode
mode: StreamJoinPartitionMode,
}
Expand All @@ -192,13 +197,16 @@ impl SymmetricHashJoinExec {
/// - It is not possible to join the left and right sides on keys `on`, or
/// - It fails to construct `SortedFilterExpr`s, or
/// - It fails to create the [ExprIntervalGraph].
#[allow(clippy::too_many_arguments)]
pub fn try_new(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
on: JoinOn,
filter: Option<JoinFilter>,
join_type: &JoinType,
null_equals_null: bool,
left_sort_exprs: Option<Vec<PhysicalSortExpr>>,
right_sort_exprs: Option<Vec<PhysicalSortExpr>>,
mode: StreamJoinPartitionMode,
) -> Result<Self> {
let left_schema = left.schema();
Expand Down Expand Up @@ -232,6 +240,8 @@ impl SymmetricHashJoinExec {
metrics: ExecutionPlanMetricsSet::new(),
column_indices,
null_equals_null,
left_sort_exprs,
right_sort_exprs,
mode,
})
}
Expand Down Expand Up @@ -271,6 +281,16 @@ impl SymmetricHashJoinExec {
self.mode
}

/// Get left_sort_exprs
pub fn left_sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
self.left_sort_exprs.as_deref()
}

/// Get right_sort_exprs
pub fn right_sort_exprs(&self) -> Option<&[PhysicalSortExpr]> {
self.right_sort_exprs.as_deref()
}

/// Check if order information covers every column in the filter expression.
pub fn check_if_order_information_available(&self) -> Result<bool> {
if let Some(filter) = self.filter() {
Expand Down Expand Up @@ -337,10 +357,6 @@ impl ExecutionPlan for SymmetricHashJoinExec {
Ok(children.iter().any(|u| *u))
}

fn benefits_from_input_partitioning(&self) -> Vec<bool> {
vec![false, false]
}

fn required_input_distribution(&self) -> Vec<Distribution> {
match self.mode {
StreamJoinPartitionMode::Partitioned => {
Expand All @@ -360,6 +376,17 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
}

fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
vec![
self.left_sort_exprs
.as_ref()
.map(PhysicalSortRequirement::from_sort_exprs),
self.right_sort_exprs
.as_ref()
.map(PhysicalSortRequirement::from_sort_exprs),
]
}

fn output_partitioning(&self) -> Partitioning {
let left_columns_len = self.left.schema().fields.len();
partitioned_join_output_partitioning(
Expand Down Expand Up @@ -403,6 +430,8 @@ impl ExecutionPlan for SymmetricHashJoinExec {
self.filter.clone(),
&self.join_type,
self.null_equals_null,
self.left_sort_exprs.clone(),
self.right_sort_exprs.clone(),
self.mode,
)?))
}
Expand Down Expand Up @@ -431,24 +460,21 @@ impl ExecutionPlan for SymmetricHashJoinExec {
}
// If `filter_state` and `filter` are both present, then calculate sorted filter expressions
// for both sides, and build an expression graph.
let (left_sorted_filter_expr, right_sorted_filter_expr, graph) = match (
self.left.output_ordering(),
self.right.output_ordering(),
&self.filter,
) {
(Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
let (left, right, graph) = prepare_sorted_exprs(
filter,
&self.left,
&self.right,
left_sort_exprs,
right_sort_exprs,
)?;
(Some(left), Some(right), Some(graph))
}
// If `filter_state` or `filter` is not present, then return None for all three values:
_ => (None, None, None),
};
let (left_sorted_filter_expr, right_sorted_filter_expr, graph) =
match (&self.left_sort_exprs, &self.right_sort_exprs, &self.filter) {
(Some(left_sort_exprs), Some(right_sort_exprs), Some(filter)) => {
let (left, right, graph) = prepare_sorted_exprs(
filter,
&self.left,
&self.right,
left_sort_exprs,
right_sort_exprs,
)?;
(Some(left), Some(right), Some(graph))
}
// If `filter_state` or `filter` is not present, then return None for all three values:
_ => (None, None, None),
};

let (on_left, on_right) = self.on.iter().cloned().unzip();

Expand Down
6 changes: 4 additions & 2 deletions datafusion/physical-plan/src/joins/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,19 @@ pub async fn partitioned_sym_join_with_filter(

let join = SymmetricHashJoinExec::try_new(
Arc::new(RepartitionExec::try_new(
left,
left.clone(),
Partitioning::Hash(left_expr, partition_count),
)?),
Arc::new(RepartitionExec::try_new(
right,
right.clone(),
Partitioning::Hash(right_expr, partition_count),
)?),
on,
filter,
join_type,
null_equals_null,
left.output_ordering().map(|p| p.to_vec()),
right.output_ordering().map(|p| p.to_vec()),
StreamJoinPartitionMode::Partitioned,
)?;

Expand Down
2 changes: 2 additions & 0 deletions datafusion/proto/proto/datafusion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1536,6 +1536,8 @@ message SymmetricHashJoinExecNode {
StreamPartitionMode partition_mode = 6;
bool null_equals_null = 7;
JoinFilter filter = 8;
repeated PhysicalSortExprNode left_sort_exprs = 9;
repeated PhysicalSortExprNode right_sort_exprs = 10;
}

message InterleaveExecNode {
Expand Down
Loading