Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions datafusion/core/src/physical_optimizer/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,8 @@ pub(crate) fn pushdown_sorts(
plan.equivalence_properties()
}) {
// If the current plan is a SortExec, modify it to satisfy parent requirements:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?);
new_plan = sort_exec.input.clone();
add_sort_above(&mut new_plan, parent_required_expr)?;
};
Expand Down Expand Up @@ -171,9 +170,8 @@ pub(crate) fn pushdown_sorts(
}))
} else {
// Can not push down requirements, add new SortExec:
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?);
let mut new_plan = plan.clone();
add_sort_above(&mut new_plan, parent_required_expr)?;
Ok(Transformed::Yes(SortPushDown::init(new_plan)))
Expand Down Expand Up @@ -209,9 +207,8 @@ fn pushdown_requirement_to_children(
} else if let Some(smj) = plan.as_any().downcast_ref::<SortMergeJoinExec>() {
// If the current plan is SortMergeJoinExec
let left_columns_len = smj.left.schema().fields().len();
let parent_required_expr = PhysicalSortRequirement::to_sort_exprs(
parent_required.ok_or_else(err)?.iter().cloned(),
);
let parent_required_expr =
PhysicalSortRequirement::to_sort_exprs(parent_required.ok_or_else(err)?);
let expr_source_side =
expr_source_sides(&parent_required_expr, smj.join_type, left_columns_len);
match expr_source_side {
Expand Down
31 changes: 24 additions & 7 deletions datafusion/physical-expr/src/sort_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,24 @@ impl From<PhysicalSortRequirement> for PhysicalSortExpr {
}
}

impl From<&PhysicalSortRequirement> for PhysicalSortExpr {
fn from(value: &PhysicalSortRequirement) -> Self {
value.clone().into_sort_expr()
}
}

impl From<PhysicalSortExpr> for PhysicalSortRequirement {
fn from(value: PhysicalSortExpr) -> Self {
PhysicalSortRequirement::new(value.expr, Some(value.options))
}
}

impl From<&PhysicalSortExpr> for PhysicalSortRequirement {
fn from(value: &PhysicalSortExpr) -> Self {
PhysicalSortRequirement::from(value.clone())
}
}

impl PartialEq for PhysicalSortRequirement {
fn eq(&self, other: &PhysicalSortRequirement) -> bool {
self.options == other.options && self.expr.eq(&other.expr)
Expand Down Expand Up @@ -180,12 +192,14 @@ impl PhysicalSortRequirement {
///
/// This method takes `&'a PhysicalSortExpr` to make it easy to
/// use implementing [`ExecutionPlan::required_input_ordering`].
pub fn from_sort_exprs<'a>(
ordering: impl IntoIterator<Item = &'a PhysicalSortExpr>,
) -> Vec<PhysicalSortRequirement> {
pub fn from_sort_exprs<T>(
ordering: impl IntoIterator<Item = T>,
) -> Vec<PhysicalSortRequirement>
where
PhysicalSortRequirement: From<T>,
{
ordering
.into_iter()
.cloned()
.map(PhysicalSortRequirement::from)
.collect()
}
Expand All @@ -196,9 +210,12 @@ impl PhysicalSortRequirement {
/// This function converts `PhysicalSortRequirement` to `PhysicalSortExpr`
/// for each entry in the input. If required ordering is None for an entry
/// default ordering `ASC, NULLS LAST` if given (see [`into_sort_expr`])
pub fn to_sort_exprs(
requirements: impl IntoIterator<Item = PhysicalSortRequirement>,
) -> Vec<PhysicalSortExpr> {
pub fn to_sort_exprs<T>(
requirements: impl IntoIterator<Item = T>,
) -> Vec<PhysicalSortExpr>
where
PhysicalSortExpr: From<T>,
{
requirements
.into_iter()
.map(PhysicalSortExpr::from)
Expand Down