Skip to content

Commit fc77409

Browse files
committed
Encapsulate PhysicalSortRequrement and add more doc comment
1 parent b87871f commit fc77409

File tree

9 files changed

+182
-104
lines changed

9 files changed

+182
-104
lines changed

datafusion/core/src/physical_optimizer/repartition.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -339,9 +339,7 @@ mod tests {
339339
use crate::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
340340
use crate::physical_plan::union::UnionExec;
341341
use crate::physical_plan::{displayable, DisplayFormatType, Statistics};
342-
use datafusion_physical_expr::{
343-
make_sort_requirements_from_exprs, PhysicalSortRequirement,
344-
};
342+
use datafusion_physical_expr::PhysicalSortRequirement;
345343

346344
fn schema() -> SchemaRef {
347345
Arc::new(Schema::new(vec![Field::new("c1", DataType::Boolean, true)]))
@@ -1162,7 +1160,7 @@ mod tests {
11621160
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
11631161
vec![self
11641162
.output_ordering()
1165-
.map(make_sort_requirements_from_exprs)]
1163+
.map(PhysicalSortRequirement::from_sort_exprs)]
11661164
}
11671165

11681166
fn with_new_children(

datafusion/core/src/physical_optimizer/sort_enforcement.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,9 @@ use arrow::datatypes::SchemaRef;
5050
use datafusion_common::tree_node::{Transformed, TreeNode, VisitRecursion};
5151
use datafusion_common::{reverse_sort_options, DataFusionError};
5252
use datafusion_physical_expr::utils::{
53-
make_sort_exprs_from_requirements, ordering_satisfy,
54-
ordering_satisfy_requirement_concrete,
53+
ordering_satisfy, ordering_satisfy_requirement_concrete,
5554
};
56-
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr};
55+
use datafusion_physical_expr::{PhysicalExpr, PhysicalSortExpr, PhysicalSortRequirement};
5756
use itertools::{concat, izip};
5857
use std::iter::zip;
5958
use std::sync::Arc;
@@ -468,7 +467,8 @@ fn ensure_sorting(
468467
) {
469468
// Make sure we preserve the ordering requirements:
470469
update_child_to_remove_unnecessary_sort(child, sort_onwards, &plan)?;
471-
let sort_expr = make_sort_exprs_from_requirements(&required_ordering);
470+
let sort_expr =
471+
PhysicalSortRequirement::to_sort_exprs(required_ordering);
472472
add_sort_above(child, sort_expr)?;
473473
if is_sort(child) {
474474
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
@@ -479,7 +479,7 @@ fn ensure_sorting(
479479
}
480480
(Some(required), None) => {
481481
// Ordering requirement is not met, we should add a `SortExec` to the plan.
482-
let sort_expr = make_sort_exprs_from_requirements(&required);
482+
let sort_expr = PhysicalSortRequirement::to_sort_exprs(required);
483483
add_sort_above(child, sort_expr)?;
484484
*sort_onwards = Some(ExecTree::new(child.clone(), idx, vec![]));
485485
}

datafusion/core/src/physical_optimizer/sort_pushdown.rs

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,9 @@ use datafusion_common::{DataFusionError, Result};
2727
use datafusion_expr::JoinType;
2828
use datafusion_physical_expr::expressions::Column;
2929
use datafusion_physical_expr::utils::{
30-
make_sort_exprs_from_requirements, ordering_satisfy_requirement,
31-
requirements_compatible,
32-
};
33-
use datafusion_physical_expr::{
34-
make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
30+
ordering_satisfy_requirement, requirements_compatible,
3531
};
32+
use datafusion_physical_expr::{PhysicalSortExpr, PhysicalSortRequirement};
3633
use itertools::izip;
3734
use std::ops::Deref;
3835
use std::sync::Arc;
@@ -130,14 +127,15 @@ pub(crate) fn pushdown_sorts(
130127
plan.equivalence_properties()
131128
}) {
132129
// If the current plan is a SortExec, modify it to satisfy parent requirements:
133-
let parent_required_expr =
134-
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
130+
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
131+
parent_required.ok_or_else(err)?.iter().cloned(),
132+
);
135133
new_plan = sort_exec.input.clone();
136134
add_sort_above(&mut new_plan, parent_required_expr)?;
137135
};
138136
let required_ordering = new_plan
139137
.output_ordering()
140-
.map(make_sort_requirements_from_exprs);
138+
.map(PhysicalSortRequirement::from_sort_exprs);
141139
// Since new_plan is a SortExec, we can safely get the 0th index.
142140
let child = &new_plan.children()[0];
143141
if let Some(adjusted) =
@@ -173,8 +171,9 @@ pub(crate) fn pushdown_sorts(
173171
}))
174172
} else {
175173
// Can not push down requirements, add new SortExec:
176-
let parent_required_expr =
177-
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
174+
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
175+
parent_required.ok_or_else(err)?.iter().cloned(),
176+
);
178177
let mut new_plan = plan.clone();
179178
add_sort_above(&mut new_plan, parent_required_expr)?;
180179
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
@@ -210,8 +209,9 @@ fn pushdown_requirement_to_children(
210209
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
211210
// If the current plan is SortMergeJoinExec
212211
let left_columns_len = smj.left.schema().fields().len();
213-
let parent_required_expr =
214-
make_sort_exprs_from_requirements(parent_required.ok_or_else(err)?);
212+
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
213+
parent_required.ok_or_else(err)?.iter().cloned(),
214+
);
215215
let expr_source_side =
216216
expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
217217
match expr_source_side {
@@ -383,15 +383,17 @@ fn shift_right_required(
383383
let new_right_required: Vec<PhysicalSortRequirement> = parent_required
384384
.iter()
385385
.filter_map(|r| {
386-
r.expr.as_any().downcast_ref::<Column>().and_then(|col| {
387-
(col.index() >= left_columns_len).then_some(PhysicalSortRequirement {
388-
expr: Arc::new(Column::new(
389-
col.name(),
390-
col.index() - left_columns_len,
391-
)) as _,
392-
options: r.options,
393-
})
394-
})
386+
let Some(col) = r.expr().as_any().downcast_ref::<Column>() else {
387+
return None;
388+
};
389+
390+
if col.index() < left_columns_len {
391+
return None;
392+
}
393+
394+
let new_col =
395+
Arc::new(Column::new(col.name(), col.index() - left_columns_len));
396+
Some(r.clone().with_expr(new_col))
395397
})
396398
.collect::<Vec<_>>();
397399
if new_right_required.len() == parent_required.len() {

datafusion/core/src/physical_plan/joins/sort_merge_join.rs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,7 @@ use arrow::compute::{concat_batches, take, SortOptions};
3535
use arrow::datatypes::{DataType, SchemaRef, TimeUnit};
3636
use arrow::error::ArrowError;
3737
use arrow::record_batch::RecordBatch;
38-
use datafusion_physical_expr::{
39-
make_sort_requirements_from_exprs, PhysicalSortRequirement,
40-
};
38+
use datafusion_physical_expr::PhysicalSortRequirement;
4139
use futures::{Stream, StreamExt};
4240

4341
use crate::error::DataFusionError;
@@ -230,8 +228,12 @@ impl ExecutionPlan for SortMergeJoinExec {
230228

231229
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
232230
vec![
233-
Some(make_sort_requirements_from_exprs(&self.left_sort_exprs)),
234-
Some(make_sort_requirements_from_exprs(&self.right_sort_exprs)),
231+
Some(PhysicalSortRequirement::from_sort_exprs(
232+
&self.left_sort_exprs,
233+
)),
234+
Some(PhysicalSortRequirement::from_sort_exprs(
235+
&self.right_sort_exprs,
236+
)),
235237
]
236238
}
237239

datafusion/core/src/physical_plan/sorts/sort_preserving_merge.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,7 @@ use crate::physical_plan::{
4646
Distribution, ExecutionPlan, Partitioning, PhysicalExpr, RecordBatchStream,
4747
SendableRecordBatchStream, Statistics,
4848
};
49-
use datafusion_physical_expr::{
50-
make_sort_requirements_from_exprs, EquivalenceProperties, PhysicalSortRequirement,
51-
};
49+
use datafusion_physical_expr::{EquivalenceProperties, PhysicalSortRequirement};
5250

5351
/// Sort preserving merge execution plan
5452
///
@@ -128,7 +126,7 @@ impl ExecutionPlan for SortPreservingMergeExec {
128126
}
129127

130128
fn required_input_ordering(&self) -> Vec<Option<Vec<PhysicalSortRequirement>>> {
131-
vec![Some(make_sort_requirements_from_exprs(&self.expr))]
129+
vec![Some(PhysicalSortRequirement::from_sort_exprs(&self.expr))]
132130
}
133131

134132
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {

datafusion/core/src/physical_plan/windows/mod.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,18 +194,12 @@ pub(crate) fn calc_requirements(
194194
) -> Option<Vec<PhysicalSortRequirement>> {
195195
let mut sort_reqs = vec![];
196196
for partition_by in partition_by_exprs {
197-
sort_reqs.push(PhysicalSortRequirement {
198-
expr: partition_by.clone(),
199-
options: None,
200-
});
197+
sort_reqs.push(PhysicalSortRequirement::new_expr_only(partition_by.clone()))
201198
}
202-
for PhysicalSortExpr { expr, options } in orderby_sort_exprs {
203-
let contains = sort_reqs.iter().any(|e| expr.eq(&e.expr));
199+
for sort_expr in orderby_sort_exprs {
200+
let contains = sort_reqs.iter().any(|e| sort_expr.expr.eq(e.expr()));
204201
if !contains {
205-
sort_reqs.push(PhysicalSortRequirement {
206-
expr: expr.clone(),
207-
options: Some(*options),
208-
});
202+
sort_reqs.push(PhysicalSortRequirement::from(sort_expr));
209203
}
210204
}
211205
// Convert empty result to None. Otherwise wrap result inside Some()
@@ -297,7 +291,11 @@ mod tests {
297291
nulls_first,
298292
});
299293
let expr = col(col_name, &schema)?;
300-
let res = PhysicalSortRequirement { expr, options };
294+
let res = if let Some(options) = options {
295+
PhysicalSortRequirement::new_exact(expr, options)
296+
} else {
297+
PhysicalSortRequirement::new_expr_only(expr)
298+
};
301299
if let Some(expected) = &mut expected {
302300
expected.push(res);
303301
} else {

datafusion/physical-expr/src/lib.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@ pub use equivalence::EquivalentClass;
5353
pub use physical_expr::{AnalysisContext, ExprBoundaries, PhysicalExpr, PhysicalExprRef};
5454
pub use planner::create_physical_expr;
5555
pub use scalar_function::ScalarFunctionExpr;
56-
pub use sort_expr::{
57-
make_sort_requirements_from_exprs, PhysicalSortExpr, PhysicalSortRequirement,
58-
};
56+
pub use sort_expr::{PhysicalSortExpr, PhysicalSortRequirement};
5957
pub use utils::{
6058
expr_list_eq_any_order, expr_list_eq_strict_order,
6159
normalize_expr_with_equivalence_properties, normalize_out_expr_with_alias_schema,

0 commit comments

Comments
 (0)