Skip to content

Commit c2e7680

Browse files
mustafasrepometesynnadaozankabak
authored
Combine Equivalence and Ordering equivalence to simplify state (#8006)
* combine equivalence and ordering equivalence * Remove EquivalenceProperties struct * Minor changes * all tests pass * Refactor oeq * Simplifications * Resolve linter errors * Minor changes * Minor changes * Add new tests * Simplifications window mode selection * Simplifications * Use set_satisfy api * Use utils for aggregate * Minor changes * Minor changes * Minor changes * All tests pass * Simplifications * Simplifications * Minor changes * Simplifications * All tests pass, fix bug * Remove unnecessary code * Simplifications * Minor changes * Simplifications * Move oeq join to methods * Simplifications * Remove redundant code * Minor changes * Minor changes * Simplifications * Simplifications * Simplifications * Move window to util from method, simplifications * Simplifications * Propagate meet in the union * Simplifications * Minor changes, rename * Address berkay reviews * Simplifications * Add new buggy test * Add data test for sort requirement * Add experimental check * Add random test * Minor changes * Random test gives error * Fix missing test case * Minor changes * Minor changes * Simplifications * Minor changes * Add new test case * Minor changes * Address reviews * Minor changes * Increase coverage of random tests * Remove redundant code * Simplifications * Simplifications * Refactor on tests * Solving clippy errors * prune_lex improvements * Fix failing tests * Update get_finer and get_meet * Fix window lex ordering implementation * Buggy state * Do not use output ordering in the aggregate * Add union test * Update comment * Fix bug, when batch_size is small * Review Part 1 * Review Part 2 * Change union meet implementation * Update comments * Remove redundant check * Simplify project out_expr function * Remove Option<Vec<_>> API. * Do not use project_out_expr * Simplifications * Review Part 3 * Review Part 4 * Review Part 5 * Review Part 6 * Review Part 7 * Review Part 8 * Update comments * Add new unit tests, simplifications * Resolve linter errors * Simplify test codes * Review Part 9 * Add unit tests for remove_redundant entries * Simplifications * Review Part 10 * Fix test * Add new test case, fix implementation * Review Part 11 * Review Part 12 * Update comments * Review Part 13 * Review Part 14 * Review Part 15 * Review Part 16 * Review Part 17 * Review Part 18 * Review Part 19 * Review Part 20 * Review Part 21 * Review Part 22 * Review Part 23 * Review Part 24 * Do not construct idx and sort_expr unnecessarily, Update comments, Union meet single entry * Review Part 25 * Review Part 26 * Name Changes, comment updates * Review Part 27 * Add issue links * Address reviews * Fix failing test * Update comments * SortPreservingMerge, SortPreservingRepartition only preserves given expression ordering among input ordering equivalences --------- Co-authored-by: metesynnada <100111937+metesynnada@users.noreply.github.com> Co-authored-by: Mehmet Ozan Kabak <ozankabak@gmail.com>
1 parent 2906a24 commit c2e7680

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+3989
-3850
lines changed

.github/pull_request_template.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,4 @@ If there are user-facing changes then we may require documentation to be updated
3737

3838
<!--
3939
If there are any breaking changes to public APIs, please add the `api change` label.
40-
-->
40+
-->

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,7 @@ use crate::physical_plan::{
3232
use arrow_schema::SchemaRef;
3333
use datafusion_common::Statistics;
3434
use datafusion_execution::TaskContext;
35-
use datafusion_physical_expr::{
36-
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
37-
PhysicalSortExpr,
38-
};
35+
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering, PhysicalSortExpr};
3936

4037
use futures::StreamExt;
4138
use object_store::{GetResultPayload, ObjectStore};
@@ -106,8 +103,8 @@ impl ExecutionPlan for ArrowExec {
106103
.map(|ordering| ordering.as_slice())
107104
}
108105

109-
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
110-
ordering_equivalence_properties_helper(
106+
fn equivalence_properties(&self) -> EquivalenceProperties {
107+
EquivalenceProperties::new_with_orderings(
111108
self.schema(),
112109
&self.projected_output_ordering,
113110
)

datafusion/core/src/datasource/physical_plan/avro.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,7 @@ use crate::physical_plan::{
3131

3232
use arrow::datatypes::SchemaRef;
3333
use datafusion_execution::TaskContext;
34-
use datafusion_physical_expr::{
35-
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
36-
};
34+
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
3735

3836
/// Execution plan for scanning Avro data source
3937
#[derive(Debug, Clone)]
@@ -101,8 +99,8 @@ impl ExecutionPlan for AvroExec {
10199
.map(|ordering| ordering.as_slice())
102100
}
103101

104-
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
105-
ordering_equivalence_properties_helper(
102+
fn equivalence_properties(&self) -> EquivalenceProperties {
103+
EquivalenceProperties::new_with_orderings(
106104
self.schema(),
107105
&self.projected_output_ordering,
108106
)

datafusion/core/src/datasource/physical_plan/csv.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@ use crate::physical_plan::{
4141
use arrow::csv;
4242
use arrow::datatypes::SchemaRef;
4343
use datafusion_execution::TaskContext;
44-
use datafusion_physical_expr::{
45-
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
46-
};
44+
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
4745

4846
use bytes::{Buf, Bytes};
4947
use datafusion_common::config::ConfigOptions;
@@ -159,8 +157,8 @@ impl ExecutionPlan for CsvExec {
159157
.map(|ordering| ordering.as_slice())
160158
}
161159

162-
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
163-
ordering_equivalence_properties_helper(
160+
fn equivalence_properties(&self) -> EquivalenceProperties {
161+
EquivalenceProperties::new_with_orderings(
164162
self.schema(),
165163
&self.projected_output_ordering,
166164
)

datafusion/core/src/datasource/physical_plan/json.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,7 @@ use crate::physical_plan::{
4040
use arrow::json::ReaderBuilder;
4141
use arrow::{datatypes::SchemaRef, json};
4242
use datafusion_execution::TaskContext;
43-
use datafusion_physical_expr::{
44-
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
45-
};
43+
use datafusion_physical_expr::{EquivalenceProperties, LexOrdering};
4644

4745
use bytes::{Buf, Bytes};
4846
use futures::{ready, stream, StreamExt, TryStreamExt};
@@ -122,8 +120,8 @@ impl ExecutionPlan for NdJsonExec {
122120
.map(|ordering| ordering.as_slice())
123121
}
124122

125-
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
126-
ordering_equivalence_properties_helper(
123+
fn equivalence_properties(&self) -> EquivalenceProperties {
124+
EquivalenceProperties::new_with_orderings(
127125
self.schema(),
128126
&self.projected_output_ordering,
129127
)

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,7 @@ use crate::{
4545
use arrow::datatypes::{DataType, SchemaRef};
4646
use arrow::error::ArrowError;
4747
use datafusion_physical_expr::{
48-
ordering_equivalence_properties_helper, LexOrdering, OrderingEquivalenceProperties,
49-
PhysicalExpr, PhysicalSortExpr,
48+
EquivalenceProperties, LexOrdering, PhysicalExpr, PhysicalSortExpr,
5049
};
5150

5251
use bytes::Bytes;
@@ -315,8 +314,8 @@ impl ExecutionPlan for ParquetExec {
315314
.map(|ordering| ordering.as_slice())
316315
}
317316

318-
fn ordering_equivalence_properties(&self) -> OrderingEquivalenceProperties {
319-
ordering_equivalence_properties_helper(
317+
fn equivalence_properties(&self) -> EquivalenceProperties {
318+
EquivalenceProperties::new_with_orderings(
320319
self.schema(),
321320
&self.projected_output_ordering,
322321
)

datafusion/core/src/physical_optimizer/enforce_distribution.rs

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -49,12 +49,11 @@ use crate::physical_plan::{
4949
use arrow::compute::SortOptions;
5050
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
5151
use datafusion_expr::logical_plan::JoinType;
52-
use datafusion_physical_expr::equivalence::EquivalenceProperties;
5352
use datafusion_physical_expr::expressions::{Column, NoOp};
54-
use datafusion_physical_expr::utils::{
55-
map_columns_before_projection, ordering_satisfy_requirement_concrete,
53+
use datafusion_physical_expr::utils::map_columns_before_projection;
54+
use datafusion_physical_expr::{
55+
physical_exprs_equal, EquivalenceProperties, PhysicalExpr,
5656
};
57-
use datafusion_physical_expr::{expr_list_eq_strict_order, PhysicalExpr};
5857
use datafusion_physical_plan::unbounded_output;
5958
use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAggExec};
6059

@@ -498,7 +497,7 @@ fn reorder_aggregate_keys(
498497

499498
if parent_required.len() != output_exprs.len()
500499
|| !agg_exec.group_by().null_expr().is_empty()
501-
|| expr_list_eq_strict_order(&output_exprs, parent_required)
500+
|| physical_exprs_equal(&output_exprs, parent_required)
502501
{
503502
Ok(PlanWithKeyRequirements::new(agg_plan))
504503
} else {
@@ -564,13 +563,11 @@ fn reorder_aggregate_keys(
564563
Arc::new(Column::new(
565564
name,
566565
agg_schema.index_of(name).unwrap(),
567-
))
568-
as Arc<dyn PhysicalExpr>,
566+
)) as _,
569567
name.to_owned(),
570568
)
571569
})
572570
.collect::<Vec<_>>();
573-
let agg_schema = new_final_agg.schema();
574571
let agg_fields = agg_schema.fields();
575572
for (idx, field) in
576573
agg_fields.iter().enumerate().skip(output_columns.len())
@@ -706,10 +703,9 @@ pub(crate) fn reorder_join_keys_to_inputs(
706703
) {
707704
if !new_positions.is_empty() {
708705
let new_join_on = new_join_conditions(&left_keys, &right_keys);
709-
let mut new_sort_options = vec![];
710-
for idx in 0..sort_options.len() {
711-
new_sort_options.push(sort_options[new_positions[idx]])
712-
}
706+
let new_sort_options = (0..sort_options.len())
707+
.map(|idx| sort_options[new_positions[idx]])
708+
.collect();
713709
return Ok(Arc::new(SortMergeJoinExec::try_new(
714710
left.clone(),
715711
right.clone(),
@@ -757,39 +753,40 @@ fn try_reorder(
757753
expected: &[Arc<dyn PhysicalExpr>],
758754
equivalence_properties: &EquivalenceProperties,
759755
) -> Option<(JoinKeyPairs, Vec<usize>)> {
756+
let eq_groups = equivalence_properties.eq_group();
760757
let mut normalized_expected = vec![];
761758
let mut normalized_left_keys = vec![];
762759
let mut normalized_right_keys = vec![];
763760
if join_keys.left_keys.len() != expected.len() {
764761
return None;
765762
}
766-
if expr_list_eq_strict_order(expected, &join_keys.left_keys)
767-
|| expr_list_eq_strict_order(expected, &join_keys.right_keys)
763+
if physical_exprs_equal(expected, &join_keys.left_keys)
764+
|| physical_exprs_equal(expected, &join_keys.right_keys)
768765
{
769766
return Some((join_keys, vec![]));
770-
} else if !equivalence_properties.classes().is_empty() {
767+
} else if !equivalence_properties.eq_group().is_empty() {
771768
normalized_expected = expected
772769
.iter()
773-
.map(|e| equivalence_properties.normalize_expr(e.clone()))
770+
.map(|e| eq_groups.normalize_expr(e.clone()))
774771
.collect::<Vec<_>>();
775772
assert_eq!(normalized_expected.len(), expected.len());
776773

777774
normalized_left_keys = join_keys
778775
.left_keys
779776
.iter()
780-
.map(|e| equivalence_properties.normalize_expr(e.clone()))
777+
.map(|e| eq_groups.normalize_expr(e.clone()))
781778
.collect::<Vec<_>>();
782779
assert_eq!(join_keys.left_keys.len(), normalized_left_keys.len());
783780

784781
normalized_right_keys = join_keys
785782
.right_keys
786783
.iter()
787-
.map(|e| equivalence_properties.normalize_expr(e.clone()))
784+
.map(|e| eq_groups.normalize_expr(e.clone()))
788785
.collect::<Vec<_>>();
789786
assert_eq!(join_keys.right_keys.len(), normalized_right_keys.len());
790787

791-
if expr_list_eq_strict_order(&normalized_expected, &normalized_left_keys)
792-
|| expr_list_eq_strict_order(&normalized_expected, &normalized_right_keys)
788+
if physical_exprs_equal(&normalized_expected, &normalized_left_keys)
789+
|| physical_exprs_equal(&normalized_expected, &normalized_right_keys)
793790
{
794791
return Some((join_keys, vec![]));
795792
}
@@ -870,7 +867,7 @@ fn new_join_conditions(
870867
r_key.as_any().downcast_ref::<Column>().unwrap().clone(),
871868
)
872869
})
873-
.collect::<Vec<_>>()
870+
.collect()
874871
}
875872

876873
/// Updates `dist_onward` such that, to keep track of
@@ -935,9 +932,9 @@ fn add_roundrobin_on_top(
935932
let should_preserve_ordering = input.output_ordering().is_some();
936933

937934
let partitioning = Partitioning::RoundRobinBatch(n_target);
938-
let repartition = RepartitionExec::try_new(input, partitioning)?
939-
.with_preserve_order(should_preserve_ordering);
940-
let new_plan = Arc::new(repartition) as Arc<dyn ExecutionPlan>;
935+
let repartition = RepartitionExec::try_new(input, partitioning)?;
936+
let new_plan = Arc::new(repartition.with_preserve_order(should_preserve_ordering))
937+
as Arc<dyn ExecutionPlan>;
941938

942939
// update distribution onward with new operator
943940
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
@@ -1011,9 +1008,9 @@ fn add_hash_on_top(
10111008
input
10121009
};
10131010
let partitioning = Partitioning::Hash(hash_exprs, n_target);
1014-
let repartition = RepartitionExec::try_new(new_plan, partitioning)?
1015-
.with_preserve_order(should_preserve_ordering);
1016-
new_plan = Arc::new(repartition) as _;
1011+
let repartition = RepartitionExec::try_new(new_plan, partitioning)?;
1012+
new_plan =
1013+
Arc::new(repartition.with_preserve_order(should_preserve_ordering)) as _;
10171014

10181015
// update distribution onward with new operator
10191016
update_distribution_onward(new_plan.clone(), dist_onward, input_idx);
@@ -1302,16 +1299,12 @@ fn ensure_distribution(
13021299

13031300
// There is an ordering requirement of the operator:
13041301
if let Some(required_input_ordering) = required_input_ordering {
1305-
let existing_ordering = child.output_ordering().unwrap_or(&[]);
13061302
// Either:
13071303
// - Ordering requirement cannot be satisfied by preserving ordering through repartitions, or
13081304
// - using order preserving variant is not desirable.
1309-
let ordering_satisfied = ordering_satisfy_requirement_concrete(
1310-
existing_ordering,
1311-
required_input_ordering,
1312-
|| child.equivalence_properties(),
1313-
|| child.ordering_equivalence_properties(),
1314-
);
1305+
let ordering_satisfied = child
1306+
.equivalence_properties()
1307+
.ordering_satisfy_requirement(required_input_ordering);
13151308
if !ordering_satisfied || !order_preserving_variants_desirable {
13161309
replace_order_preserving_variants(&mut child, dist_onward)?;
13171310
// If ordering requirements were satisfied before repartitioning,
@@ -3763,14 +3756,14 @@ mod tests {
37633756
fn repartition_transitively_past_sort_with_filter() -> Result<()> {
37643757
let schema = schema();
37653758
let sort_key = vec![PhysicalSortExpr {
3766-
expr: col("c", &schema).unwrap(),
3759+
expr: col("a", &schema).unwrap(),
37673760
options: SortOptions::default(),
37683761
}];
37693762
let plan = sort_exec(sort_key, filter_exec(parquet_exec()), false);
37703763

37713764
let expected = &[
3772-
"SortPreservingMergeExec: [c@2 ASC]",
3773-
"SortExec: expr=[c@2 ASC]",
3765+
"SortPreservingMergeExec: [a@0 ASC]",
3766+
"SortExec: expr=[a@0 ASC]",
37743767
// Expect repartition on the input to the sort (as it can benefit from additional parallelism)
37753768
"FilterExec: c@2 = 0",
37763769
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=1",
@@ -3780,7 +3773,7 @@ mod tests {
37803773
assert_optimized!(expected, plan.clone(), true);
37813774

37823775
let expected_first_sort_enforcement = &[
3783-
"SortExec: expr=[c@2 ASC]",
3776+
"SortExec: expr=[a@0 ASC]",
37843777
"CoalescePartitionsExec",
37853778
"FilterExec: c@2 = 0",
37863779
// Expect repartition on the input of the filter (as it can benefit from additional parallelism)
@@ -4357,29 +4350,54 @@ mod tests {
43574350
fn do_not_preserve_ordering_through_repartition() -> Result<()> {
43584351
let schema = schema();
43594352
let sort_key = vec![PhysicalSortExpr {
4360-
expr: col("c", &schema).unwrap(),
4353+
expr: col("a", &schema).unwrap(),
43614354
options: SortOptions::default(),
43624355
}];
43634356
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
43644357
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
43654358

43664359
let expected = &[
4367-
"SortPreservingMergeExec: [c@2 ASC]",
4368-
"SortExec: expr=[c@2 ASC]",
4360+
"SortPreservingMergeExec: [a@0 ASC]",
4361+
"SortExec: expr=[a@0 ASC]",
43694362
"FilterExec: c@2 = 0",
43704363
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
4371-
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
4364+
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
43724365
];
43734366

43744367
assert_optimized!(expected, physical_plan.clone(), true);
43754368

43764369
let expected = &[
4377-
"SortExec: expr=[c@2 ASC]",
4370+
"SortExec: expr=[a@0 ASC]",
4371+
"CoalescePartitionsExec",
4372+
"FilterExec: c@2 = 0",
4373+
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
4374+
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[a@0 ASC]",
4375+
];
4376+
assert_optimized!(expected, physical_plan, false);
4377+
4378+
Ok(())
4379+
}
4380+
4381+
#[test]
4382+
fn no_need_for_sort_after_filter() -> Result<()> {
4383+
let schema = schema();
4384+
let sort_key = vec![PhysicalSortExpr {
4385+
expr: col("c", &schema).unwrap(),
4386+
options: SortOptions::default(),
4387+
}];
4388+
let input = parquet_exec_multiple_sorted(vec![sort_key.clone()]);
4389+
let physical_plan = sort_preserving_merge_exec(sort_key, filter_exec(input));
4390+
4391+
let expected = &[
4392+
// After CoalescePartitionsExec c is still constant. Hence c@2 ASC ordering is already satisfied.
43784393
"CoalescePartitionsExec",
4394+
// Since after this stage c is constant. c@2 ASC ordering is already satisfied.
43794395
"FilterExec: c@2 = 0",
43804396
"RepartitionExec: partitioning=RoundRobinBatch(10), input_partitions=2",
43814397
"ParquetExec: file_groups={2 groups: [[x], [y]]}, projection=[a, b, c, d, e], output_ordering=[c@2 ASC]",
43824398
];
4399+
4400+
assert_optimized!(expected, physical_plan.clone(), true);
43834401
assert_optimized!(expected, physical_plan, false);
43844402

43854403
Ok(())

0 commit comments

Comments
 (0)