Skip to content

Commit 8ebdc13

Browse files
committed
Implement qualified expression alias and extend test coverage
1 parent 8467736 commit 8ebdc13

File tree

16 files changed

+134
-36
lines changed

16 files changed

+134
-36
lines changed

datafusion/expr/src/expr.rs

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use crate::Operator;
2828
use crate::{aggregate_function, ExprSchemable};
2929
use arrow::datatypes::DataType;
3030
use datafusion_common::tree_node::{Transformed, TreeNode};
31-
use datafusion_common::{internal_err, DFSchema};
31+
use datafusion_common::{internal_err, DFSchema, OwnedTableReference};
3232
use datafusion_common::{plan_err, Column, DataFusionError, Result, ScalarValue};
3333
use std::collections::HashSet;
3434
use std::fmt;
@@ -172,7 +172,7 @@ pub enum Expr {
172172
/// plan into physical plan.
173173
Wildcard,
174174
/// Represents a reference to all available fields in a specific schema.
175-
///
175+
///
176176
/// This expr has to be resolved to a list of columns before translating logical
177177
/// plan into physical plan.
178178
QualifiedWildcard { qualifier: String },
@@ -191,13 +191,20 @@ pub enum Expr {
191191
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
192192
pub struct Alias {
193193
pub expr: Box<Expr>,
194+
pub relation: Option<OwnedTableReference>,
194195
pub name: String,
195196
}
196197

197198
impl Alias {
198-
pub fn new(expr: Expr, name: impl Into<String>) -> Self {
199+
/// Create an alias with an optional schema/field qualifier.
200+
pub fn new(
201+
expr: Expr,
202+
relation: Option<impl Into<OwnedTableReference>>,
203+
name: impl Into<String>,
204+
) -> Self {
199205
Self {
200206
expr: Box::new(expr),
207+
relation: relation.map(|r| r.into()),
201208
name: name.into(),
202209
}
203210
}
@@ -849,7 +856,27 @@ impl Expr {
849856
asc,
850857
nulls_first,
851858
}) => Expr::Sort(Sort::new(Box::new(expr.alias(name)), asc, nulls_first)),
852-
_ => Expr::Alias(Alias::new(self, name.into())),
859+
_ => Expr::Alias(Alias::new(self, None::<&str>, name.into())),
860+
}
861+
}
862+
863+
/// Return `self AS name` alias expression with a specific qualifier
864+
pub fn alias_qualified(
865+
self,
866+
relation: Option<impl Into<OwnedTableReference>>,
867+
name: impl Into<String>,
868+
) -> Expr {
869+
match self {
870+
Expr::Sort(Sort {
871+
expr,
872+
asc,
873+
nulls_first,
874+
}) => Expr::Sort(Sort::new(
875+
Box::new(expr.alias_qualified(relation, name)),
876+
asc,
877+
nulls_first,
878+
)),
879+
_ => Expr::Alias(Alias::new(self, relation, name.into())),
853880
}
854881
}
855882

datafusion/expr/src/expr_schema.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,13 @@ impl ExprSchemable for Expr {
308308
self.nullable(input_schema)?,
309309
)
310310
.with_metadata(self.metadata(input_schema)?)),
311+
Expr::Alias(Alias { relation, name, .. }) => Ok(DFField::new(
312+
relation.clone(),
313+
name,
314+
self.get_type(input_schema)?,
315+
self.nullable(input_schema)?,
316+
)
317+
.with_metadata(self.metadata(input_schema)?)),
311318
_ => Ok(DFField::new_unqualified(
312319
&self.display_name()?,
313320
self.get_type(input_schema)?,

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,19 +2219,10 @@ impl DistinctOn {
22192219

22202220
let on_expr = normalize_cols(on_expr, input.as_ref())?;
22212221

2222-
// Create fields with any qualifier stuffed in the name itself
2223-
let fields = exprlist_to_fields(&select_expr, &input)?
2224-
.iter()
2225-
.map(|f| {
2226-
DFField::new_unqualified(
2227-
&f.qualified_name(),
2228-
f.data_type().clone(),
2229-
f.is_nullable(),
2230-
)
2231-
})
2232-
.collect();
2233-
let schema =
2234-
DFSchema::new_with_metadata(fields, input.schema().metadata().clone())?;
2222+
let schema = DFSchema::new_with_metadata(
2223+
exprlist_to_fields(&select_expr, &input)?,
2224+
input.schema().metadata().clone(),
2225+
)?;
22352226

22362227
let mut distinct_on = DistinctOn {
22372228
on_expr,

datafusion/expr/src/tree_node/expr.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -158,9 +158,11 @@ impl TreeNode for Expr {
158158
let mut transform = transform;
159159

160160
Ok(match self {
161-
Expr::Alias(Alias { expr, name, .. }) => {
162-
Expr::Alias(Alias::new(transform(*expr)?, name))
163-
}
161+
Expr::Alias(Alias {
162+
expr,
163+
relation,
164+
name,
165+
}) => Expr::Alias(Alias::new(transform(*expr)?, relation, name)),
164166
Expr::Column(_) => self,
165167
Expr::OuterReferenceColumn(_, _) => self,
166168
Expr::Exists { .. } => self,

datafusion/expr/src/utils.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -801,9 +801,11 @@ pub fn columnize_expr(e: Expr, input_schema: &DFSchema) -> Expr {
801801
match e {
802802
Expr::Column(_) => e,
803803
Expr::OuterReferenceColumn(_, _) => e,
804-
Expr::Alias(Alias { expr, name, .. }) => {
805-
columnize_expr(*expr, input_schema).alias(name)
806-
}
804+
Expr::Alias(Alias {
805+
expr,
806+
relation,
807+
name,
808+
}) => columnize_expr(*expr, input_schema).alias_qualified(relation, name),
807809
Expr::Cast(Cast { expr, data_type }) => Expr::Cast(Cast {
808810
expr: Box::new(columnize_expr(*expr, input_schema)),
809811
data_type,

datafusion/optimizer/src/optimizer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ impl Optimizer {
427427
/// Returns an error if plans have different schemas.
428428
///
429429
/// It ignores metadata and nullability.
430-
fn assert_schema_is_the_same(
430+
pub(crate) fn assert_schema_is_the_same(
431431
rule_name: &str,
432432
prev_plan: &LogicalPlan,
433433
new_plan: &LogicalPlan,

datafusion/optimizer/src/replace_distinct_aggregate.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
8585
on_expr,
8686
sort_expr,
8787
input,
88-
..
88+
schema,
8989
})) => {
9090
// Construct the aggregation expression to be used to fetch the selected expressions.
9191
let aggr_expr = select_expr
@@ -124,9 +124,12 @@ impl OptimizerRule for ReplaceDistinctWithAggregate {
124124
.fields()
125125
.iter()
126126
.skip(on_expr.len())
127-
.zip(select_expr.iter())
128-
.map(|(f, sel)| {
129-
Ok(col(f.qualified_column()).alias(sel.display_name()?))
127+
.zip(schema.fields().iter())
128+
.map(|(new_field, old_field)| {
129+
Ok(col(new_field.qualified_column()).alias_qualified(
130+
old_field.qualifier().cloned(),
131+
old_field.name(),
132+
))
130133
})
131134
.collect::<Result<Vec<Expr>>>()?;
132135

@@ -174,4 +177,27 @@ mod tests {
174177
expected,
175178
)
176179
}
180+
181+
#[test]
182+
fn replace_distinct_on() -> datafusion_common::Result<()> {
183+
let table_scan = test_table_scan().unwrap();
184+
let plan = LogicalPlanBuilder::from(table_scan)
185+
.distinct_on(
186+
vec![col("a")],
187+
vec![col("b")],
188+
Some(vec![col("a").sort(false, true), col("c").sort(true, false)]),
189+
)?
190+
.build()?;
191+
192+
let expected = "Projection: FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST] AS b\
193+
\n Sort: test.a DESC NULLS FIRST\
194+
\n Aggregate: groupBy=[[test.a]], aggr=[[FIRST_VALUE(test.b) ORDER BY [test.a DESC NULLS FIRST, test.c ASC NULLS LAST]]]\
195+
\n TableScan: test";
196+
197+
assert_optimized_plan_eq(
198+
Arc::new(ReplaceDistinctWithAggregate::new()),
199+
&plan,
200+
expected,
201+
)
202+
}
177203
}

datafusion/optimizer/src/test/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
// under the License.
1717

1818
use crate::analyzer::{Analyzer, AnalyzerRule};
19-
use crate::optimizer::Optimizer;
19+
use crate::optimizer::{assert_schema_is_the_same, Optimizer};
2020
use crate::{OptimizerContext, OptimizerRule};
2121
use arrow::datatypes::{DataType, Field, Schema};
2222
use datafusion_common::config::ConfigOptions;
@@ -155,14 +155,17 @@ pub fn assert_optimized_plan_eq(
155155
plan: &LogicalPlan,
156156
expected: &str,
157157
) -> Result<()> {
158-
let optimizer = Optimizer::with_rules(vec![rule]);
158+
let optimizer = Optimizer::with_rules(vec![rule.clone()]);
159159
let optimized_plan = optimizer
160160
.optimize_recursively(
161161
optimizer.rules.get(0).unwrap(),
162162
plan,
163163
&OptimizerContext::new(),
164164
)?
165165
.unwrap_or_else(|| plan.clone());
166+
167+
// Ensure schemas always match after an optimization
168+
assert_schema_is_the_same(rule.name(), plan, &optimized_plan)?;
166169
let formatted_plan = format!("{optimized_plan:?}");
167170
assert_eq!(formatted_plan, expected);
168171

datafusion/proto/proto/datafusion.proto

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,8 @@ message Not {
488488

489489
message AliasNode {
490490
LogicalExprNode expr = 1;
491-
string alias = 2;
491+
repeated OwnedTableReference relation = 2;
492+
string alias = 3;
492493
}
493494

494495
message BinaryExprNode {

datafusion/proto/src/generated/pbjson.rs

Lines changed: 17 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)