Skip to content
Merged
30 changes: 11 additions & 19 deletions datafusion/core/src/physical_optimizer/repartition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,29 +177,21 @@ fn optimize_partitions(
let children = plan
.children()
.iter()
.map(|child| {
// does plan itelf (not parent) require its input to
.enumerate()
.map(|(idx, child)| {
// Does plan itself (not its parent) require its input to
// be sorted in some way?
let required_input_ordering =
plan_has_required_input_ordering(plan.as_ref());

let can_reorder_child = if can_reorder {
// parent of `plan` will not use any particular order

// if `plan` itself doesn't need order OR
!required_input_ordering ||
// child has no order to preserve
child.output_ordering().is_none()
} else {
// parent would like to use the `plan`'s output
// order.

// if `plan` doesn't maintain the input order and
// doesn't need the child's output order itself
(!plan.maintains_input_order() && !required_input_ordering) ||
// child has no ordering to preserve
child.output_ordering().is_none()
};
// We can reorder a child if:
// - It has no ordering to preserve, or
// - Its parent has no required input ordering and does not
// maintain input ordering.
// Check if this condition holds:
let can_reorder_child = child.output_ordering().is_none()
|| (!required_input_ordering
&& (can_reorder || !plan.maintains_input_order()[idx]));

optimize_partitions(
target_partitions,
Expand Down
115 changes: 98 additions & 17 deletions datafusion/core/src/physical_optimizer/sort_enforcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
//! by another SortExec. Therefore, this rule removes it from the physical plan.
use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_optimizer::utils::{
add_sort_above_child, ordering_satisfy, ordering_satisfy_concrete,
};
use crate::physical_optimizer::utils::add_sort_above_child;
use crate::physical_optimizer::PhysicalOptimizerRule;
use crate::physical_plan::rewrite::TreeNodeRewritable;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::{BoundedWindowAggExec, WindowAggExec};
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use arrow::datatypes::SchemaRef;
use datafusion_common::{reverse_sort_options, DataFusionError};
use datafusion_physical_expr::utils::{ordering_satisfy, ordering_satisfy_concrete};
use datafusion_physical_expr::window::WindowExpr;
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
use itertools::izip;
Expand Down Expand Up @@ -108,13 +107,21 @@ impl TreeNodeRewritable for PlanWithCorrespondingSort {
let sort_onwards = children_requirements
.iter()
.map(|item| {
if item.sort_onwards.is_empty() {
vec![]
} else {
// TODO: When `maintains_input_order` returns Vec<bool>,
// pass the order-enforcing sort upwards.
item.sort_onwards[0].clone()
let onwards = &item.sort_onwards;
if !onwards.is_empty() {
let flags = item.plan.maintains_input_order();
// `onwards` starts from the sort-introducing executor (e.g. `SortExec`, `SortPreservingMergeExec`) and extends to the current executor,
// as long as the executors in between maintain input ordering. If we are at
// the beginning, neither `SortExec` nor `SortPreservingMergeExec` maintains ordering (they introduce ordering).
// However, we want to propagate them upwards anyway.
for (maintains, element) in flags.into_iter().zip(onwards.iter())
{
if (maintains || is_sort(&item.plan)) && !element.is_empty() {
return element.clone();
}
}
}
vec![]
})
.collect::<Vec<_>>();
let plan = with_new_children_if_necessary(self.plan, children_plans)?;
Expand Down Expand Up @@ -144,6 +151,12 @@ impl PhysicalOptimizerRule for EnforceSorting {
}
}

/// Returns `true` when `plan` is a [`SortExec`].
///
/// The check is a runtime downcast of the trait object to the concrete
/// `SortExec` type; no other executor is recognized.
// TODO: Add support for SortPreservingMergeExec also.
fn is_sort(plan: &Arc<dyn ExecutionPlan>) -> bool {
    plan.as_any().downcast_ref::<SortExec>().is_some()
}

fn ensure_sorting(
requirements: PlanWithCorrespondingSort,
) -> Result<Option<PlanWithCorrespondingSort>> {
Expand Down Expand Up @@ -230,7 +243,7 @@ fn ensure_sorting(
(None, Some(_)) => {
// We have a SortExec whose effect may be neutralized by an order-imposing
// operator. In this case, remove this sort:
if !requirements.plan.maintains_input_order() {
if !requirements.plan.maintains_input_order()[idx] {
update_child_to_remove_unnecessary_sort(child, sort_onwards)?;
}
}
Expand All @@ -247,15 +260,14 @@ fn ensure_sorting(
.enumerate()
.take(new_plan.children().len())
{
// TODO: When `maintains_input_order` returns a `Vec<bool>`, use corresponding index.
if new_plan.maintains_input_order()
if new_plan.maintains_input_order()[idx]
&& required_ordering.is_none()
&& !trace.is_empty()
{
trace.push((idx, new_plan.clone()));
} else {
trace.clear();
if new_plan.as_any().is::<SortExec>() {
if is_sort(&new_plan) {
trace.push((idx, new_plan.clone()));
}
}
Expand Down Expand Up @@ -378,17 +390,15 @@ fn convert_to_sort_exec(sort_any: &Arc<dyn ExecutionPlan>) -> Result<&SortExec>
fn remove_corresponding_sort_from_sub_plan(
sort_onwards: &mut Vec<(usize, Arc<dyn ExecutionPlan>)>,
) -> Result<Arc<dyn ExecutionPlan>> {
let (sort_child_idx, sort_any) = sort_onwards[0].clone();
let (_, sort_any) = sort_onwards[0].clone();
let sort_exec = convert_to_sort_exec(&sort_any)?;
let mut prev_layer = sort_exec.input().clone();
let mut prev_child_idx = sort_child_idx;
// In the loop below, we start from 1 as the first one is a SortExec
// and we are removing it from the plan.
for (child_idx, layer) in sort_onwards.iter().skip(1) {
let mut children = layer.children();
children[prev_child_idx] = prev_layer;
children[*child_idx] = prev_layer;
prev_layer = layer.clone().with_new_children(children)?;
prev_child_idx = *child_idx;
}
// We have removed the sort, hence empty the sort_onwards:
sort_onwards.clear();
Expand Down Expand Up @@ -816,6 +826,77 @@ mod tests {
Ok(())
}

/// A `SortPreservingMergeExec` over a `UnionExec` whose two inputs both
/// provide the required ordering (one via its declared output ordering, one
/// via an explicit `SortExec`) must be left unchanged by the optimizer.
#[tokio::test]
async fn test_union_inputs_different_sorted() -> Result<()> {
let schema = create_test_schema()?;

// Union input built from an unsorted parquet source wrapped in an explicit
// sort on `nullable_col`.
let source1 = parquet_exec(&schema);
let sort_exprs = vec![sort_expr("nullable_col", &schema)];
let sort = sort_exec(sort_exprs.clone(), source1);

// Union input built from a parquet source that declares an output ordering
// on (`nullable_col`, `non_nullable_col`).
let parquet_sort_exprs = vec![
sort_expr("nullable_col", &schema),
sort_expr("non_nullable_col", &schema),
];
let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);

// The merge on top requires the ordering on `nullable_col` only.
let union = union_exec(vec![source2, sort]);
let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

// One input to the union is a source that already declares an output
// ordering; the other is an unsorted source under an explicit `SortExec`.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC, non_nullable_col@1 ASC], projection=[nullable_col, non_nullable_col]",
" SortExec: [nullable_col@0 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// The rule should not add a sort at the output of the union; the input
// plan is expected to come back unchanged.
let expected_optimized = expected_input.clone();
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// A `SortPreservingMergeExec` over a `UnionExec` where one input's declared
/// ordering is too coarse for the required ordering: the expected result
/// drops the sort below the union and places a single sort above it.
#[tokio::test]
async fn test_union_inputs_different_sorted2() -> Result<()> {
let schema = create_test_schema()?;

// Union input built from an unsorted parquet source wrapped in an explicit
// sort on (`nullable_col`, `non_nullable_col`).
let source1 = parquet_exec(&schema);
let sort_exprs = vec![
sort_expr("nullable_col", &schema),
sort_expr("non_nullable_col", &schema),
];
let sort = sort_exec(sort_exprs.clone(), source1);

// Union input built from a parquet source that declares an output ordering
// on `nullable_col` only.
let parquet_sort_exprs = vec![sort_expr("nullable_col", &schema)];
let source2 = parquet_exec_sorted(&schema, parquet_sort_exprs);

// The merge on top requires the full two-column ordering.
let union = union_exec(vec![source2, sort]);
let physical_plan = sort_preserving_merge_exec(sort_exprs, union);

// The input is an invalid plan. In this case the rule should add the
// required sorting in the appropriate places. The first ParquetExec has the
// output ordering (nullable_col@0 ASC), which does not satisfy the required
// ordering of the SortPreservingMergeExec. Hence the rule should remove the
// unnecessary sort under the second child of the UnionExec and put a sort
// above the UnionExec to satisfy the required ordering.
let expected_input = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
" SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
// The rule should remove the unnecessary sort from below the union and
// move it to the top.
let expected_optimized = vec![
"SortPreservingMergeExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" SortExec: [nullable_col@0 ASC,non_nullable_col@1 ASC]",
" UnionExec",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, output_ordering=[nullable_col@0 ASC], projection=[nullable_col, non_nullable_col]",
" ParquetExec: limit=None, partitions={1 group: [[x]]}, projection=[nullable_col, non_nullable_col]",
];
assert_optimized!(expected_input, expected_optimized, physical_plan);
Ok(())
}

/// make PhysicalSortExpr with default options
fn sort_expr(name: &str, schema: &Schema) -> PhysicalSortExpr {
sort_expr_options(name, schema, SortOptions::default())
Expand Down
55 changes: 1 addition & 54 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,7 @@ use crate::config::ConfigOptions;
use crate::error::Result;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::{with_new_children_if_necessary, ExecutionPlan};
use datafusion_physical_expr::{
normalize_sort_expr_with_equivalence_properties, EquivalenceProperties,
PhysicalSortExpr,
};
use datafusion_physical_expr::PhysicalSortExpr;
use std::sync::Arc;

/// Convenience rule for writing optimizers: recursively invoke
Expand All @@ -51,56 +48,6 @@ pub fn optimize_children(
}
}

/// Decides whether an optional `provided` ordering fulfils an optional
/// `required` ordering.
///
/// * No requirement (`required` is `None`): always satisfied.
/// * A requirement but no provided ordering: never satisfied.
/// * Both present: delegated to [`ordering_satisfy_concrete`].
///
/// `equal_properties` is only handed on to the concrete check, so it is
/// never invoked when either side is `None`.
pub fn ordering_satisfy<F: FnOnce() -> EquivalenceProperties>(
    provided: Option<&[PhysicalSortExpr]>,
    required: Option<&[PhysicalSortExpr]>,
    equal_properties: F,
) -> bool {
    // Without a requirement there is nothing to violate.
    let required_exprs = match required {
        Some(exprs) => exprs,
        None => return true,
    };
    // A requirement can only be met by an actual ordering.
    provided.map_or(false, |provided_exprs| {
        ordering_satisfy_concrete(provided_exprs, required_exprs, equal_properties)
    })
}

/// Decides whether the `provided` sort expressions fulfil the `required`
/// ones.
///
/// The requirement is met when `required` is no longer than `provided` and
/// matches it position by position as a prefix. If the direct comparison
/// fails, both sides are normalized with the equivalence classes obtained
/// from `equal_properties` and compared again; with no equivalence classes
/// the requirement is not met.
///
/// `equal_properties` is invoked at most once, and only after the length
/// check and the direct comparison have failed to settle the answer.
pub fn ordering_satisfy_concrete<F: FnOnce() -> EquivalenceProperties>(
    provided: &[PhysicalSortExpr],
    required: &[PhysicalSortExpr],
    equal_properties: F,
) -> bool {
    // A longer requirement can never be a prefix of the provided ordering.
    if provided.len() < required.len() {
        return false;
    }
    // Fast path: literal, position-wise match.
    if required
        .iter()
        .zip(provided.iter())
        .all(|(wanted, given)| wanted.eq(given))
    {
        return true;
    }
    // Slow path: compare again modulo the equivalence classes.
    let properties = equal_properties();
    let eq_classes = properties.classes();
    if eq_classes.is_empty() {
        return false;
    }
    let normalize_all = |exprs: &[PhysicalSortExpr]| -> Vec<_> {
        exprs
            .iter()
            .map(|expr| {
                normalize_sort_expr_with_equivalence_properties(expr.clone(), eq_classes)
            })
            .collect()
    };
    let normalized_required = normalize_all(required);
    let normalized_provided = normalize_all(provided);
    normalized_required
        .iter()
        .zip(normalized_provided.iter())
        .all(|(wanted, given)| wanted.eq(given))
}

/// Util function to add SortExec above child
/// preserving the original partitioning
pub fn add_sort_above_child(
Expand Down
63 changes: 63 additions & 0 deletions datafusion/core/src/physical_plan/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ use arrow::error::ArrowError;
use arrow::error::Result as ArrowResult;
use arrow::ipc::writer::{FileWriter, IpcWriteOptions};
use arrow::record_batch::RecordBatch;
use datafusion_physical_expr::utils::ordering_satisfy;
use datafusion_physical_expr::PhysicalSortExpr;
use futures::{Future, Stream, StreamExt, TryStreamExt};
use log::debug;
use pin_project_lite::pin_project;
Expand Down Expand Up @@ -322,15 +324,76 @@ pub fn transpose<T>(original: Vec<Vec<T>>) -> Vec<Vec<T>> {
}
}

/// Computes the "meet" of the output orderings of `children`.
///
/// The meet is the finest ordering that is satisfied by all of the input
/// orderings; see <https://en.wikipedia.org/wiki/Join_and_meet>.
///
/// The shortest child ordering (the first one, in case of ties) is taken as
/// the candidate and returned only if every child's output ordering satisfies
/// it. Returns `None` when any child has no output ordering, when the
/// candidate is not satisfied by all children, or when `children` is empty.
pub fn get_meet_of_orderings(
    children: &[Arc<dyn ExecutionPlan>],
) -> Option<&[PhysicalSortExpr]> {
    // Pick the shortest ordering as the candidate; a single unordered child
    // rules out any common ordering.
    let mut candidate: Option<&[PhysicalSortExpr]> = None;
    for child in children {
        let ordering = child.output_ordering()?;
        if candidate.map_or(true, |current| ordering.len() < current.len()) {
            candidate = Some(ordering);
        }
    }
    // The candidate is the meet only if every child satisfies it.
    let is_meet = children.iter().all(|child| {
        ordering_satisfy(child.output_ordering(), candidate, || {
            child.equivalence_properties()
        })
    });
    if is_meet {
        candidate
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
use super::*;
use crate::from_slice::FromSlice;
use crate::physical_plan::memory::MemoryExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::union::UnionExec;
use arrow::compute::SortOptions;
use arrow::{
array::{Float32Array, Float64Array},
datatypes::{DataType, Field, Schema},
record_batch::RecordBatch,
};
use datafusion_physical_expr::expressions::col;

/// Exercises `get_meet_of_orderings` on the inputs of a `UnionExec`: once
/// with an unordered input mixed in (no meet expected) and once with two
/// identically sorted inputs (the shared ordering expected).
#[test]
fn test_meet_of_orderings() -> Result<()> {
let schema = Arc::new(Schema::new(vec![
Field::new("f32", DataType::Float32, false),
Field::new("f64", DataType::Float64, false),
]));
// Single-column ordering on `f32` with default sort options.
let sort_expr = vec![PhysicalSortExpr {
expr: col("f32", &schema).unwrap(),
options: SortOptions::default(),
}];
// Sorted input: an empty in-memory source under a `SortExec`.
let memory_exec = Arc::new(MemoryExec::try_new(&[], schema.clone(), None)?) as _;
let sort_exec = Arc::new(SortExec::try_new(sort_expr.clone(), memory_exec, None)?)
as Arc<dyn ExecutionPlan>;
let memory_exec2 = Arc::new(MemoryExec::try_new(&[], schema, None)?) as _;
// memory_exec2 doesn't have output ordering, so no meet is expected.
let union_exec = UnionExec::new(vec![sort_exec.clone(), memory_exec2]);
let res = get_meet_of_orderings(union_exec.inputs());
assert!(res.is_none());

// Both inputs are the same sorted plan, so the meet is that ordering.
let union_exec = UnionExec::new(vec![sort_exec.clone(), sort_exec]);
let res = get_meet_of_orderings(union_exec.inputs());
assert_eq!(res, Some(&sort_expr[..]));
Ok(())
}

#[test]
fn test_compute_record_batch_statistics_empty() -> Result<()> {
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,9 @@ impl ExecutionPlan for FilterExec {
self.input.output_ordering()
}

fn maintains_input_order(&self) -> bool {
fn maintains_input_order(&self) -> Vec<bool> {
// tell optimizer this operator doesn't reorder its input
true
vec![true]
}

fn equivalence_properties(&self) -> EquivalenceProperties {
Expand Down
Loading