diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 0aef126578f3..d5a086227323 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -25,11 +25,11 @@ use std::fmt; use std::fmt::Formatter; use std::sync::Arc; +use super::output_requirements::OutputRequirementExec; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::utils::{ - add_sort_above, get_children_exectrees, is_coalesce_partitions, is_repartition, - is_sort_preserving_merge, ExecTree, + is_coalesce_partitions, is_repartition, is_sort_preserving_merge, }; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; @@ -52,8 +52,10 @@ use datafusion_expr::logical_plan::JoinType; use datafusion_physical_expr::expressions::{Column, NoOp}; use datafusion_physical_expr::utils::map_columns_before_projection; use datafusion_physical_expr::{ - physical_exprs_equal, EquivalenceProperties, PhysicalExpr, + physical_exprs_equal, EquivalenceProperties, LexRequirementRef, PhysicalExpr, + PhysicalSortRequirement, }; +use datafusion_physical_plan::sorts::sort::SortExec; use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec}; use datafusion_physical_plan::{get_plan_string, unbounded_output}; @@ -268,11 +270,12 @@ impl PhysicalOptimizerRule for EnforceDistribution { /// 5) For other types of operators, by default, pushdown the parent requirements to children. /// fn adjust_input_keys_ordering( - requirements: PlanWithKeyRequirements, + mut requirements: PlanWithKeyRequirements, ) -> Result> { let parent_required = requirements.required_key_ordering.clone(); let plan_any = requirements.plan.as_any(); - let transformed = if let Some(HashJoinExec { + + if let Some(HashJoinExec { left, right, on, @@ -287,7 +290,7 @@ fn adjust_input_keys_ordering( PartitionMode::Partitioned => { let join_constructor = |new_conditions: (Vec<(Column, Column)>, Vec)| { - Ok(Arc::new(HashJoinExec::try_new( + HashJoinExec::try_new( left.clone(), right.clone(), new_conditions.0, @@ -295,15 +298,17 @@ fn adjust_input_keys_ordering( join_type, PartitionMode::Partitioned, *null_equals_null, - )?) as Arc) + ) + .map(|e| Arc::new(e) as _) }; - Some(reorder_partitioned_join_keys( + reorder_partitioned_join_keys( requirements.plan.clone(), &parent_required, on, vec![], &join_constructor, - )?) + ) + .map(Transformed::Yes) } PartitionMode::CollectLeft => { let new_right_request = match join_type { @@ -321,15 +326,15 @@ fn adjust_input_keys_ordering( }; // Push down requirements to the right side - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![None, new_right_request], - }) + requirements.children[1].required_key_ordering = + new_right_request.unwrap_or(vec![]); + Ok(Transformed::Yes(requirements)) } PartitionMode::Auto => { // Can not satisfy, clear the current requirements and generate new empty requirements - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + Ok(Transformed::Yes(PlanWithKeyRequirements::new( + requirements.plan, + ))) } } } else if let Some(CrossJoinExec { left, .. }) = @@ -337,14 +342,9 @@ fn adjust_input_keys_ordering( { let left_columns_len = left.schema().fields().len(); // Push down requirements to the right side - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![ - None, - shift_right_required(&parent_required, left_columns_len), - ], - }) + requirements.children[1].required_key_ordering = + shift_right_required(&parent_required, left_columns_len).unwrap_or_default(); + Ok(Transformed::Yes(requirements)) } else if let Some(SortMergeJoinExec { left, right, @@ -357,35 +357,40 @@ fn adjust_input_keys_ordering( { let join_constructor = |new_conditions: (Vec<(Column, Column)>, Vec)| { - Ok(Arc::new(SortMergeJoinExec::try_new( + SortMergeJoinExec::try_new( left.clone(), right.clone(), new_conditions.0, *join_type, new_conditions.1, *null_equals_null, - )?) as Arc) + ) + .map(|e| Arc::new(e) as _) }; - Some(reorder_partitioned_join_keys( + reorder_partitioned_join_keys( requirements.plan.clone(), &parent_required, on, sort_options.clone(), &join_constructor, - )?) + ) + .map(Transformed::Yes) } else if let Some(aggregate_exec) = plan_any.downcast_ref::() { if !parent_required.is_empty() { match aggregate_exec.mode() { - AggregateMode::FinalPartitioned => Some(reorder_aggregate_keys( + AggregateMode::FinalPartitioned => reorder_aggregate_keys( requirements.plan.clone(), &parent_required, aggregate_exec, - )?), - _ => Some(PlanWithKeyRequirements::new(requirements.plan.clone())), + ) + .map(Transformed::Yes), + _ => Ok(Transformed::Yes(PlanWithKeyRequirements::new( + requirements.plan, + ))), } } else { // Keep everything unchanged - None + Ok(Transformed::No(requirements)) } } else if let Some(proj) = plan_any.downcast_ref::() { let expr = proj.expr(); @@ -394,34 +399,28 @@ fn adjust_input_keys_ordering( // Construct a mapping from new name to the the orginal Column let new_required = map_columns_before_projection(&parent_required, expr); if new_required.len() == parent_required.len() { - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![Some(new_required.clone())], - }) + requirements.children[0].required_key_ordering = new_required; + Ok(Transformed::Yes(requirements)) } else { // Can not satisfy, clear the current requirements and generate new empty requirements - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + Ok(Transformed::Yes(PlanWithKeyRequirements::new( + requirements.plan, + ))) } } else if plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() || plan_any.downcast_ref::().is_some() { - Some(PlanWithKeyRequirements::new(requirements.plan.clone())) + Ok(Transformed::Yes(PlanWithKeyRequirements::new( + requirements.plan, + ))) } else { // By default, push down the parent requirements to children - let children_len = requirements.plan.children().len(); - Some(PlanWithKeyRequirements { - plan: requirements.plan.clone(), - required_key_ordering: vec![], - request_key_ordering: vec![Some(parent_required.clone()); children_len], - }) - }; - Ok(if let Some(transformed) = transformed { - Transformed::Yes(transformed) - } else { - Transformed::No(requirements) - }) + requirements.children.iter_mut().for_each(|child| { + child.required_key_ordering = parent_required.clone(); + }); + Ok(Transformed::Yes(requirements)) + } } fn reorder_partitioned_join_keys( @@ -452,28 +451,24 @@ where for idx in 0..sort_options.len() { new_sort_options.push(sort_options[new_positions[idx]]) } - - Ok(PlanWithKeyRequirements { - plan: join_constructor((new_join_on, new_sort_options))?, - required_key_ordering: vec![], - request_key_ordering: vec![Some(left_keys), Some(right_keys)], - }) + let mut requirement_tree = PlanWithKeyRequirements::new(join_constructor(( + new_join_on, + new_sort_options, + ))?); + requirement_tree.children[0].required_key_ordering = left_keys; + requirement_tree.children[1].required_key_ordering = right_keys; + Ok(requirement_tree) } else { - Ok(PlanWithKeyRequirements { - plan: join_plan, - required_key_ordering: vec![], - request_key_ordering: vec![Some(left_keys), Some(right_keys)], - }) + let mut requirement_tree = PlanWithKeyRequirements::new(join_plan); + requirement_tree.children[0].required_key_ordering = left_keys; + requirement_tree.children[1].required_key_ordering = right_keys; + Ok(requirement_tree) } } else { - Ok(PlanWithKeyRequirements { - plan: join_plan, - required_key_ordering: vec![], - request_key_ordering: vec![ - Some(join_key_pairs.left_keys), - Some(join_key_pairs.right_keys), - ], - }) + let mut requirement_tree = PlanWithKeyRequirements::new(join_plan); + requirement_tree.children[0].required_key_ordering = join_key_pairs.left_keys; + requirement_tree.children[1].required_key_ordering = join_key_pairs.right_keys; + Ok(requirement_tree) } } @@ -868,59 +863,24 @@ fn new_join_conditions( .collect() } -/// Updates `dist_onward` such that, to keep track of -/// `input` in the `exec_tree`. -/// -/// # Arguments -/// -/// * `input`: Current execution plan -/// * `dist_onward`: It keeps track of executors starting from a distribution -/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.) -/// until child of `input` (`input` should have single child). -/// * `input_idx`: index of the `input`, for its parent. -/// -fn update_distribution_onward( - input: Arc, - dist_onward: &mut Option, - input_idx: usize, -) { - // Update the onward tree if there is an active branch - if let Some(exec_tree) = dist_onward { - // When we add a new operator to change distribution - // we add RepartitionExec, SortPreservingMergeExec, CoalescePartitionsExec - // in this case, we need to update exec tree idx such that exec tree is now child of these - // operators (change the 0, since all of the operators have single child). - exec_tree.idx = 0; - *exec_tree = ExecTree::new(input, input_idx, vec![exec_tree.clone()]); - } else { - *dist_onward = Some(ExecTree::new(input, input_idx, vec![])); - } -} - /// Adds RoundRobin repartition operator to the plan increase parallelism. /// /// # Arguments /// -/// * `input`: Current execution plan +/// * `input`: Current node. /// * `n_target`: desired target partition number, if partition number of the /// current executor is less than this value. Partition number will be increased. -/// * `dist_onward`: It keeps track of executors starting from a distribution -/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.) -/// until `input` plan. -/// * `input_idx`: index of the `input`, for its parent. /// /// # Returns /// -/// A [Result] object that contains new execution plan, where desired partition number -/// is achieved by adding RoundRobin Repartition. +/// A [`Result`] object that contains new execution plan where the desired +/// partition number is achieved by adding a RoundRobin repartition. fn add_roundrobin_on_top( - input: Arc, + input: DistributionContext, n_target: usize, - dist_onward: &mut Option, - input_idx: usize, -) -> Result> { - // Adding repartition is helpful - if input.output_partitioning().partition_count() < n_target { +) -> Result { + // Adding repartition is helpful: + if input.plan.output_partitioning().partition_count() < n_target { // When there is an existing ordering, we preserve ordering // during repartition. This will be un-done in the future // If any of the following conditions is true @@ -928,13 +888,16 @@ fn add_roundrobin_on_top( // - Usage of order preserving variants is not desirable // (determined by flag `config.optimizer.prefer_existing_sort`) let partitioning = Partitioning::RoundRobinBatch(n_target); - let repartition = - RepartitionExec::try_new(input, partitioning)?.with_preserve_order(); + let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)? + .with_preserve_order(); - // update distribution onward with new operator - let new_plan = Arc::new(repartition) as Arc; - update_distribution_onward(new_plan.clone(), dist_onward, input_idx); - Ok(new_plan) + let new_plan = Arc::new(repartition) as _; + + Ok(DistributionContext { + plan: new_plan, + distribution_connection: true, + children_nodes: vec![input], + }) } else { // Partition is not helpful, we already have desired number of partitions. Ok(input) @@ -948,46 +911,38 @@ fn add_roundrobin_on_top( /// /// # Arguments /// -/// * `input`: Current execution plan +/// * `input`: Current node. /// * `hash_exprs`: Stores Physical Exprs that are used during hashing. /// * `n_target`: desired target partition number, if partition number of the /// current executor is less than this value. Partition number will be increased. -/// * `dist_onward`: It keeps track of executors starting from a distribution -/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.) -/// until `input` plan. -/// * `input_idx`: index of the `input`, for its parent. /// /// # Returns /// -/// A [`Result`] object that contains new execution plan, where desired distribution is -/// satisfied by adding Hash Repartition. +/// A [`Result`] object that contains new execution plan where the desired +/// distribution is satisfied by adding a Hash repartition. fn add_hash_on_top( - input: Arc, + mut input: DistributionContext, hash_exprs: Vec>, - // Repartition(Hash) will have `n_target` partitions at the output. n_target: usize, - // Stores executors starting from Repartition(RoundRobin) until - // current executor. When Repartition(Hash) is added, `dist_onward` - // is updated such that it stores connection from Repartition(RoundRobin) - // until Repartition(Hash). - dist_onward: &mut Option, - input_idx: usize, repartition_beneficial_stats: bool, -) -> Result> { - if n_target == input.output_partitioning().partition_count() && n_target == 1 { - // In this case adding a hash repartition is unnecessary as the hash - // requirement is implicitly satisfied. +) -> Result { + let partition_count = input.plan.output_partitioning().partition_count(); + // Early return if hash repartition is unnecessary + if n_target == partition_count && n_target == 1 { return Ok(input); } + let satisfied = input + .plan .output_partitioning() .satisfy(Distribution::HashPartitioned(hash_exprs.clone()), || { - input.equivalence_properties() + input.plan.equivalence_properties() }); + // Add hash repartitioning when: // - The hash distribution requirement is not satisfied, or // - We can increase parallelism by adding hash partitioning. - if !satisfied || n_target > input.output_partitioning().partition_count() { + if !satisfied || n_target > input.plan.output_partitioning().partition_count() { // When there is an existing ordering, we preserve ordering during // repartition. This will be rolled back in the future if any of the // following conditions is true: @@ -995,75 +950,66 @@ fn add_hash_on_top( // requirements. // - Usage of order preserving variants is not desirable (per the flag // `config.optimizer.prefer_existing_sort`). - let mut new_plan = if repartition_beneficial_stats { + if repartition_beneficial_stats { // Since hashing benefits from partitioning, add a round-robin repartition // before it: - add_roundrobin_on_top(input, n_target, dist_onward, 0)? - } else { - input - }; + input = add_roundrobin_on_top(input, n_target)?; + } + let partitioning = Partitioning::Hash(hash_exprs, n_target); - let repartition = RepartitionExec::try_new(new_plan, partitioning)? - // preserve any ordering if possible + let repartition = RepartitionExec::try_new(input.plan.clone(), partitioning)? .with_preserve_order(); - new_plan = Arc::new(repartition) as _; - // update distribution onward with new operator - update_distribution_onward(new_plan.clone(), dist_onward, input_idx); - Ok(new_plan) - } else { - Ok(input) + input.children_nodes = vec![input.clone()]; + input.distribution_connection = true; + input.plan = Arc::new(repartition) as _; } + + Ok(input) } -/// Adds a `SortPreservingMergeExec` operator on top of input executor: -/// - to satisfy single distribution requirement. +/// Adds a [`SortPreservingMergeExec`] operator on top of input executor +/// to satisfy single distribution requirement. /// /// # Arguments /// -/// * `input`: Current execution plan -/// * `dist_onward`: It keeps track of executors starting from a distribution -/// changing operator (e.g Repartition, SortPreservingMergeExec, etc.) -/// until `input` plan. -/// * `input_idx`: index of the `input`, for its parent. +/// * `input`: Current node. /// /// # Returns /// -/// New execution plan, where desired single -/// distribution is satisfied by adding `SortPreservingMergeExec`. -fn add_spm_on_top( - input: Arc, - dist_onward: &mut Option, - input_idx: usize, -) -> Arc { +/// Updated node with an execution plan, where desired single +/// distribution is satisfied by adding [`SortPreservingMergeExec`]. +fn add_spm_on_top(input: DistributionContext) -> DistributionContext { // Add SortPreservingMerge only when partition count is larger than 1. - if input.output_partitioning().partition_count() > 1 { + if input.plan.output_partitioning().partition_count() > 1 { // When there is an existing ordering, we preserve ordering - // during decreasıng partıtıons. This will be un-done in the future - // If any of the following conditions is true + // when decreasing partitions. This will be un-done in the future + // if any of the following conditions is true // - Preserving ordering is not helpful in terms of satisfying ordering requirements // - Usage of order preserving variants is not desirable - // (determined by flag `config.optimizer.prefer_existing_sort`) - let should_preserve_ordering = input.output_ordering().is_some(); - let new_plan: Arc = if should_preserve_ordering { - let existing_ordering = input.output_ordering().unwrap_or(&[]); + // (determined by flag `config.optimizer.bounded_order_preserving_variants`) + let should_preserve_ordering = input.plan.output_ordering().is_some(); + + let new_plan = if should_preserve_ordering { Arc::new(SortPreservingMergeExec::new( - existing_ordering.to_vec(), - input, + input.plan.output_ordering().unwrap_or(&[]).to_vec(), + input.plan.clone(), )) as _ } else { - Arc::new(CoalescePartitionsExec::new(input)) as _ + Arc::new(CoalescePartitionsExec::new(input.plan.clone())) as _ }; - // update repartition onward with new operator - update_distribution_onward(new_plan.clone(), dist_onward, input_idx); - new_plan + DistributionContext { + plan: new_plan, + distribution_connection: true, + children_nodes: vec![input], + } } else { input } } -/// Updates the physical plan inside `distribution_context` so that distribution +/// Updates the physical plan inside [`DistributionContext`] so that distribution /// changing operators are removed from the top. If they are necessary, they will /// be added in subsequent stages. /// @@ -1081,48 +1027,23 @@ fn add_spm_on_top( /// "ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` fn remove_dist_changing_operators( - distribution_context: DistributionContext, + mut distribution_context: DistributionContext, ) -> Result { - let DistributionContext { - mut plan, - mut distribution_onwards, - } = distribution_context; - - // Remove any distribution changing operators at the beginning: - // Note that they will be re-inserted later on if necessary or helpful. - while is_repartition(&plan) - || is_coalesce_partitions(&plan) - || is_sort_preserving_merge(&plan) + while is_repartition(&distribution_context.plan) + || is_coalesce_partitions(&distribution_context.plan) + || is_sort_preserving_merge(&distribution_context.plan) { - // All of above operators have a single child. When we remove the top - // operator, we take the first child. - plan = plan.children().swap_remove(0); - distribution_onwards = - get_children_exectrees(plan.children().len(), &distribution_onwards[0]); + // All of above operators have a single child. First child is only child. + let child = distribution_context.children_nodes.swap_remove(0); + // Remove any distribution changing operators at the beginning: + // Note that they will be re-inserted later on if necessary or helpful. + distribution_context = child; } - // Create a plan with the updated children: - Ok(DistributionContext { - plan, - distribution_onwards, - }) + Ok(distribution_context) } -/// Updates the physical plan `input` by using `dist_onward` replace order preserving operator variants -/// with their corresponding operators that do not preserve order. It is a wrapper for `replace_order_preserving_variants_helper` -fn replace_order_preserving_variants( - input: &mut Arc, - dist_onward: &mut Option, -) -> Result<()> { - if let Some(dist_onward) = dist_onward { - *input = replace_order_preserving_variants_helper(dist_onward)?; - } - *dist_onward = None; - Ok(()) -} - -/// Updates the physical plan inside `ExecTree` if preserving ordering while changing partitioning -/// is not helpful or desirable. +/// Updates the [`DistributionContext`] if preserving ordering while changing partitioning is not helpful or desirable. /// /// Assume that following plan is given: /// ```text @@ -1132,7 +1053,7 @@ fn replace_order_preserving_variants( /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` /// -/// This function converts plan above (inside `ExecTree`) to the following: +/// This function converts plan above to the following: /// /// ```text /// "CoalescePartitionsExec" @@ -1140,30 +1061,75 @@ fn replace_order_preserving_variants( /// " RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2", /// " ParquetExec: file_groups={2 groups: \[\[x], \[y]]}, projection=\[a, b, c, d, e], output_ordering=\[a@0 ASC]", /// ``` -fn replace_order_preserving_variants_helper( - exec_tree: &ExecTree, -) -> Result> { - let mut updated_children = exec_tree.plan.children(); - for child in &exec_tree.children { - updated_children[child.idx] = replace_order_preserving_variants_helper(child)?; - } - if is_sort_preserving_merge(&exec_tree.plan) { - return Ok(Arc::new(CoalescePartitionsExec::new( - updated_children.swap_remove(0), - ))); - } - if let Some(repartition) = exec_tree.plan.as_any().downcast_ref::() { +fn replace_order_preserving_variants( + mut context: DistributionContext, +) -> Result { + let mut updated_children = context + .children_nodes + .iter() + .map(|child| { + if child.distribution_connection { + replace_order_preserving_variants(child.clone()) + } else { + Ok(child.clone()) + } + }) + .collect::>>()?; + + if is_sort_preserving_merge(&context.plan) { + let child = updated_children.swap_remove(0); + context.plan = Arc::new(CoalescePartitionsExec::new(child.plan.clone())); + context.children_nodes = vec![child]; + return Ok(context); + } else if let Some(repartition) = + context.plan.as_any().downcast_ref::() + { if repartition.preserve_order() { - return Ok(Arc::new( - // new RepartitionExec don't preserve order - RepartitionExec::try_new( - updated_children.swap_remove(0), - repartition.partitioning().clone(), - )?, - )); + let child = updated_children.swap_remove(0); + context.plan = Arc::new(RepartitionExec::try_new( + child.plan.clone(), + repartition.partitioning().clone(), + )?); + context.children_nodes = vec![child]; + return Ok(context); + } + } + + context.plan = context + .plan + .clone() + .with_new_children(updated_children.into_iter().map(|c| c.plan).collect())?; + Ok(context) +} + +/// This utility function adds a [`SortExec`] above an operator according to the +/// given ordering requirements while preserving the original partitioning. +fn add_sort_preserving_partitions( + node: DistributionContext, + sort_requirement: LexRequirementRef, + fetch: Option, +) -> DistributionContext { + // If the ordering requirement is already satisfied, do not add a sort. + if !node + .plan + .equivalence_properties() + .ordering_satisfy_requirement(sort_requirement) + { + let sort_expr = PhysicalSortRequirement::to_sort_exprs(sort_requirement.to_vec()); + let new_sort = SortExec::new(sort_expr, node.plan.clone()).with_fetch(fetch); + + DistributionContext { + plan: Arc::new(if node.plan.output_partitioning().partition_count() > 1 { + new_sort.with_preserve_partitioning(true) + } else { + new_sort + }), + distribution_connection: false, + children_nodes: vec![node], } + } else { + node } - exec_tree.plan.clone().with_new_children(updated_children) } /// This function checks whether we need to add additional data exchange @@ -1174,6 +1140,12 @@ fn ensure_distribution( dist_context: DistributionContext, config: &ConfigOptions, ) -> Result> { + let dist_context = dist_context.update_children()?; + + if dist_context.plan.children().is_empty() { + return Ok(Transformed::No(dist_context)); + } + let target_partitions = config.execution.target_partitions; // When `false`, round robin repartition will not be added to increase parallelism let enable_round_robin = config.optimizer.enable_round_robin_repartition; @@ -1186,14 +1158,11 @@ fn ensure_distribution( let order_preserving_variants_desirable = is_unbounded || config.optimizer.prefer_existing_sort; - if dist_context.plan.children().is_empty() { - return Ok(Transformed::No(dist_context)); - } - // Remove unnecessary repartition from the physical plan if any let DistributionContext { mut plan, - mut distribution_onwards, + distribution_connection, + children_nodes, } = remove_dist_changing_operators(dist_context)?; if let Some(exec) = plan.as_any().downcast_ref::() { @@ -1213,33 +1182,23 @@ fn ensure_distribution( plan = updated_window; } }; - let n_children = plan.children().len(); + // This loop iterates over all the children to: // - Increase parallelism for every child if it is beneficial. // - Satisfy the distribution requirements of every child, if it is not // already satisfied. // We store the updated children in `new_children`. - let new_children = izip!( - plan.children().into_iter(), + let children_nodes = izip!( + children_nodes.into_iter(), plan.required_input_distribution().iter(), plan.required_input_ordering().iter(), - distribution_onwards.iter_mut(), plan.benefits_from_input_partitioning(), - plan.maintains_input_order(), - 0..n_children + plan.maintains_input_order() ) .map( - |( - mut child, - requirement, - required_input_ordering, - dist_onward, - would_benefit, - maintains, - child_idx, - )| { + |(mut child, requirement, required_input_ordering, would_benefit, maintains)| { // Don't need to apply when the returned row count is not greater than 1: - let num_rows = child.statistics()?.num_rows; + let num_rows = child.plan.statistics()?.num_rows; let repartition_beneficial_stats = if num_rows.is_exact().unwrap_or(false) { num_rows .get_value() @@ -1248,45 +1207,39 @@ fn ensure_distribution( } else { true }; + if enable_round_robin // Operator benefits from partitioning (e.g. filter): && (would_benefit && repartition_beneficial_stats) // Unless partitioning doesn't increase the partition count, it is not beneficial: - && child.output_partitioning().partition_count() < target_partitions + && child.plan.output_partitioning().partition_count() < target_partitions { // When `repartition_file_scans` is set, attempt to increase // parallelism at the source. if repartition_file_scans { if let Some(new_child) = - child.repartitioned(target_partitions, config)? + child.plan.repartitioned(target_partitions, config)? { - child = new_child; + child.plan = new_child; } } // Increase parallelism by adding round-robin repartitioning // on top of the operator. Note that we only do this if the // partition count is not already equal to the desired partition // count. - child = add_roundrobin_on_top( - child, - target_partitions, - dist_onward, - child_idx, - )?; + child = add_roundrobin_on_top(child, target_partitions)?; } // Satisfy the distribution requirement if it is unmet. match requirement { Distribution::SinglePartition => { - child = add_spm_on_top(child, dist_onward, child_idx); + child = add_spm_on_top(child); } Distribution::HashPartitioned(exprs) => { child = add_hash_on_top( child, exprs.to_vec(), target_partitions, - dist_onward, - child_idx, repartition_beneficial_stats, )?; } @@ -1299,31 +1252,38 @@ fn ensure_distribution( // - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or // - using order preserving variant is not desirable. let ordering_satisfied = child + .plan .equivalence_properties() .ordering_satisfy_requirement(required_input_ordering); - if !ordering_satisfied || !order_preserving_variants_desirable { - replace_order_preserving_variants(&mut child, dist_onward)?; + if (!ordering_satisfied || !order_preserving_variants_desirable) + && child.distribution_connection + { + child = replace_order_preserving_variants(child)?; // If ordering requirements were satisfied before repartitioning, // make sure ordering requirements are still satisfied after. if ordering_satisfied { // Make sure to satisfy ordering requirement: - add_sort_above(&mut child, required_input_ordering, None); + child = add_sort_preserving_partitions( + child, + required_input_ordering, + None, + ); } } // Stop tracking distribution changing operators - *dist_onward = None; + child.distribution_connection = false; } else { // no ordering requirement match requirement { // Operator requires specific distribution. Distribution::SinglePartition | Distribution::HashPartitioned(_) => { // Since there is no ordering requirement, preserving ordering is pointless - replace_order_preserving_variants(&mut child, dist_onward)?; + child = replace_order_preserving_variants(child)?; } Distribution::UnspecifiedDistribution => { // Since ordering is lost, trying to preserve ordering is pointless - if !maintains { - replace_order_preserving_variants(&mut child, dist_onward)?; + if !maintains || plan.as_any().is::() { + child = replace_order_preserving_variants(child)?; } } } @@ -1334,7 +1294,9 @@ fn ensure_distribution( .collect::>>()?; let new_distribution_context = DistributionContext { - plan: if plan.as_any().is::() && can_interleave(&new_children) { + plan: if plan.as_any().is::() + && can_interleave(children_nodes.iter().map(|c| c.plan.clone())) + { // Add a special case for [`UnionExec`] since we want to "bubble up" // hash-partitioned data. So instead of // @@ -1358,120 +1320,91 @@ fn ensure_distribution( // - Agg: // Repartition (hash): // Data - Arc::new(InterleaveExec::try_new(new_children)?) + Arc::new(InterleaveExec::try_new( + children_nodes.iter().map(|c| c.plan.clone()).collect(), + )?) } else { - plan.with_new_children(new_children)? + plan.with_new_children( + children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? }, - distribution_onwards, + distribution_connection, + children_nodes, }; + Ok(Transformed::Yes(new_distribution_context)) } -/// A struct to keep track of distribution changing executors +/// A struct to keep track of distribution changing operators /// (`RepartitionExec`, `SortPreservingMergeExec`, `CoalescePartitionsExec`), /// and their associated parents inside `plan`. Using this information, /// we can optimize distribution of the plan if/when necessary. #[derive(Debug, Clone)] struct DistributionContext { plan: Arc, - /// Keep track of associations for each child of the plan. If `None`, - /// there is no distribution changing operator in its descendants. - distribution_onwards: Vec>, + /// Indicates whether this plan is connected to a distribution-changing + /// operator. + distribution_connection: bool, + children_nodes: Vec, } impl DistributionContext { - /// Creates an empty context. + /// Creates a tree according to the plan with empty states. fn new(plan: Arc) -> Self { - let length = plan.children().len(); - DistributionContext { + let children = plan.children(); + Self { plan, - distribution_onwards: vec![None; length], + distribution_connection: false, + children_nodes: children.into_iter().map(Self::new).collect(), } } - /// Constructs a new context from children contexts. - fn new_from_children_nodes( - children_nodes: Vec, - parent_plan: Arc, - ) -> Result { - let children_plans = children_nodes - .iter() - .map(|item| item.plan.clone()) - .collect(); - let distribution_onwards = children_nodes - .into_iter() - .enumerate() - .map(|(idx, context)| { - let DistributionContext { - plan, - // The `distribution_onwards` tree keeps track of operators - // that change distribution, or preserves the existing - // distribution (starting from an operator that change distribution). - distribution_onwards, - } = context; - if plan.children().is_empty() { - // Plan has no children, there is nothing to propagate. - None - } else if distribution_onwards[0].is_none() { - if let Some(repartition) = - plan.as_any().downcast_ref::() - { - match repartition.partitioning() { - Partitioning::RoundRobinBatch(_) - | Partitioning::Hash(_, _) => { - // Start tracking operators starting from this repartition (either roundrobin or hash): - return Some(ExecTree::new(plan, idx, vec![])); - } - _ => {} - } - } else if plan.as_any().is::() - || plan.as_any().is::() - { - // Start tracking operators starting from this sort preserving merge: - return Some(ExecTree::new(plan, idx, vec![])); - } - None - } else { - // Propagate children distribution tracking to the above - let new_distribution_onwards = izip!( - plan.required_input_distribution().iter(), - distribution_onwards.into_iter() - ) - .flat_map(|(required_dist, distribution_onwards)| { - if let Some(distribution_onwards) = distribution_onwards { - // Operator can safely propagate the distribution above. - // This is similar to maintaining order in the EnforceSorting rule. - if let Distribution::UnspecifiedDistribution = required_dist { - return Some(distribution_onwards); - } - } - None - }) - .collect::>(); - // Either: - // - None of the children has a connection to an operator that modifies distribution, or - // - The current operator requires distribution at its input so doesn't propagate it above. - if new_distribution_onwards.is_empty() { - None - } else { - Some(ExecTree::new(plan, idx, new_distribution_onwards)) - } + fn update_children(mut self) -> Result { + for child_context in self.children_nodes.iter_mut() { + child_context.distribution_connection = match child_context.plan.as_any() { + plan_any if plan_any.is::() => matches!( + plan_any + .downcast_ref::() + .unwrap() + .partitioning(), + Partitioning::RoundRobinBatch(_) | Partitioning::Hash(_, _) + ), + plan_any + if plan_any.is::() + || plan_any.is::() => + { + true } - }) - .collect(); - Ok(DistributionContext { - plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(), - distribution_onwards, - }) - } + _ => { + child_context.plan.children().is_empty() + || child_context.children_nodes[0].distribution_connection + || child_context + .plan + .required_input_distribution() + .iter() + .zip(child_context.children_nodes.iter()) + .any(|(required_dist, child_context)| { + child_context.distribution_connection + && matches!( + required_dist, + Distribution::UnspecifiedDistribution + ) + }) + } + }; + } - /// Computes distribution tracking contexts for every child of the plan. - fn children(&self) -> Vec { - self.plan - .children() - .into_iter() - .map(DistributionContext::new) - .collect() + let children_plans = self + .children_nodes + .iter() + .map(|context| context.plan.clone()) + .collect::>(); + + Ok(Self { + plan: with_new_children_if_necessary(self.plan, children_plans)?.into(), + distribution_connection: false, + children_nodes: self.children_nodes, + }) } } @@ -1480,8 +1413,8 @@ impl TreeNode for DistributionContext { where F: FnMut(&Self) -> Result, { - for child in self.children() { - match op(&child)? { + for child in &self.children_nodes { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -1490,20 +1423,23 @@ impl TreeNode for DistributionContext { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if children.is_empty() { - Ok(self) - } else { - let children_nodes = children + if !self.children_nodes.is_empty() { + self.children_nodes = self + .children_nodes .into_iter() .map(transform) - .collect::>>()?; - DistributionContext::new_from_children_nodes(children_nodes, self.plan) + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } @@ -1512,11 +1448,11 @@ impl fmt::Display for DistributionContext { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { let plan_string = get_plan_string(&self.plan); write!(f, "plan: {:?}", plan_string)?; - for (idx, child) in self.distribution_onwards.iter().enumerate() { - if let Some(child) = child { - write!(f, "idx:{:?}, exec_tree:{}", idx, child)?; - } - } + write!( + f, + "distribution_connection:{}", + self.distribution_connection, + )?; write!(f, "") } } @@ -1532,37 +1468,18 @@ struct PlanWithKeyRequirements { plan: Arc, /// Parent required key ordering required_key_ordering: Vec>, - /// The request key ordering to children - request_key_ordering: Vec>>>, + children: Vec, } impl PlanWithKeyRequirements { fn new(plan: Arc) -> Self { - let children_len = plan.children().len(); - PlanWithKeyRequirements { + let children = plan.children(); + Self { plan, required_key_ordering: vec![], - request_key_ordering: vec![None; children_len], + children: children.into_iter().map(Self::new).collect(), } } - - fn children(&self) -> Vec { - let plan_children = self.plan.children(); - assert_eq!(plan_children.len(), self.request_key_ordering.len()); - plan_children - .into_iter() - .zip(self.request_key_ordering.clone()) - .map(|(child, required)| { - let from_parent = required.unwrap_or_default(); - let length = child.children().len(); - PlanWithKeyRequirements { - plan: child, - required_key_ordering: from_parent, - request_key_ordering: vec![None; length], - } - }) - .collect() - } } impl TreeNode for PlanWithKeyRequirements { @@ -1570,9 +1487,8 @@ impl TreeNode for PlanWithKeyRequirements { where F: FnMut(&Self) -> Result, { - let children = self.children(); - for child in children { - match op(&child)? { + for child in &self.children { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -1582,28 +1498,23 @@ impl TreeNode for PlanWithKeyRequirements { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if !children.is_empty() { - let new_children: Result> = - children.into_iter().map(transform).collect(); - - let children_plans = new_children? + if !self.children.is_empty() { + self.children = self + .children .into_iter() - .map(|child| child.plan) - .collect::>(); - let new_plan = with_new_children_if_necessary(self.plan, children_plans)?; - Ok(PlanWithKeyRequirements { - plan: new_plan.into(), - required_key_ordering: self.required_key_ordering, - request_key_ordering: self.request_key_ordering, - }) - } else { - Ok(self) + .map(transform) + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 2ecc1e11b985..77d04a61c59e 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -44,7 +44,7 @@ use crate::physical_optimizer::replace_with_order_preserving_variants::{ use crate::physical_optimizer::sort_pushdown::{pushdown_sorts, SortPushDown}; use crate::physical_optimizer::utils::{ add_sort_above, is_coalesce_partitions, is_limit, is_repartition, is_sort, - is_sort_preserving_merge, is_union, is_window, ExecTree, + is_sort_preserving_merge, is_union, is_window, }; use crate::physical_optimizer::PhysicalOptimizerRule; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -81,78 +81,66 @@ impl EnforceSorting { #[derive(Debug, Clone)] struct PlanWithCorrespondingSort { plan: Arc, - // For every child, keep a subtree of `ExecutionPlan`s starting from the - // child until the `SortExec`(s) -- could be multiple for n-ary plans like - // Union -- that determine the output ordering of the child. If the child - // has no connection to any sort, simply store None (and not a subtree). - sort_onwards: Vec>, + // For every child, track `ExecutionPlan`s starting from the child until + // the `SortExec`(s). If the child has no connection to any sort, it simply + // stores false. + sort_connection: bool, + children_nodes: Vec, } impl PlanWithCorrespondingSort { fn new(plan: Arc) -> Self { - let length = plan.children().len(); - PlanWithCorrespondingSort { + let children = plan.children(); + Self { plan, - sort_onwards: vec![None; length], + sort_connection: false, + children_nodes: children.into_iter().map(Self::new).collect(), } } - fn new_from_children_nodes( - children_nodes: Vec, + fn update_children( parent_plan: Arc, + mut children_nodes: Vec, ) -> Result { - let children_plans = children_nodes - .iter() - .map(|item| item.plan.clone()) - .collect::>(); - let sort_onwards = children_nodes - .into_iter() - .enumerate() - .map(|(idx, item)| { - let plan = &item.plan; - // Leaves of `sort_onwards` are `SortExec` operators, which impose - // an ordering. This tree collects all the intermediate executors - // that maintain this ordering. If we just saw a order imposing - // operator, we reset the tree and start accumulating. - if is_sort(plan) { - return Some(ExecTree::new(item.plan, idx, vec![])); - } else if is_limit(plan) { - // There is no sort linkage for this path, it starts at a limit. - return None; - } + for node in children_nodes.iter_mut() { + let plan = &node.plan; + // Leaves of `sort_onwards` are `SortExec` operators, which impose + // an ordering. This tree collects all the intermediate executors + // that maintain this ordering. If we just saw a order imposing + // operator, we reset the tree and start accumulating. + node.sort_connection = if is_sort(plan) { + // Initiate connection + true + } else if is_limit(plan) { + // There is no sort linkage for this path, it starts at a limit. + false + } else { let is_spm = is_sort_preserving_merge(plan); let required_orderings = plan.required_input_ordering(); let flags = plan.maintains_input_order(); - let children = izip!(flags, item.sort_onwards, required_orderings) - .filter_map(|(maintains, element, required_ordering)| { - if (required_ordering.is_none() && maintains) || is_spm { - element - } else { - None - } - }) - .collect::>(); - if !children.is_empty() { - // Add parent node to the tree if there is at least one - // child with a subtree: - Some(ExecTree::new(item.plan, idx, children)) - } else { - // There is no sort linkage for this child, do nothing. - None - } - }) - .collect(); + // Add parent node to the tree if there is at least one + // child with a sort connection: + izip!(flags, required_orderings).any(|(maintains, required_ordering)| { + let propagates_ordering = + (maintains && required_ordering.is_none()) || is_spm; + let connected_to_sort = + node.children_nodes.iter().any(|item| item.sort_connection); + propagates_ordering && connected_to_sort + }) + } + } + let children_plans = children_nodes + .iter() + .map(|item| item.plan.clone()) + .collect::>(); let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into(); - Ok(PlanWithCorrespondingSort { plan, sort_onwards }) - } - fn children(&self) -> Vec { - self.plan - .children() - .into_iter() - .map(PlanWithCorrespondingSort::new) - .collect() + Ok(Self { + plan, + sort_connection: false, + children_nodes, + }) } } @@ -161,9 +149,8 @@ impl TreeNode for PlanWithCorrespondingSort { where F: FnMut(&Self) -> Result, { - let children = self.children(); - for child in children { - match op(&child)? { + for child in &self.children_nodes { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -173,102 +160,79 @@ impl TreeNode for PlanWithCorrespondingSort { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if children.is_empty() { - Ok(self) - } else { - let children_nodes = children + if !self.children_nodes.is_empty() { + self.children_nodes = self + .children_nodes .into_iter() .map(transform) - .collect::>>()?; - PlanWithCorrespondingSort::new_from_children_nodes(children_nodes, self.plan) + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } -/// This object is used within the [EnforceSorting] rule to track the closest +/// This object is used within the [`EnforceSorting`] rule to track the closest /// [`CoalescePartitionsExec`] descendant(s) for every child of a plan. #[derive(Debug, Clone)] struct PlanWithCorrespondingCoalescePartitions { plan: Arc, - // For every child, keep a subtree of `ExecutionPlan`s starting from the - // child until the `CoalescePartitionsExec`(s) -- could be multiple for - // n-ary plans like Union -- that affect the output partitioning of the - // child. If the child has no connection to any `CoalescePartitionsExec`, - // simply store None (and not a subtree). - coalesce_onwards: Vec>, + // Stores whether the plan is a `CoalescePartitionsExec` or it is connected to + // a `CoalescePartitionsExec` via its children. + coalesce_connection: bool, + children_nodes: Vec, } impl PlanWithCorrespondingCoalescePartitions { + /// Creates an empty tree with empty connections. fn new(plan: Arc) -> Self { - let length = plan.children().len(); - PlanWithCorrespondingCoalescePartitions { + let children = plan.children(); + Self { plan, - coalesce_onwards: vec![None; length], + coalesce_connection: false, + children_nodes: children.into_iter().map(Self::new).collect(), } } - fn new_from_children_nodes( - children_nodes: Vec, - parent_plan: Arc, - ) -> Result { - let children_plans = children_nodes + fn update_children(mut self) -> Result { + self.coalesce_connection = if self.plan.children().is_empty() { + // Plan has no children, it cannot be a `CoalescePartitionsExec`. + false + } else if is_coalesce_partitions(&self.plan) { + // Initiate a connection + true + } else { + self.children_nodes + .iter() + .enumerate() + .map(|(idx, node)| { + // Only consider operators that don't require a + // single partition, and connected to any coalesce + node.coalesce_connection + && !matches!( + self.plan.required_input_distribution()[idx], + Distribution::SinglePartition + ) + // If all children are None. There is nothing to track, set connection false. + }) + .any(|c| c) + }; + + let children_plans = self + .children_nodes .iter() .map(|item| item.plan.clone()) .collect(); - let coalesce_onwards = children_nodes - .into_iter() - .enumerate() - .map(|(idx, item)| { - // Leaves of the `coalesce_onwards` tree are `CoalescePartitionsExec` - // operators. This tree collects all the intermediate executors that - // maintain a single partition. If we just saw a `CoalescePartitionsExec` - // operator, we reset the tree and start accumulating. - let plan = item.plan; - if plan.children().is_empty() { - // Plan has no children, there is nothing to propagate. - None - } else if is_coalesce_partitions(&plan) { - Some(ExecTree::new(plan, idx, vec![])) - } else { - let children = item - .coalesce_onwards - .into_iter() - .flatten() - .filter(|item| { - // Only consider operators that don't require a - // single partition. - !matches!( - plan.required_input_distribution()[item.idx], - Distribution::SinglePartition - ) - }) - .collect::>(); - if children.is_empty() { - None - } else { - Some(ExecTree::new(plan, idx, children)) - } - } - }) - .collect(); - let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into(); - Ok(PlanWithCorrespondingCoalescePartitions { - plan, - coalesce_onwards, - }) - } - - fn children(&self) -> Vec { - self.plan - .children() - .into_iter() - .map(PlanWithCorrespondingCoalescePartitions::new) - .collect() + self.plan = with_new_children_if_necessary(self.plan, children_plans)?.into(); + Ok(self) } } @@ -277,9 +241,8 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions { where F: FnMut(&Self) -> Result, { - let children = self.children(); - for child in children { - match op(&child)? { + for child in &self.children_nodes { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -289,23 +252,23 @@ impl TreeNode for PlanWithCorrespondingCoalescePartitions { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if children.is_empty() { - Ok(self) - } else { - let children_nodes = children + if !self.children_nodes.is_empty() { + self.children_nodes = self + .children_nodes .into_iter() .map(transform) - .collect::>>()?; - PlanWithCorrespondingCoalescePartitions::new_from_children_nodes( - children_nodes, + .collect::>()?; + self.plan = with_new_children_if_necessary( self.plan, - ) + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } @@ -332,6 +295,7 @@ impl PhysicalOptimizerRule for EnforceSorting { } else { adjusted.plan }; + let plan_with_pipeline_fixer = OrderPreservationContext::new(new_plan); let updated_plan = plan_with_pipeline_fixer.transform_up(&|plan_with_pipeline_fixer| { @@ -345,7 +309,8 @@ impl PhysicalOptimizerRule for EnforceSorting { // Execute a top-down traversal to exploit sort push-down opportunities // missed by the bottom-up traversal: - let sort_pushdown = SortPushDown::init(updated_plan.plan); + let mut sort_pushdown = SortPushDown::new(updated_plan.plan); + sort_pushdown.assign_initial_requirements(); let adjusted = sort_pushdown.transform_down(&pushdown_sorts)?; Ok(adjusted.plan) } @@ -376,16 +341,21 @@ impl PhysicalOptimizerRule for EnforceSorting { fn parallelize_sorts( requirements: PlanWithCorrespondingCoalescePartitions, ) -> Result> { - let plan = requirements.plan; - let mut coalesce_onwards = requirements.coalesce_onwards; - if plan.children().is_empty() || coalesce_onwards[0].is_none() { + let PlanWithCorrespondingCoalescePartitions { + mut plan, + coalesce_connection, + mut children_nodes, + } = requirements.update_children()?; + + if plan.children().is_empty() || !children_nodes[0].coalesce_connection { // We only take an action when the plan is either a SortExec, a // SortPreservingMergeExec or a CoalescePartitionsExec, and they // all have a single child. Therefore, if the first child is `None`, // we can return immediately. return Ok(Transformed::No(PlanWithCorrespondingCoalescePartitions { plan, - coalesce_onwards, + coalesce_connection, + children_nodes, })); } else if (is_sort(&plan) || is_sort_preserving_merge(&plan)) && plan.output_partitioning().partition_count() <= 1 @@ -395,34 +365,30 @@ fn parallelize_sorts( // executors don't require single partition), then we can replace // the CoalescePartitionsExec + Sort cascade with a SortExec + // SortPreservingMergeExec cascade to parallelize sorting. - let mut prev_layer = plan.clone(); - update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; let (sort_exprs, fetch) = get_sort_exprs(&plan)?; - add_sort_above( - &mut prev_layer, - &PhysicalSortRequirement::from_sort_exprs(sort_exprs), - fetch, - ); - let spm = SortPreservingMergeExec::new(sort_exprs.to_vec(), prev_layer) - .with_fetch(fetch); - return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { - plan: Arc::new(spm), - coalesce_onwards: vec![None], - })); + let sort_reqs = PhysicalSortRequirement::from_sort_exprs(sort_exprs); + let sort_exprs = sort_exprs.to_vec(); + update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?; + add_sort_above(&mut plan, &sort_reqs, fetch); + let spm = SortPreservingMergeExec::new(sort_exprs, plan).with_fetch(fetch); + + return Ok(Transformed::Yes( + PlanWithCorrespondingCoalescePartitions::new(Arc::new(spm)), + )); } else if is_coalesce_partitions(&plan) { // There is an unnecessary `CoalescePartitionsExec` in the plan. - let mut prev_layer = plan.clone(); - update_child_to_remove_coalesce(&mut prev_layer, &mut coalesce_onwards[0])?; - let new_plan = plan.with_new_children(vec![prev_layer])?; - return Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { - plan: new_plan, - coalesce_onwards: vec![None], - })); + update_child_to_remove_coalesce(&mut plan, &mut children_nodes[0])?; + + let new_plan = Arc::new(CoalescePartitionsExec::new(plan)) as _; + return Ok(Transformed::Yes( + PlanWithCorrespondingCoalescePartitions::new(new_plan), + )); } Ok(Transformed::Yes(PlanWithCorrespondingCoalescePartitions { plan, - coalesce_onwards, + coalesce_connection, + children_nodes, })) } @@ -431,91 +397,102 @@ fn parallelize_sorts( fn ensure_sorting( requirements: PlanWithCorrespondingSort, ) -> Result> { + let requirements = PlanWithCorrespondingSort::update_children( + requirements.plan, + requirements.children_nodes, + )?; + // Perform naive analysis at the beginning -- remove already-satisfied sorts: if requirements.plan.children().is_empty() { return Ok(Transformed::No(requirements)); } - let plan = requirements.plan; - let mut children = plan.children(); - let mut sort_onwards = requirements.sort_onwards; - if let Some(result) = analyze_immediate_sort_removal(&plan, &sort_onwards) { + if let Some(result) = analyze_immediate_sort_removal(&requirements) { return Ok(Transformed::Yes(result)); } - for (idx, (child, sort_onwards, required_ordering)) in izip!( - children.iter_mut(), - sort_onwards.iter_mut(), - plan.required_input_ordering() - ) - .enumerate() + + let plan = requirements.plan; + let mut children_nodes = requirements.children_nodes; + + for (idx, (child_node, required_ordering)) in + izip!(children_nodes.iter_mut(), plan.required_input_ordering()).enumerate() { - let physical_ordering = child.output_ordering(); + let mut child_plan = child_node.plan.clone(); + let physical_ordering = child_plan.output_ordering(); match (required_ordering, physical_ordering) { (Some(required_ordering), Some(_)) => { - if !child + if !child_plan .equivalence_properties() .ordering_satisfy_requirement(&required_ordering) { // Make sure we preserve the ordering requirements: - update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; - add_sort_above(child, &required_ordering, None); - if is_sort(child) { - *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); - } else { - *sort_onwards = None; + update_child_to_remove_unnecessary_sort(idx, child_node, &plan)?; + add_sort_above(&mut child_plan, &required_ordering, None); + if is_sort(&child_plan) { + *child_node = PlanWithCorrespondingSort::update_children( + child_plan, + vec![child_node.clone()], + )?; + child_node.sort_connection = true; } } } (Some(required), None) => { // Ordering requirement is not met, we should add a `SortExec` to the plan. - add_sort_above(child, &required, None); - *sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![])); + add_sort_above(&mut child_plan, &required, None); + *child_node = PlanWithCorrespondingSort::update_children( + child_plan, + vec![child_node.clone()], + )?; + child_node.sort_connection = true; } (None, Some(_)) => { // We have a `SortExec` whose effect may be neutralized by // another order-imposing operator. Remove this sort. if !plan.maintains_input_order()[idx] || is_union(&plan) { - update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; + update_child_to_remove_unnecessary_sort(idx, child_node, &plan)?; } } (None, None) => { - update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?; + update_child_to_remove_unnecessary_sort(idx, child_node, &plan)?; } } } // For window expressions, we can remove some sorts when we can // calculate the result in reverse: - if is_window(&plan) { - if let Some(tree) = &mut sort_onwards[0] { - if let Some(result) = analyze_window_sort_removal(tree, &plan)? { - return Ok(Transformed::Yes(result)); - } + if is_window(&plan) && children_nodes[0].sort_connection { + if let Some(result) = analyze_window_sort_removal(&mut children_nodes[0], &plan)? + { + return Ok(Transformed::Yes(result)); } } else if is_sort_preserving_merge(&plan) - && children[0].output_partitioning().partition_count() <= 1 + && children_nodes[0] + .plan + .output_partitioning() + .partition_count() + <= 1 { // This SortPreservingMergeExec is unnecessary, input already has a // single partition. - sort_onwards.truncate(1); - return Ok(Transformed::Yes(PlanWithCorrespondingSort { - plan: children.swap_remove(0), - sort_onwards, - })); + let child_node = children_nodes.swap_remove(0); + return Ok(Transformed::Yes(child_node)); } - Ok(Transformed::Yes(PlanWithCorrespondingSort { - plan: plan.with_new_children(children)?, - sort_onwards, - })) + Ok(Transformed::Yes( + PlanWithCorrespondingSort::update_children(plan, children_nodes)?, + )) } /// Analyzes a given [`SortExec`] (`plan`) to determine whether its input /// already has a finer ordering than it enforces. fn analyze_immediate_sort_removal( - plan: &Arc, - sort_onwards: &[Option], + node: &PlanWithCorrespondingSort, ) -> Option { + let PlanWithCorrespondingSort { + plan, + children_nodes, + .. + } = node; if let Some(sort_exec) = plan.as_any().downcast_ref::() { let sort_input = sort_exec.input().clone(); - // If this sort is unnecessary, we should remove it: if sort_input .equivalence_properties() @@ -533,20 +510,33 @@ fn analyze_immediate_sort_removal( sort_exec.expr().to_vec(), sort_input, )); - let new_tree = ExecTree::new( - new_plan.clone(), - 0, - sort_onwards.iter().flat_map(|e| e.clone()).collect(), - ); PlanWithCorrespondingSort { plan: new_plan, - sort_onwards: vec![Some(new_tree)], + // SortPreservingMergeExec has single child. + sort_connection: false, + children_nodes: children_nodes + .iter() + .cloned() + .map(|mut node| { + node.sort_connection = false; + node + }) + .collect(), } } else { // Remove the sort: PlanWithCorrespondingSort { plan: sort_input, - sort_onwards: sort_onwards.to_vec(), + sort_connection: false, + children_nodes: children_nodes[0] + .children_nodes + .iter() + .cloned() + .map(|mut node| { + node.sort_connection = false; + node + }) + .collect(), } }, ); @@ -558,15 +548,15 @@ fn analyze_immediate_sort_removal( /// Analyzes a [`WindowAggExec`] or a [`BoundedWindowAggExec`] to determine /// whether it may allow removing a sort. fn analyze_window_sort_removal( - sort_tree: &mut ExecTree, + sort_tree: &mut PlanWithCorrespondingSort, window_exec: &Arc, ) -> Result> { let requires_single_partition = matches!( - window_exec.required_input_distribution()[sort_tree.idx], + window_exec.required_input_distribution()[0], Distribution::SinglePartition ); - let mut window_child = - remove_corresponding_sort_from_sub_plan(sort_tree, requires_single_partition)?; + remove_corresponding_sort_from_sub_plan(sort_tree, requires_single_partition)?; + let mut window_child = sort_tree.plan.clone(); let (window_expr, new_window) = if let Some(exec) = window_exec.as_any().downcast_ref::() { ( @@ -628,9 +618,9 @@ fn analyze_window_sort_removal( /// Updates child to remove the unnecessary [`CoalescePartitionsExec`] below it. fn update_child_to_remove_coalesce( child: &mut Arc, - coalesce_onwards: &mut Option, + coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions, ) -> Result<()> { - if let Some(coalesce_onwards) = coalesce_onwards { + if coalesce_onwards.coalesce_connection { *child = remove_corresponding_coalesce_in_sub_plan(coalesce_onwards, child)?; } Ok(()) @@ -638,10 +628,10 @@ fn update_child_to_remove_coalesce( /// Removes the [`CoalescePartitionsExec`] from the plan in `coalesce_onwards`. fn remove_corresponding_coalesce_in_sub_plan( - coalesce_onwards: &mut ExecTree, + coalesce_onwards: &mut PlanWithCorrespondingCoalescePartitions, parent: &Arc, ) -> Result> { - Ok(if is_coalesce_partitions(&coalesce_onwards.plan) { + if is_coalesce_partitions(&coalesce_onwards.plan) { // We can safely use the 0th index since we have a `CoalescePartitionsExec`. let mut new_plan = coalesce_onwards.plan.children()[0].clone(); while new_plan.output_partitioning() == parent.output_partitioning() @@ -650,89 +640,113 @@ fn remove_corresponding_coalesce_in_sub_plan( { new_plan = new_plan.children().swap_remove(0) } - new_plan + Ok(new_plan) } else { let plan = coalesce_onwards.plan.clone(); let mut children = plan.children(); - for item in &mut coalesce_onwards.children { - children[item.idx] = remove_corresponding_coalesce_in_sub_plan(item, &plan)?; + for (idx, node) in coalesce_onwards.children_nodes.iter_mut().enumerate() { + if node.coalesce_connection { + children[idx] = remove_corresponding_coalesce_in_sub_plan(node, &plan)?; + } } - plan.with_new_children(children)? - }) + plan.with_new_children(children) + } } /// Updates child to remove the unnecessary sort below it. fn update_child_to_remove_unnecessary_sort( - child: &mut Arc, - sort_onwards: &mut Option, + child_idx: usize, + sort_onwards: &mut PlanWithCorrespondingSort, parent: &Arc, ) -> Result<()> { - if let Some(sort_onwards) = sort_onwards { + if sort_onwards.sort_connection { let requires_single_partition = matches!( - parent.required_input_distribution()[sort_onwards.idx], + parent.required_input_distribution()[child_idx], Distribution::SinglePartition ); - *child = remove_corresponding_sort_from_sub_plan( - sort_onwards, - requires_single_partition, - )?; + remove_corresponding_sort_from_sub_plan(sort_onwards, requires_single_partition)?; } - *sort_onwards = None; + sort_onwards.sort_connection = false; Ok(()) } /// Removes the sort from the plan in `sort_onwards`. fn remove_corresponding_sort_from_sub_plan( - sort_onwards: &mut ExecTree, + sort_onwards: &mut PlanWithCorrespondingSort, requires_single_partition: bool, -) -> Result> { +) -> Result<()> { // A `SortExec` is always at the bottom of the tree. - let mut updated_plan = if is_sort(&sort_onwards.plan) { - sort_onwards.plan.children().swap_remove(0) + if is_sort(&sort_onwards.plan) { + *sort_onwards = sort_onwards.children_nodes.swap_remove(0); } else { - let plan = &sort_onwards.plan; - let mut children = plan.children(); - for item in &mut sort_onwards.children { - let requires_single_partition = matches!( - plan.required_input_distribution()[item.idx], - Distribution::SinglePartition - ); - children[item.idx] = - remove_corresponding_sort_from_sub_plan(item, requires_single_partition)?; + let PlanWithCorrespondingSort { + plan, + sort_connection: _, + children_nodes, + } = sort_onwards; + let mut any_connection = false; + for (child_idx, child_node) in children_nodes.iter_mut().enumerate() { + if child_node.sort_connection { + any_connection = true; + let requires_single_partition = matches!( + plan.required_input_distribution()[child_idx], + Distribution::SinglePartition + ); + remove_corresponding_sort_from_sub_plan( + child_node, + requires_single_partition, + )?; + } } + if any_connection || children_nodes.is_empty() { + *sort_onwards = PlanWithCorrespondingSort::update_children( + plan.clone(), + children_nodes.clone(), + )?; + } + let PlanWithCorrespondingSort { + plan, + children_nodes, + .. + } = sort_onwards; // Replace with variants that do not preserve order. if is_sort_preserving_merge(plan) { - children.swap_remove(0) + children_nodes.swap_remove(0); + *plan = plan.children().swap_remove(0); } else if let Some(repartition) = plan.as_any().downcast_ref::() { - Arc::new( - // By default, RepartitionExec does not preserve order - RepartitionExec::try_new( - children.swap_remove(0), - repartition.partitioning().clone(), - )?, - ) - } else { - plan.clone().with_new_children(children)? + *plan = Arc::new(RepartitionExec::try_new( + children_nodes[0].plan.clone(), + repartition.output_partitioning(), + )?) as _; } }; // Deleting a merging sort may invalidate distribution requirements. // Ensure that we stay compliant with such requirements: if requires_single_partition - && updated_plan.output_partitioning().partition_count() > 1 + && sort_onwards.plan.output_partitioning().partition_count() > 1 { // If there is existing ordering, to preserve ordering use SortPreservingMergeExec // instead of CoalescePartitionsExec. - if let Some(ordering) = updated_plan.output_ordering() { - updated_plan = Arc::new(SortPreservingMergeExec::new( + if let Some(ordering) = sort_onwards.plan.output_ordering() { + let plan = Arc::new(SortPreservingMergeExec::new( ordering.to_vec(), - updated_plan, - )); + sort_onwards.plan.clone(), + )) as _; + *sort_onwards = PlanWithCorrespondingSort::update_children( + plan, + vec![sort_onwards.clone()], + )?; } else { - updated_plan = Arc::new(CoalescePartitionsExec::new(updated_plan)); + let plan = + Arc::new(CoalescePartitionsExec::new(sort_onwards.plan.clone())) as _; + *sort_onwards = PlanWithCorrespondingSort::update_children( + plan, + vec![sort_onwards.clone()], + )?; } } - Ok(updated_plan) + Ok(()) } /// Converts an [ExecutionPlan] trait object to a [PhysicalSortExpr] slice when possible. diff --git a/datafusion/core/src/physical_optimizer/output_requirements.rs b/datafusion/core/src/physical_optimizer/output_requirements.rs index f8bf3bb965e8..4d03840d3dd3 100644 --- a/datafusion/core/src/physical_optimizer/output_requirements.rs +++ b/datafusion/core/src/physical_optimizer/output_requirements.rs @@ -147,6 +147,10 @@ impl ExecutionPlan for OutputRequirementExec { self.input.output_ordering() } + fn maintains_input_order(&self) -> Vec { + vec![true] + } + fn children(&self) -> Vec> { vec![self.input.clone()] } diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index d59248aadf05..9e9f647d073f 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -24,13 +24,13 @@ use std::sync::Arc; use crate::config::ConfigOptions; use crate::error::Result; use crate::physical_optimizer::PhysicalOptimizerRule; -use crate::physical_plan::joins::SymmetricHashJoinExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; use datafusion_common::config::OptimizerOptions; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_common::{plan_err, DataFusionError}; use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; +use datafusion_physical_plan::joins::SymmetricHashJoinExec; /// The PipelineChecker rule rejects non-runnable query plans that use /// pipeline-breaking operators on infinite input(s). @@ -70,14 +70,14 @@ impl PhysicalOptimizerRule for PipelineChecker { pub struct PipelineStatePropagator { pub(crate) plan: Arc, pub(crate) unbounded: bool, - pub(crate) children: Vec, + pub(crate) children: Vec, } impl PipelineStatePropagator { /// Constructs a new, default pipelining state. pub fn new(plan: Arc) -> Self { let children = plan.children(); - PipelineStatePropagator { + Self { plan, unbounded: false, children: children.into_iter().map(Self::new).collect(), @@ -86,10 +86,7 @@ impl PipelineStatePropagator { /// Returns the children unboundedness information. pub fn children_unbounded(&self) -> Vec { - self.children - .iter() - .map(|c| c.unbounded) - .collect::>() + self.children.iter().map(|c| c.unbounded).collect() } } @@ -109,26 +106,23 @@ impl TreeNode for PipelineStatePropagator { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { if !self.children.is_empty() { - let new_children = self + self.children = self .children .into_iter() .map(transform) - .collect::>>()?; - let children_plans = new_children.iter().map(|c| c.plan.clone()).collect(); - - Ok(PipelineStatePropagator { - plan: with_new_children_if_necessary(self.plan, children_plans)?.into(), - unbounded: self.unbounded, - children: new_children, - }) - } else { - Ok(self) + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index 0ff7e9f48edc..91f3d2abc6ff 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -21,14 +21,13 @@ use std::sync::Arc; +use super::utils::is_repartition; use crate::error::Result; -use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort, ExecTree}; +use crate::physical_optimizer::utils::{is_coalesce_partitions, is_sort}; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan}; -use super::utils::is_repartition; - use datafusion_common::config::ConfigOptions; use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion}; use datafusion_physical_plan::unbounded_output; @@ -40,80 +39,67 @@ use datafusion_physical_plan::unbounded_output; #[derive(Debug, Clone)] pub(crate) struct OrderPreservationContext { pub(crate) plan: Arc, - ordering_onwards: Vec>, + ordering_connection: bool, + children_nodes: Vec, } impl OrderPreservationContext { - /// Creates a "default" order-preservation context. + /// Creates an empty context tree. Each node has `false` connections. pub fn new(plan: Arc) -> Self { - let length = plan.children().len(); - OrderPreservationContext { + let children = plan.children(); + Self { plan, - ordering_onwards: vec![None; length], + ordering_connection: false, + children_nodes: children.into_iter().map(Self::new).collect(), } } /// Creates a new order-preservation context from those of children nodes. - pub fn new_from_children_nodes( - children_nodes: Vec, - parent_plan: Arc, - ) -> Result { - let children_plans = children_nodes - .iter() - .map(|item| item.plan.clone()) - .collect(); - let ordering_onwards = children_nodes - .into_iter() - .enumerate() - .map(|(idx, item)| { - // `ordering_onwards` tree keeps track of executors that maintain - // ordering, (or that can maintain ordering with the replacement of - // its variant) - let plan = item.plan; - let children = plan.children(); - let ordering_onwards = item.ordering_onwards; - if children.is_empty() { - // Plan has no children, there is nothing to propagate. - None - } else if ordering_onwards[0].is_none() - && ((is_repartition(&plan) && !plan.maintains_input_order()[0]) - || (is_coalesce_partitions(&plan) - && children[0].output_ordering().is_some())) - { - Some(ExecTree::new(plan, idx, vec![])) - } else { - let children = ordering_onwards - .into_iter() - .flatten() - .filter(|item| { - // Only consider operators that maintains ordering - plan.maintains_input_order()[item.idx] - || is_coalesce_partitions(&plan) - || is_repartition(&plan) - }) - .collect::>(); - if children.is_empty() { - None - } else { - Some(ExecTree::new(plan, idx, children)) - } - } - }) - .collect(); - let plan = with_new_children_if_necessary(parent_plan, children_plans)?.into(); - Ok(OrderPreservationContext { - plan, - ordering_onwards, - }) - } + pub fn update_children(mut self) -> Result { + for node in self.children_nodes.iter_mut() { + let plan = node.plan.clone(); + let children = plan.children(); + let maintains_input_order = plan.maintains_input_order(); + let inspect_child = |idx| { + maintains_input_order[idx] + || is_coalesce_partitions(&plan) + || is_repartition(&plan) + }; + + // We cut the path towards nodes that do not maintain ordering. + for (idx, c) in node.children_nodes.iter_mut().enumerate() { + c.ordering_connection &= inspect_child(idx); + } + + node.ordering_connection = if children.is_empty() { + false + } else if !node.children_nodes[0].ordering_connection + && ((is_repartition(&plan) && !maintains_input_order[0]) + || (is_coalesce_partitions(&plan) + && children[0].output_ordering().is_some())) + { + // We either have a RepartitionExec or a CoalescePartitionsExec + // and they lose their input ordering, so initiate connection: + true + } else { + // Maintain connection if there is a child with a connection, + // and operator can possibly maintain that connection (either + // in its current form or when we replace it with the corresponding + // order preserving operator). + node.children_nodes + .iter() + .enumerate() + .any(|(idx, c)| c.ordering_connection && inspect_child(idx)) + } + } - /// Computes order-preservation contexts for every child of the plan. - pub fn children(&self) -> Vec { - self.plan - .children() - .into_iter() - .map(OrderPreservationContext::new) - .collect() + self.plan = with_new_children_if_necessary( + self.plan, + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); + self.ordering_connection = false; + Ok(self) } } @@ -122,8 +108,8 @@ impl TreeNode for OrderPreservationContext { where F: FnMut(&Self) -> Result, { - for child in self.children() { - match op(&child)? { + for child in &self.children_nodes { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -132,68 +118,88 @@ impl TreeNode for OrderPreservationContext { Ok(VisitRecursion::Continue) } - fn map_children(self, transform: F) -> Result + fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if children.is_empty() { - Ok(self) - } else { - let children_nodes = children + if !self.children_nodes.is_empty() { + self.children_nodes = self + .children_nodes .into_iter() .map(transform) - .collect::>>()?; - OrderPreservationContext::new_from_children_nodes(children_nodes, self.plan) + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); } + Ok(self) } } -/// Calculates the updated plan by replacing executors that lose ordering -/// inside the `ExecTree` with their order-preserving variants. This will +/// Calculates the updated plan by replacing operators that lose ordering +/// inside `sort_input` with their order-preserving variants. This will /// generate an alternative plan, which will be accepted or rejected later on /// depending on whether it helps us remove a `SortExec`. fn get_updated_plan( - exec_tree: &ExecTree, + mut sort_input: OrderPreservationContext, // Flag indicating that it is desirable to replace `RepartitionExec`s with // `SortPreservingRepartitionExec`s: is_spr_better: bool, // Flag indicating that it is desirable to replace `CoalescePartitionsExec`s // with `SortPreservingMergeExec`s: is_spm_better: bool, -) -> Result> { - let plan = exec_tree.plan.clone(); +) -> Result { + let updated_children = sort_input + .children_nodes + .clone() + .into_iter() + .map(|item| { + // Update children and their descendants in the given tree if the connection is open: + if item.ordering_connection { + get_updated_plan(item, is_spr_better, is_spm_better) + } else { + Ok(item) + } + }) + .collect::>>()?; - let mut children = plan.children(); - // Update children and their descendants in the given tree: - for item in &exec_tree.children { - children[item.idx] = get_updated_plan(item, is_spr_better, is_spm_better)?; - } - // Construct the plan with updated children: - let mut plan = plan.with_new_children(children)?; + sort_input.plan = sort_input + .plan + .with_new_children(updated_children.iter().map(|c| c.plan.clone()).collect())?; + sort_input.ordering_connection = false; + sort_input.children_nodes = updated_children; // When a `RepartitionExec` doesn't preserve ordering, replace it with - // a `SortPreservingRepartitionExec` if appropriate: - if is_repartition(&plan) && !plan.maintains_input_order()[0] && is_spr_better { - let child = plan.children().swap_remove(0); - let repartition = RepartitionExec::try_new(child, plan.output_partitioning())? - .with_preserve_order(); - plan = Arc::new(repartition) as _ - } - // When the input of a `CoalescePartitionsExec` has an ordering, replace it - // with a `SortPreservingMergeExec` if appropriate: - let mut children = plan.children(); - if is_coalesce_partitions(&plan) - && children[0].output_ordering().is_some() - && is_spm_better + // a sort-preserving variant if appropriate: + if is_repartition(&sort_input.plan) + && !sort_input.plan.maintains_input_order()[0] + && is_spr_better { - let child = children.swap_remove(0); - plan = Arc::new(SortPreservingMergeExec::new( - child.output_ordering().unwrap_or(&[]).to_vec(), - child, - )) as _ + let child = sort_input.plan.children().swap_remove(0); + let repartition = + RepartitionExec::try_new(child, sort_input.plan.output_partitioning())? + .with_preserve_order(); + sort_input.plan = Arc::new(repartition) as _; + sort_input.children_nodes[0].ordering_connection = true; + } else if is_coalesce_partitions(&sort_input.plan) && is_spm_better { + // When the input of a `CoalescePartitionsExec` has an ordering, replace it + // with a `SortPreservingMergeExec` if appropriate: + if let Some(ordering) = sort_input.children_nodes[0] + .plan + .output_ordering() + .map(|o| o.to_vec()) + { + // Now we can mutate `new_node.children_nodes` safely + let child = sort_input.children_nodes.clone().swap_remove(0); + sort_input.plan = + Arc::new(SortPreservingMergeExec::new(ordering, child.plan)) as _; + sort_input.children_nodes[0].ordering_connection = true; + } } - Ok(plan) + + Ok(sort_input) } /// The `replace_with_order_preserving_variants` optimizer sub-rule tries to @@ -211,11 +217,11 @@ fn get_updated_plan( /// /// The algorithm flow is simply like this: /// 1. Visit nodes of the physical plan bottom-up and look for `SortExec` nodes. -/// 1_1. During the traversal, build an `ExecTree` to keep track of operators -/// that maintain ordering (or can maintain ordering when replaced by an -/// order-preserving variant) until a `SortExec` is found. +/// 1_1. During the traversal, keep track of operators that maintain ordering +/// (or can maintain ordering when replaced by an order-preserving variant) until +/// a `SortExec` is found. /// 2. When a `SortExec` is found, update the child of the `SortExec` by replacing -/// operators that do not preserve ordering in the `ExecTree` with their order +/// operators that do not preserve ordering in the tree with their order /// preserving variants. /// 3. Check if the `SortExec` is still necessary in the updated plan by comparing /// its input ordering with the output ordering it imposes. We do this because @@ -239,37 +245,41 @@ pub(crate) fn replace_with_order_preserving_variants( is_spm_better: bool, config: &ConfigOptions, ) -> Result> { - let plan = &requirements.plan; - let ordering_onwards = &requirements.ordering_onwards; - if is_sort(plan) { - let exec_tree = if let Some(exec_tree) = &ordering_onwards[0] { - exec_tree - } else { - return Ok(Transformed::No(requirements)); - }; - // For unbounded cases, replace with the order-preserving variant in - // any case, as doing so helps fix the pipeline. - // Also do the replacement if opted-in via config options. - let use_order_preserving_variant = - config.optimizer.prefer_existing_sort || unbounded_output(plan); - let updated_sort_input = get_updated_plan( - exec_tree, - is_spr_better || use_order_preserving_variant, - is_spm_better || use_order_preserving_variant, - )?; - // If this sort is unnecessary, we should remove it and update the plan: - if updated_sort_input - .equivalence_properties() - .ordering_satisfy(plan.output_ordering().unwrap_or(&[])) - { - return Ok(Transformed::Yes(OrderPreservationContext { - plan: updated_sort_input, - ordering_onwards: vec![None], - })); - } + let mut requirements = requirements.update_children()?; + if !(is_sort(&requirements.plan) + && requirements.children_nodes[0].ordering_connection) + { + return Ok(Transformed::No(requirements)); } - Ok(Transformed::No(requirements)) + // For unbounded cases, replace with the order-preserving variant in + // any case, as doing so helps fix the pipeline. + // Also do the replacement if opted-in via config options. + let use_order_preserving_variant = + config.optimizer.prefer_existing_sort || unbounded_output(&requirements.plan); + + let mut updated_sort_input = get_updated_plan( + requirements.children_nodes.clone().swap_remove(0), + is_spr_better || use_order_preserving_variant, + is_spm_better || use_order_preserving_variant, + )?; + + // If this sort is unnecessary, we should remove it and update the plan: + if updated_sort_input + .plan + .equivalence_properties() + .ordering_satisfy(requirements.plan.output_ordering().unwrap_or(&[])) + { + for child in updated_sort_input.children_nodes.iter_mut() { + child.ordering_connection = false; + } + Ok(Transformed::Yes(updated_sort_input)) + } else { + for child in requirements.children_nodes.iter_mut() { + child.ordering_connection = false; + } + Ok(Transformed::Yes(requirements)) + } } #[cfg(test)] diff --git a/datafusion/core/src/physical_optimizer/sort_pushdown.rs b/datafusion/core/src/physical_optimizer/sort_pushdown.rs index b9502d92ac12..b0013863010a 100644 --- a/datafusion/core/src/physical_optimizer/sort_pushdown.rs +++ b/datafusion/core/src/physical_optimizer/sort_pushdown.rs @@ -36,8 +36,6 @@ use datafusion_physical_expr::{ LexRequirementRef, PhysicalSortExpr, PhysicalSortRequirement, }; -use itertools::izip; - /// This is a "data class" we use within the [`EnforceSorting`] rule to push /// down [`SortExec`] in the plan. In some cases, we can reduce the total /// computational cost by pushing down `SortExec`s through some executors. @@ -49,35 +47,26 @@ pub(crate) struct SortPushDown { pub plan: Arc, /// Parent required sort ordering required_ordering: Option>, - /// The adjusted request sort ordering to children. - /// By default they are the same as the plan's required input ordering, but can be adjusted based on parent required sort ordering properties. - adjusted_request_ordering: Vec>>, + children_nodes: Vec, } impl SortPushDown { - pub fn init(plan: Arc) -> Self { - let request_ordering = plan.required_input_ordering(); - SortPushDown { + /// Creates an empty tree with empty `required_ordering`'s. + pub fn new(plan: Arc) -> Self { + let children = plan.children(); + Self { plan, required_ordering: None, - adjusted_request_ordering: request_ordering, + children_nodes: children.into_iter().map(Self::new).collect(), } } - pub fn children(&self) -> Vec { - izip!( - self.plan.children().into_iter(), - self.adjusted_request_ordering.clone().into_iter(), - ) - .map(|(child, from_parent)| { - let child_request_ordering = child.required_input_ordering(); - SortPushDown { - plan: child, - required_ordering: from_parent, - adjusted_request_ordering: child_request_ordering, - } - }) - .collect() + /// Assigns the ordering requirement of the root node to the its children. + pub fn assign_initial_requirements(&mut self) { + let reqs = self.plan.required_input_ordering(); + for (child, requirement) in self.children_nodes.iter_mut().zip(reqs) { + child.required_ordering = requirement; + } } } @@ -86,9 +75,8 @@ impl TreeNode for SortPushDown { where F: FnMut(&Self) -> Result, { - let children = self.children(); - for child in children { - match op(&child)? { + for child in &self.children_nodes { + match op(child)? { VisitRecursion::Continue => {} VisitRecursion::Skip => return Ok(VisitRecursion::Continue), VisitRecursion::Stop => return Ok(VisitRecursion::Stop), @@ -97,64 +85,64 @@ impl TreeNode for SortPushDown { Ok(VisitRecursion::Continue) } - fn map_children(mut self, transform: F) -> Result where F: FnMut(Self) -> Result, { - let children = self.children(); - if !children.is_empty() { - let children_plans = children + if !self.children_nodes.is_empty() { + self.children_nodes = self + .children_nodes .into_iter() .map(transform) - .map(|r| r.map(|s| s.plan)) - .collect::>>()?; - - match with_new_children_if_necessary(self.plan, children_plans)? { - Transformed::Yes(plan) | Transformed::No(plan) => { - self.plan = plan; - } - } - }; + .collect::>()?; + self.plan = with_new_children_if_necessary( + self.plan, + self.children_nodes.iter().map(|c| c.plan.clone()).collect(), + )? + .into(); + } Ok(self) } } pub(crate) fn pushdown_sorts( - requirements: SortPushDown, + mut requirements: SortPushDown, ) -> Result> { let plan = &requirements.plan; let parent_required = requirements.required_ordering.as_deref().unwrap_or(&[]); + if let Some(sort_exec) = plan.as_any().downcast_ref::() { - let new_plan = if !plan + if !plan .equivalence_properties() .ordering_satisfy_requirement(parent_required) { // If the current plan is a SortExec, modify it to satisfy parent requirements: let mut new_plan = sort_exec.input().clone(); add_sort_above(&mut new_plan, parent_required, sort_exec.fetch()); - new_plan - } else { - requirements.plan + requirements.plan = new_plan; }; - let required_ordering = new_plan + + let required_ordering = requirements + .plan .output_ordering() .map(PhysicalSortRequirement::from_sort_exprs) .unwrap_or_default(); // Since new_plan is a SortExec, we can safely get the 0th index. - let child = new_plan.children().swap_remove(0); + let mut child = requirements.children_nodes.swap_remove(0); if let Some(adjusted) = - pushdown_requirement_to_children(&child, &required_ordering)? + pushdown_requirement_to_children(&child.plan, &required_ordering)? { + for (c, o) in child.children_nodes.iter_mut().zip(adjusted) { + c.required_ordering = o; + } // Can push down requirements - Ok(Transformed::Yes(SortPushDown { - plan: child, - required_ordering: None, - adjusted_request_ordering: adjusted, - })) + child.required_ordering = None; + Ok(Transformed::Yes(child)) } else { // Can not push down requirements - Ok(Transformed::Yes(SortPushDown::init(new_plan))) + let mut empty_node = SortPushDown::new(requirements.plan); + empty_node.assign_initial_requirements(); + Ok(Transformed::Yes(empty_node)) } } else { // Executors other than SortExec @@ -163,23 +151,27 @@ pub(crate) fn pushdown_sorts( .ordering_satisfy_requirement(parent_required) { // Satisfies parent requirements, immediately return. - return Ok(Transformed::Yes(SortPushDown { - required_ordering: None, - ..requirements - })); + let reqs = requirements.plan.required_input_ordering(); + for (child, order) in requirements.children_nodes.iter_mut().zip(reqs) { + child.required_ordering = order; + } + return Ok(Transformed::Yes(requirements)); } // Can not satisfy the parent requirements, check whether the requirements can be pushed down: if let Some(adjusted) = pushdown_requirement_to_children(plan, parent_required)? { - Ok(Transformed::Yes(SortPushDown { - plan: requirements.plan, - required_ordering: None, - adjusted_request_ordering: adjusted, - })) + for (c, o) in requirements.children_nodes.iter_mut().zip(adjusted) { + c.required_ordering = o; + } + requirements.required_ordering = None; + Ok(Transformed::Yes(requirements)) } else { // Can not push down requirements, add new SortExec: let mut new_plan = requirements.plan; add_sort_above(&mut new_plan, parent_required, None); - Ok(Transformed::Yes(SortPushDown::init(new_plan))) + let mut new_empty = SortPushDown::new(new_plan); + new_empty.assign_initial_requirements(); + // Can not push down requirements + Ok(Transformed::Yes(new_empty)) } } } @@ -297,10 +289,11 @@ fn pushdown_requirement_to_children( // TODO: Add support for Projection push down } -/// Determine the children requirements -/// If the children requirements are more specific, do not push down the parent requirements -/// If the the parent requirements are more specific, push down the parent requirements -/// If they are not compatible, need to add Sort. +/// Determine children requirements: +/// - If children requirements are more specific, do not push down parent +/// requirements. +/// - If parent requirements are more specific, push down parent requirements. +/// - If they are not compatible, need to add a sort. fn determine_children_requirement( parent_required: LexRequirementRef, request_child: LexRequirementRef, @@ -310,18 +303,15 @@ fn determine_children_requirement( .equivalence_properties() .requirements_compatible(request_child, parent_required) { - // request child requirements are more specific, no need to push down the parent requirements + // Child requirements are more specific, no need to push down. RequirementsCompatibility::Satisfy } else if child_plan .equivalence_properties() .requirements_compatible(parent_required, request_child) { - // parent requirements are more specific, adjust the request child requirements and push down the new requirements - let adjusted = if parent_required.is_empty() { - None - } else { - Some(parent_required.to_vec()) - }; + // Parent requirements are more specific, adjust child's requirements + // and push down the new requirements: + let adjusted = (!parent_required.is_empty()).then(|| parent_required.to_vec()); RequirementsCompatibility::Compatible(adjusted) } else { RequirementsCompatibility::NonCompatible diff --git a/datafusion/core/src/physical_optimizer/utils.rs b/datafusion/core/src/physical_optimizer/utils.rs index fccc1db0d359..f8063e969422 100644 --- a/datafusion/core/src/physical_optimizer/utils.rs +++ b/datafusion/core/src/physical_optimizer/utils.rs @@ -17,83 +17,18 @@ //! Collection of utility functions that are leveraged by the query optimizer rules -use std::fmt; -use std::fmt::Formatter; use std::sync::Arc; use crate::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec; use crate::physical_plan::union::UnionExec; use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec}; -use crate::physical_plan::{get_plan_string, ExecutionPlan}; +use crate::physical_plan::ExecutionPlan; use datafusion_physical_expr::{LexRequirementRef, PhysicalSortRequirement}; - -/// This object implements a tree that we use while keeping track of paths -/// leading to [`SortExec`]s. -#[derive(Debug, Clone)] -pub(crate) struct ExecTree { - /// The `ExecutionPlan` associated with this node - pub plan: Arc, - /// Child index of the plan in its parent - pub idx: usize, - /// Children of the plan that would need updating if we remove leaf executors - pub children: Vec, -} - -impl fmt::Display for ExecTree { - fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { - let plan_string = get_plan_string(&self.plan); - write!(f, "\nidx: {:?}", self.idx)?; - write!(f, "\nplan: {:?}", plan_string)?; - for child in self.children.iter() { - write!(f, "\nexec_tree:{}", child)?; - } - writeln!(f) - } -} - -impl ExecTree { - /// Create new Exec tree - pub fn new( - plan: Arc, - idx: usize, - children: Vec, - ) -> Self { - ExecTree { - plan, - idx, - children, - } - } -} - -/// Get `ExecTree` for each child of the plan if they are tracked. -/// # Arguments -/// -/// * `n_children` - Children count of the plan of interest -/// * `onward` - Contains `Some(ExecTree)` of the plan tracked. -/// - Contains `None` is plan is not tracked. -/// -/// # Returns -/// -/// A `Vec>` that contains tracking information of each child. -/// If a child is `None`, it is not tracked. If `Some(ExecTree)` child is tracked also. -pub(crate) fn get_children_exectrees( - n_children: usize, - onward: &Option, -) -> Vec> { - let mut children_onward = vec![None; n_children]; - if let Some(exec_tree) = &onward { - for child in &exec_tree.children { - children_onward[child.idx] = Some(child.clone()); - } - } - children_onward -} +use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; /// This utility function adds a `SortExec` above an operator according to the /// given ordering requirements while preserving the original partitioning. diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index f51374461776..91238e5b04b4 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -151,7 +151,7 @@ impl Neg for SortProperties { pub struct ExprOrdering { pub expr: Arc, pub state: SortProperties, - pub children: Vec, + pub children: Vec, } impl ExprOrdering { @@ -191,15 +191,13 @@ impl TreeNode for ExprOrdering { where F: FnMut(Self) -> Result, { - if self.children.is_empty() { - Ok(self) - } else { + if !self.children.is_empty() { self.children = self .children .into_iter() .map(transform) - .collect::>>()?; - Ok(self) + .collect::>()?; } + Ok(self) } } diff --git a/datafusion/physical-plan/src/union.rs b/datafusion/physical-plan/src/union.rs index 14ef9c2ec27b..d01ea5507449 100644 --- a/datafusion/physical-plan/src/union.rs +++ b/datafusion/physical-plan/src/union.rs @@ -21,6 +21,7 @@ //! The Union operator combines multiple inputs with the same schema +use std::borrow::Borrow; use std::pin::Pin; use std::task::{Context, Poll}; use std::{any::Any, sync::Arc}; @@ -336,7 +337,7 @@ impl InterleaveExec { pub fn try_new(inputs: Vec>) -> Result { let schema = union_schema(&inputs); - if !can_interleave(&inputs) { + if !can_interleave(inputs.iter()) { return internal_err!( "Not all InterleaveExec children have a consistent hash partitioning" ); @@ -474,17 +475,18 @@ impl ExecutionPlan for InterleaveExec { /// It might be too strict here in the case that the input partition specs are compatible but not exactly the same. /// For example one input partition has the partition spec Hash('a','b','c') and /// other has the partition spec Hash('a'), It is safe to derive the out partition with the spec Hash('a','b','c'). -pub fn can_interleave(inputs: &[Arc]) -> bool { - if inputs.is_empty() { +pub fn can_interleave>>( + mut inputs: impl Iterator, +) -> bool { + let Some(first) = inputs.next() else { return false; - } + }; - let first_input_partition = inputs[0].output_partitioning(); - matches!(first_input_partition, Partitioning::Hash(_, _)) + let reference = first.borrow().output_partitioning(); + matches!(reference, Partitioning::Hash(_, _)) && inputs - .iter() - .map(|plan| plan.output_partitioning()) - .all(|partition| partition == first_input_partition) + .map(|plan| plan.borrow().output_partitioning()) + .all(|partition| partition == reference) } fn union_schema(inputs: &[Arc]) -> SchemaRef {