
Commit 1daa5ed

ozankabak, mertak-synnada, alamb, and berkaysynnada authored
[MAJOR] Equivalence System Overhaul (#16217)
* introduce Soft & Hard RequiredInputOrderings remove usage of prefer_existing_sort as default set requirements Hard set soft on AggregateExec and BoundedWindowExec since they have InputOrderMode functionalities
* add documentation to replace_with_partial_sort simplify indentation
* add documentation to analyze_immediate_sort_removal simplify indentation
* remove prefer_existing_sort effects remove prefer_existing_sort based test cases
* remove prefer_existing_sort configuration
* remove prefer_existing_sort configuration
* add documentation
* add documentation
* add documentation
* fix imports and test cases
* fix imports and test cases
* implement RequiredInputOrdering as vectors
* implement RequiredInputOrdering as vectors return alternative on BoundedWindowAggExec
* fix test cases
* change doc
* revert prefer_existing_sort flag
* fix changes
* fix test case
* make LexRequirement private
* ensure RequiredInputOrdering inner requirement can not be empty simplify sort_pushdown.rs
* add default test cases add requirements compatible test cases
* doc fixes
* fix clippy and docs
* format code
* format code
* doc fix
* add TODO test cases with test_soft_hard_requirements prefix
* Review Part 1
* Review Part 2
* Review Part 3
* Review Part 4
* Review Part 5
* Review Part 6
* Enforce non-degeneracy for LexRequirement
* Enforce non-degeneracy for LexOrdering (Part 1)
* Enforce non-degeneracy for LexOrdering (Part 2)
* fix first phase of merge conflicts and other bugs
* Fix sqllogictests except the schema mismatch
* Cleanup Part 1
* Cleanup Part 2
* Cleanup Part 3
* do not initialize Trivial accumulators if ordering is set
* initialize TrivialFirstPrimitiveGroupsAccumulator struct and return
* fix clippy
* fix merge conflicts
* fix typos remove TrivialFirstPrimitiveGroupsAccumulator make groups accumulator available only when order requirement is set
* format code
* Add requirement_satisfied back in
* Replace AsRef with ordinary & for LexOrdering
* Further cleanup
* add OutputRequirementExec fetches to sort adding
* Simplify remove_redundant_entries
* Work with iterators in ordering_satisfy_requirement
* Fix doctests
* Cleanup LexOrdering APIs
* Cleanup LexOrdering APIs 2
* Add reverse_each to LexOrdering
* Use LexOrdering instead of Arc<[PhysicalSortExpr]>
* Use PhysicalSortExpr slices in contexts where we simply list sort expressions
* Generalize add_new_ordering APIs
* Simplifications
* More cleanups
* API Simplifications
* Improve comments
* Use vector in Expr structs
* Fix doctests
* Simplify sort
* Simplify the get_finer_aggregate_exprs_requirement function
* Avoid hidden clones
* bugfix
* Simplify the get_finer_aggregate_exprs_requirement function
* Simplify the function with_reorder
* Fix with_reorder bug
* Simplify the function with_reorder (Part 2)
* Simplify
* DRY
* Simplifications
* Improve add_equal_condition
* Improve docs
* Simplifications
* Simplifications
* RequiredInputOrdering -> OrderingAlternatives
* Simplify new_with_orderings
* Transition to fallible LexOrdering constructor
* Transition to fallible LexOrdering constructor - 2
* Transition to fallible LexOrdering constructor - 3
* Transition to fallible LexOrdering constructor - 4
* Transition to fallible LexOrdering constructor - 5
* Transition to fallible LexOrdering constructor - 6
* Transition to fallible LexOrdering constructor - 7
* Transition to fallible LexOrdering constructor - 8
* Transition to fallible LexOrdering constructor - 9
* Transition to fallible LexOrdering constructor - 10
* Transition to fallible LexOrdering constructor - 11
* Simplify constant expressions
* Simplify constant expressions - 2
* Simplify constant expressions - 3
* Simplify constant expressions - 4
* Simplify constant expressions - 5
* Simplify constant expressions - 6
* Simplify constant expressions - 7
* Simplify constant expressions - 8
* Simplify constant expressions - 9
* Fix imports
* Remove explicit constant tracking from equivalences
* Resolve logical conflict
* Remove the unusual take API, instead use the from trait
* Simplify projection mapping - 1
* Use a map instead of a vector in ProjectionMapping
* Simplify DependencyMap
* Simplify DependencyMap - 2
* Simplify DependencyMap - 3
* Incorporate Jay's suggestions
* Simplifications
* Fix doctest
* Improve docstrings
* Update/cast the constant value accordingly when schema changes
* Improve ProjectionMapping
* Remove DerefMut from ProjectionTargets to preserve non-emptiness
* Docstring
* Optimize project_expr by fetching equivalence classes only once
* Project multiple expressions more efficiently at once
* Project multiple expressions more efficiently at once - 2
* Project multiple expressions more efficiently at once - 3
* Project multiple expressions more efficiently at once - 4
* Move normalization of sort expressions to equivalence group
* Improve comments
* Improve display for EquivalenceProperties
* More idiomatic code
* More succinct code
* Remove extend_orderings from EquivalenceProperties
* Simplify with_reorder
* Store normalized orderings - 1
* Reduce time complexity of normalization w.r.t. number of equivalence classes
* Simplify bridge_classes logic
* Remove TODOs
* Simplify generate_dependency_orderings
* normalized orderings - 2
* normalized orderings - 3
* undo normalized orderings
* Fix logical conflicts
* Fix imports
* Remove noop code
* Move add_offset_to_expr
* Remove mutation from LexOrdering
* Remove unwraps
* Remove unwraps - 2
* Remove unwraps - 3
* Remove unwraps - 4
* Remove collapse from LexOrdering
* Remove unwraps - 5
* Remove unwraps - 6
* Remove unwraps - 7
* Remove unwraps - 8
* Remove unwraps - 9
* Remove unwraps - 10
* Remove collapse from LexRequirement
* Simplify ordering_satisfy
* Enforce uniqueness in LexOrdering
* Fix with_reorder
* Use tee
* Fix reorder api
* Comment grammar
* Remove unwraps
* Cache normalized orderings
* Minor: remove an unecessary clone in common_sort_prefix_length
* Address reviews

---------

Co-authored-by: mertak-synnada <mertak67+synaada@gmail.com>
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Co-authored-by: Berkay Şahin <124376117+berkaysynnada@users.noreply.github.com>
1 parent 0f83c1d commit 1daa5ed

130 files changed: +7350 -7643 lines changed

benchmarks/src/sort.rs

Lines changed: 18 additions & 12 deletions
@@ -70,28 +70,31 @@ impl RunOpt {
         let sort_cases = vec![
             (
                 "sort utf8",
-                LexOrdering::new(vec![PhysicalSortExpr {
+                [PhysicalSortExpr {
                     expr: col("request_method", &schema)?,
                     options: Default::default(),
-                }]),
+                }]
+                .into(),
             ),
             (
                 "sort int",
-                LexOrdering::new(vec![PhysicalSortExpr {
+                [PhysicalSortExpr {
                     expr: col("response_bytes", &schema)?,
                     options: Default::default(),
-                }]),
+                }]
+                .into(),
             ),
             (
                 "sort decimal",
-                LexOrdering::new(vec![PhysicalSortExpr {
+                [PhysicalSortExpr {
                     expr: col("decimal_price", &schema)?,
                     options: Default::default(),
-                }]),
+                }]
+                .into(),
             ),
             (
                 "sort integer tuple",
-                LexOrdering::new(vec![
+                [
                     PhysicalSortExpr {
                         expr: col("request_bytes", &schema)?,
                         options: Default::default(),
@@ -100,11 +103,12 @@ impl RunOpt {
                         expr: col("response_bytes", &schema)?,
                         options: Default::default(),
                     },
-                ]),
+                ]
+                .into(),
             ),
             (
                 "sort utf8 tuple",
-                LexOrdering::new(vec![
+                [
                     // sort utf8 tuple
                     PhysicalSortExpr {
                         expr: col("service", &schema)?,
@@ -122,11 +126,12 @@ impl RunOpt {
                         expr: col("image", &schema)?,
                         options: Default::default(),
                     },
-                ]),
+                ]
+                .into(),
             ),
             (
                 "sort mixed tuple",
-                LexOrdering::new(vec![
+                [
                     PhysicalSortExpr {
                         expr: col("service", &schema)?,
                         options: Default::default(),
@@ -139,7 +144,8 @@ impl RunOpt {
                         expr: col("decimal_price", &schema)?,
                         options: Default::default(),
                     },
-                ]),
+                ]
+                .into(),
             ),
         ];
         for (title, expr) in sort_cases {
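
Review note: the hunks in this file replace `LexOrdering::new(vec![...])` with an array literal followed by `.into()`. The commit message mentions a "fallible LexOrdering constructor" and "non-degeneracy", which suggests the ordering type must never be empty. Below is a minimal sketch of how such an API could be shaped. `SortKey` and `NonEmptyOrdering` are hypothetical stand-ins, not DataFusion types, and the panic on a zero-length array is an assumption of this sketch.

```rust
/// Hypothetical stand-in for a sort expression (column name plus direction).
#[derive(Clone, Debug, PartialEq)]
pub struct SortKey {
    pub column: String,
    pub descending: bool,
}

/// Hypothetical ordering type that is never empty once constructed.
#[derive(Clone, Debug, PartialEq)]
pub struct NonEmptyOrdering {
    keys: Vec<SortKey>,
}

impl NonEmptyOrdering {
    /// Fallible constructor: yields `None` when the input has no keys.
    pub fn new(keys: impl IntoIterator<Item = SortKey>) -> Option<Self> {
        let keys: Vec<SortKey> = keys.into_iter().collect();
        (!keys.is_empty()).then(|| Self { keys })
    }

    /// Number of sort keys; at least 1 for any constructed value.
    pub fn len(&self) -> usize {
        self.keys.len()
    }
}

/// Infallible conversion from a fixed-size array, so call sites can write
/// `[key_a, key_b].into()`. This sketch panics on a zero-length array.
impl<const N: usize> From<[SortKey; N]> for NonEmptyOrdering {
    fn from(keys: [SortKey; N]) -> Self {
        assert!(N > 0, "an ordering needs at least one sort key");
        Self { keys: keys.into() }
    }
}
```

With this shape, a literal list of keys converts without an `unwrap`, while dynamically built lists go through `new` and must handle the empty case explicitly.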

datafusion-examples/examples/custom_file_format.rs

Lines changed: 4 additions & 7 deletions
@@ -21,27 +21,24 @@ use arrow::{
     array::{AsArray, RecordBatch, StringArray, UInt8Array},
     datatypes::{DataType, Field, Schema, SchemaRef, UInt64Type},
 };
-use datafusion::physical_expr::LexRequirement;
 use datafusion::{
     catalog::Session,
     common::{GetExt, Statistics},
-};
-use datafusion::{
-    datasource::physical_plan::FileSource, execution::session_state::SessionStateBuilder,
-};
-use datafusion::{
     datasource::{
         file_format::{
             csv::CsvFormatFactory, file_compression_type::FileCompressionType,
             FileFormat, FileFormatFactory,
         },
-        physical_plan::{FileScanConfig, FileSinkConfig},
+        physical_plan::{FileScanConfig, FileSinkConfig, FileSource},
         MemTable,
     },
     error::Result,
+    execution::session_state::SessionStateBuilder,
+    physical_expr_common::sort_expr::LexRequirement,
     physical_plan::ExecutionPlan,
     prelude::SessionContext,
 };
+
 use object_store::{ObjectMeta, ObjectStore};
 use tempfile::tempdir;

datafusion-examples/examples/function_factory.rs

Lines changed: 0 additions & 4 deletions
@@ -150,10 +150,6 @@ impl ScalarUDFImpl for ScalarFunctionWrapper {
         Ok(ExprSimplifyResult::Simplified(replacement))
     }

-    fn aliases(&self) -> &[String] {
-        &[]
-    }
-
     fn output_ordering(&self, _input: &[ExprProperties]) -> Result<SortProperties> {
         Ok(SortProperties::Unordered)
     }

datafusion/catalog/src/listing_schema.rs

Lines changed: 2 additions & 4 deletions
@@ -25,9 +25,7 @@ use std::sync::{Arc, Mutex};
 use crate::{SchemaProvider, TableProvider, TableProviderFactory};

 use crate::Session;
-use datafusion_common::{
-    Constraints, DFSchema, DataFusionError, HashMap, TableReference,
-};
+use datafusion_common::{DFSchema, DataFusionError, HashMap, TableReference};
 use datafusion_expr::CreateExternalTable;

 use async_trait::async_trait;
@@ -143,7 +141,7 @@ impl ListingSchemaProvider {
                             order_exprs: vec![],
                             unbounded: false,
                             options: Default::default(),
-                            constraints: Constraints::empty(),
+                            constraints: Default::default(),
                             column_defaults: Default::default(),
                         },
                     )

datafusion/catalog/src/memory/table.rs

Lines changed: 16 additions & 22 deletions
@@ -23,25 +23,22 @@ use std::fmt::Debug;
 use std::sync::Arc;

 use crate::TableProvider;
-use datafusion_common::error::Result;
-use datafusion_expr::Expr;
-use datafusion_expr::TableType;
-use datafusion_physical_expr::create_physical_sort_exprs;
-use datafusion_physical_plan::repartition::RepartitionExec;
-use datafusion_physical_plan::{
-    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
-};

 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
+use datafusion_common::error::Result;
 use datafusion_common::{not_impl_err, plan_err, Constraints, DFSchema, SchemaExt};
 use datafusion_common_runtime::JoinSet;
-use datafusion_datasource::memory::MemSink;
-use datafusion_datasource::memory::MemorySourceConfig;
+use datafusion_datasource::memory::{MemSink, MemorySourceConfig};
 use datafusion_datasource::sink::DataSinkExec;
 use datafusion_datasource::source::DataSourceExec;
 use datafusion_expr::dml::InsertOp;
-use datafusion_expr::SortExpr;
+use datafusion_expr::{Expr, SortExpr, TableType};
+use datafusion_physical_expr::{create_physical_sort_exprs, LexOrdering};
+use datafusion_physical_plan::repartition::RepartitionExec;
+use datafusion_physical_plan::{
+    common, ExecutionPlan, ExecutionPlanProperties, Partitioning,
+};
 use datafusion_session::Session;

 use async_trait::async_trait;
@@ -89,7 +86,7 @@ impl MemTable {
                 .into_iter()
                 .map(|e| Arc::new(RwLock::new(e)))
                 .collect::<Vec<_>>(),
-            constraints: Constraints::empty(),
+            constraints: Constraints::default(),
             column_defaults: HashMap::new(),
             sort_order: Arc::new(Mutex::new(vec![])),
         })
@@ -239,16 +236,13 @@ impl TableProvider for MemTable {
         if !sort_order.is_empty() {
             let df_schema = DFSchema::try_from(self.schema.as_ref().clone())?;

-            let file_sort_order = sort_order
-                .iter()
-                .map(|sort_exprs| {
-                    create_physical_sort_exprs(
-                        sort_exprs,
-                        &df_schema,
-                        state.execution_props(),
-                    )
-                })
-                .collect::<Result<Vec<_>>>()?;
+            let eqp = state.execution_props();
+            let mut file_sort_order = vec![];
+            for sort_exprs in sort_order.iter() {
+                let physical_exprs =
+                    create_physical_sort_exprs(sort_exprs, &df_schema, eqp)?;
+                file_sort_order.extend(LexOrdering::new(physical_exprs));
+            }
             source = source.try_with_sort_information(file_sort_order)?;
         }
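
Review note: the new loop in `MemTable` passes the result of `LexOrdering::new(...)` straight to `Vec::extend`. That compiles when the constructor returns an `Option`, because `Option<T>` implements `IntoIterator` and yields zero or one item. The sketch below illustrates the idiom with plain vectors; `non_empty` and `collect_non_empty` are hypothetical helpers, and the `Option` return type is an assumption inferred from the `unwrap()` calls elsewhere in this commit.

```rust
/// Hypothetical fallible constructor: `None` when there is nothing to sort by.
fn non_empty(items: Vec<&str>) -> Option<Vec<&str>> {
    (!items.is_empty()).then_some(items)
}

/// Keeps only the groups that survive the fallible constructor.
fn collect_non_empty<'a>(groups: Vec<Vec<&'a str>>) -> Vec<Vec<&'a str>> {
    let mut out = vec![];
    for group in groups {
        // `Option<T>` iterates over zero or one item, so a `None`
        // contributes nothing and a `Some(x)` pushes exactly `x`.
        out.extend(non_empty(group));
    }
    out
}
```

The effect is that degenerate (empty) orderings are silently dropped instead of being stored or causing an error.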

datafusion/catalog/src/stream.rs

Lines changed: 5 additions & 10 deletions
@@ -256,7 +256,7 @@ impl StreamConfig {
         Self {
             source,
             order: vec![],
-            constraints: Constraints::empty(),
+            constraints: Constraints::default(),
         }
     }

@@ -350,15 +350,10 @@ impl TableProvider for StreamTable {
         input: Arc<dyn ExecutionPlan>,
         _insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let ordering = match self.0.order.first() {
-            Some(x) => {
-                let schema = self.0.source.schema();
-                let orders = create_ordering(schema, std::slice::from_ref(x))?;
-                let ordering = orders.into_iter().next().unwrap();
-                Some(ordering.into_iter().map(Into::into).collect())
-            }
-            None => None,
-        };
+        let schema = self.0.source.schema();
+        let orders = create_ordering(schema, &self.0.order)?;
+        // It is sufficient to pass only one of the equivalent orderings:
+        let ordering = orders.into_iter().next().map(Into::into);

         Ok(Arc::new(DataSinkExec::new(
             input,
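
Review note: the rewritten `insert_into` body drops the `match` and the `unwrap()` in favor of `orders.into_iter().next().map(Into::into)`. A minimal sketch of that pattern follows; `Ordering`, `Requirement`, and `first_as_requirement` are hypothetical stand-ins, assuming only that a requirement type can be built from an ordering via `From`.

```rust
/// Hypothetical ordering: a list of column names.
#[derive(Debug, PartialEq)]
struct Ordering(Vec<String>);

/// Hypothetical requirement derived from an ordering.
#[derive(Debug, PartialEq)]
struct Requirement(Vec<String>);

impl From<Ordering> for Requirement {
    fn from(ordering: Ordering) -> Self {
        Requirement(ordering.0)
    }
}

/// Takes the first of several equivalent orderings, if any, and converts it.
/// No `unwrap` is needed: an empty input simply yields `None`.
fn first_as_requirement(orders: Vec<Ordering>) -> Option<Requirement> {
    orders.into_iter().next().map(Into::into)
}
```

Because `next()` already returns an `Option`, the empty and non-empty cases collapse into a single expression.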

datafusion/common/src/functional_dependencies.rs

Lines changed: 14 additions & 24 deletions
@@ -36,35 +36,31 @@ pub enum Constraint {
 }

 /// This object encapsulates a list of functional constraints:
-#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
+#[derive(Clone, Debug, Default, Eq, Hash, PartialEq, PartialOrd)]
 pub struct Constraints {
     inner: Vec<Constraint>,
 }

 impl Constraints {
-    /// Create empty constraints
-    pub fn empty() -> Self {
-        Constraints::new_unverified(vec![])
-    }
-
     /// Create a new [`Constraints`] object from the given `constraints`.
-    /// Users should use the [`Constraints::empty`] or [`SqlToRel::new_constraint_from_table_constraints`] functions
-    /// for constructing [`Constraints`]. This constructor is for internal
-    /// purposes only and does not check whether the argument is valid. The user
-    /// is responsible for supplying a valid vector of [`Constraint`] objects.
+    /// Users should use the [`Constraints::default`] or [`SqlToRel::new_constraint_from_table_constraints`]
+    /// functions for constructing [`Constraints`] instances. This constructor
+    /// is for internal purposes only and does not check whether the argument
+    /// is valid. The user is responsible for supplying a valid vector of
+    /// [`Constraint`] objects.
     ///
     /// [`SqlToRel::new_constraint_from_table_constraints`]: https://docs.rs/datafusion/latest/datafusion/sql/planner/struct.SqlToRel.html#method.new_constraint_from_table_constraints
     pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
         Self { inner: constraints }
     }

-    /// Check whether constraints is empty
-    pub fn is_empty(&self) -> bool {
-        self.inner.is_empty()
+    /// Extends the current constraints with the given `other` constraints.
+    pub fn extend(&mut self, other: Constraints) {
+        self.inner.extend(other.inner);
     }

-    /// Projects constraints using the given projection indices.
-    /// Returns None if any of the constraint columns are not included in the projection.
+    /// Projects constraints using the given projection indices. Returns `None`
+    /// if any of the constraint columns are not included in the projection.
     pub fn project(&self, proj_indices: &[usize]) -> Option<Self> {
         let projected = self
             .inner
@@ -74,14 +70,14 @@ impl Constraints {
                     Constraint::PrimaryKey(indices) => {
                         let new_indices =
                             update_elements_with_matching_indices(indices, proj_indices);
-                        // Only keep constraint if all columns are preserved
+                        // Only keep the constraint if all columns are preserved:
                         (new_indices.len() == indices.len())
                             .then_some(Constraint::PrimaryKey(new_indices))
                     }
                     Constraint::Unique(indices) => {
                         let new_indices =
                             update_elements_with_matching_indices(indices, proj_indices);
-                        // Only keep constraint if all columns are preserved
+                        // Only keep the constraint if all columns are preserved:
                         (new_indices.len() == indices.len())
                             .then_some(Constraint::Unique(new_indices))
                     }
@@ -93,15 +89,9 @@ impl Constraints {
     }
 }

-impl Default for Constraints {
-    fn default() -> Self {
-        Constraints::empty()
-    }
-}
-
 impl IntoIterator for Constraints {
     type Item = Constraint;
-    type IntoIter = IntoIter<Constraint>;
+    type IntoIter = IntoIter<Self::Item>;

     fn into_iter(self) -> Self::IntoIter {
         self.inner.into_iter()
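
Review note: this file replaces the hand-written `empty()` and `impl Default` with `#[derive(Default)]`, adds `extend`, and keeps the "only keep the constraint if all columns are preserved" rule in `project`. The sketch below is a simplified re-implementation for illustration only. `project_indices` is a hypothetical helper standing in for `update_elements_with_matching_indices` followed by the length check; its exact output order is an assumption, and `len` is added here purely to make the example observable.

```rust
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Constraint {
    PrimaryKey(Vec<usize>),
    Unique(Vec<usize>),
}

/// Deriving `Default` gives an empty `inner` vector, so a separate
/// `empty()` constructor is no longer needed.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct Constraints {
    inner: Vec<Constraint>,
}

impl Constraints {
    pub fn new_unverified(constraints: Vec<Constraint>) -> Self {
        Self { inner: constraints }
    }

    /// Appends all constraints from `other`.
    pub fn extend(&mut self, other: Constraints) {
        self.inner.extend(other.inner);
    }

    /// Helper for this sketch only (not part of the diff).
    pub fn len(&self) -> usize {
        self.inner.len()
    }
}

/// Hypothetical helper: maps each constraint column to its position in the
/// projection, returning `None` unless every column is preserved.
pub fn project_indices(indices: &[usize], proj_indices: &[usize]) -> Option<Vec<usize>> {
    let new_indices: Vec<usize> = indices
        .iter()
        .filter_map(|idx| proj_indices.iter().position(|p| p == idx))
        .collect();
    // Only keep the result if all columns are preserved:
    (new_indices.len() == indices.len()).then_some(new_indices)
}
```

For callers, the migration is mechanical: `Constraints::empty()` becomes `Constraints::default()` (or `Default::default()` where the type is inferred), as the other hunks in this commit show.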

datafusion/core/benches/physical_plan.rs

Lines changed: 2 additions & 5 deletions
@@ -50,11 +50,8 @@ fn sort_preserving_merge_operator(

     let sort = sort
         .iter()
-        .map(|name| PhysicalSortExpr {
-            expr: col(name, &schema).unwrap(),
-            options: Default::default(),
-        })
-        .collect::<LexOrdering>();
+        .map(|name| PhysicalSortExpr::new_default(col(name, &schema).unwrap()));
+    let sort = LexOrdering::new(sort).unwrap();

     let exec = MemorySourceConfig::try_new_exec(
         &batches.into_iter().map(|rb| vec![rb]).collect::<Vec<_>>(),

datafusion/core/benches/sort.rs

Lines changed: 3 additions & 7 deletions
@@ -71,7 +71,6 @@ use std::sync::Arc;
 use arrow::array::StringViewArray;
 use arrow::{
     array::{DictionaryArray, Float64Array, Int64Array, StringArray},
-    compute::SortOptions,
     datatypes::{Int32Type, Schema},
     record_batch::RecordBatch,
 };
@@ -272,14 +271,11 @@ impl BenchCase {

 /// Make sort exprs for each column in `schema`
 fn make_sort_exprs(schema: &Schema) -> LexOrdering {
-    schema
+    let sort_exprs = schema
         .fields()
         .iter()
-        .map(|f| PhysicalSortExpr {
-            expr: col(f.name(), schema).unwrap(),
-            options: SortOptions::default(),
-        })
-        .collect()
+        .map(|f| PhysicalSortExpr::new_default(col(f.name(), schema).unwrap()));
+    LexOrdering::new(sort_exprs).unwrap()
 }

 /// Create streams of int64 (where approximately 1/3 values is repeated)

datafusion/core/benches/spm.rs

Lines changed: 3 additions & 3 deletions
@@ -21,7 +21,6 @@ use arrow::array::{ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray};
 use datafusion_execution::TaskContext;
 use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::PhysicalSortExpr;
-use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
 use datafusion_physical_plan::{collect, ExecutionPlan};

@@ -70,7 +69,7 @@ fn generate_spm_for_round_robin_tie_breaker(
     let partitiones = vec![rbs.clone(); partition_count];

     let schema = rb.schema();
-    let sort = LexOrdering::new(vec![
+    let sort = [
         PhysicalSortExpr {
             expr: col("b", &schema).unwrap(),
             options: Default::default(),
@@ -79,7 +78,8 @@ fn generate_spm_for_round_robin_tie_breaker(
             expr: col("c", &schema).unwrap(),
             options: Default::default(),
         },
-    ]);
+    ]
+    .into();

     let exec = MemorySourceConfig::try_new_exec(&partitiones, schema, None).unwrap();
     SortPreservingMergeExec::new(sort, exec)
