Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion datafusion/sql/src/unparser/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ impl Unparser<'_> {
.expr
.iter()
.map(|sort_expr| {
unproject_sort_expr(sort_expr, agg, sort.input.as_ref())
unproject_sort_expr(sort_expr.clone(), agg, sort.input.as_ref())
})
.collect::<Result<Vec<_>>>()?;

Expand Down
76 changes: 42 additions & 34 deletions datafusion/sql/src/unparser/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,51 +270,59 @@ fn find_window_expr<'a>(
.find(|expr| expr.schema_name().to_string() == column_name)
}

/// Transforms a Column expression into the actual expression from aggregation or projection if found.
/// Transforms all Column expressions in a sort expression into the actual expression from aggregation or projection if found.
/// This is required because if an ORDER BY expression is present in an Aggregate or Select, it is replaced
/// with a Column expression (e.g., "sum(catalog_returns.cr_net_loss)"). We need to transform it back to
/// the actual expression, such as sum("catalog_returns"."cr_net_loss").
pub(crate) fn unproject_sort_expr(
sort_expr: &SortExpr,
mut sort_expr: SortExpr,
agg: Option<&Aggregate>,
input: &LogicalPlan,
) -> Result<SortExpr> {
let mut sort_expr = sort_expr.clone();

// Remove alias if present, because ORDER BY cannot use aliases
if let Expr::Alias(alias) = &sort_expr.expr {
sort_expr.expr = *alias.expr.clone();
}

let Expr::Column(ref col_ref) = sort_expr.expr else {
return Ok(sort_expr);
};
sort_expr.expr = sort_expr
.expr
.transform(|sub_expr| {
match sub_expr {
// Remove alias if present, because ORDER BY cannot use aliases
Expr::Alias(alias) => Ok(Transformed::yes(*alias.expr)),
Expr::Column(col) => {
if col.relation.is_some() {
return Ok(Transformed::no(Expr::Column(col)));
}

if col_ref.relation.is_some() {
return Ok(sort_expr);
};
// In case of aggregation there could be columns containing aggregation functions we need to unproject
if let Some(agg) = agg {
if agg.schema.is_column_from_schema(&col) {
return Ok(Transformed::yes(unproject_agg_exprs(
Expr::Column(col),
agg,
None,
)?));
}
}

// In case of aggregation there could be columns containing aggregation functions we need to unproject
if let Some(agg) = agg {
if agg.schema.is_column_from_schema(col_ref) {
let new_expr = unproject_agg_exprs(sort_expr.expr, agg, None)?;
sort_expr.expr = new_expr;
return Ok(sort_expr);
}
}
// If SELECT and ORDER BY contain the same expression with a scalar function, the ORDER BY expression will
// be replaced by a Column expression (e.g., "substr(customer.c_last_name, Int64(0), Int64(5))"), and we need
// to transform it back to the actual expression.
if let LogicalPlan::Projection(Projection { expr, schema, .. }) =
input
{
if let Ok(idx) = schema.index_of_column(&col) {
if let Some(Expr::ScalarFunction(scalar_fn)) = expr.get(idx) {
return Ok(Transformed::yes(Expr::ScalarFunction(
scalar_fn.clone(),
)));
}
}
return Ok(Transformed::no(Expr::Column(col)));
}

// If SELECT and ORDER BY contain the same expression with a scalar function, the ORDER BY expression will
// be replaced by a Column expression (e.g., "substr(customer.c_last_name, Int64(0), Int64(5))"), and we need
// to transform it back to the actual expression.
if let LogicalPlan::Projection(Projection { expr, schema, .. }) = input {
if let Ok(idx) = schema.index_of_column(col_ref) {
if let Some(Expr::ScalarFunction(scalar_fn)) = expr.get(idx) {
sort_expr.expr = Expr::ScalarFunction(scalar_fn.clone());
Ok(Transformed::no(Expr::Column(col)))
}
_ => Ok(Transformed::no(sub_expr)),
}
}
return Ok(sort_expr);
}

})
.map(|e| e.data)?;
Ok(sort_expr)
}

Expand Down