Skip to content

Commit bdf474e

Browse files
committed
fix the view tests
1 parent 2f5ba33 commit bdf474e

File tree

6 files changed

+98
-35
lines changed

6 files changed

+98
-35
lines changed

datafusion/core/src/datasource/view.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,19 @@
1919
2020
use std::{any::Any, sync::Arc};
2121

22-
use arrow::datatypes::SchemaRef;
23-
use async_trait::async_trait;
24-
use datafusion_catalog::Session;
25-
use datafusion_common::Column;
26-
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
27-
2822
use crate::{
2923
error::Result,
3024
logical_expr::{Expr, LogicalPlan},
3125
physical_plan::ExecutionPlan,
3226
};
27+
use arrow::datatypes::SchemaRef;
28+
use async_trait::async_trait;
29+
use datafusion_catalog::Session;
30+
use datafusion_common::config::ConfigOptions;
31+
use datafusion_common::Column;
32+
use datafusion_expr::{LogicalPlanBuilder, TableProviderFilterPushDown};
33+
use datafusion_optimizer::analyzer::expand_wildcard_rule::ExpandWildcardRule;
34+
use datafusion_optimizer::Analyzer;
3335

3436
use crate::datasource::{TableProvider, TableType};
3537

@@ -50,6 +52,7 @@ impl ViewTable {
5052
logical_plan: LogicalPlan,
5153
definition: Option<String>,
5254
) -> Result<Self> {
55+
let logical_plan = Self::apply_required_rule(logical_plan)?;
5356
let table_schema = logical_plan.schema().as_ref().to_owned().into();
5457

5558
let view = Self {
@@ -61,6 +64,15 @@ impl ViewTable {
6164
Ok(view)
6265
}
6366

67+
fn apply_required_rule(logical_plan: LogicalPlan) -> Result<LogicalPlan> {
68+
let options = ConfigOptions::default();
69+
Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]).execute_and_check(
70+
logical_plan,
71+
&options,
72+
|_, _| {},
73+
)
74+
}
75+
6476
/// Get definition ref
6577
pub fn definition(&self) -> Option<&String> {
6678
self.definition.as_ref()

datafusion/core/src/execution/context/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,6 @@ impl SessionContext {
712712
}
713713
(_, Err(_)) => {
714714
let table = Arc::new(ViewTable::try_new((*input).clone(), definition)?);
715-
716715
self.register_table(name, table)?;
717716
self.return_empty_dataframe()
718717
}

datafusion/expr/src/utils.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,7 @@ pub fn expand_qualified_wildcard(
420420
schema: &DFSchema,
421421
wildcard_options: Option<&WildcardAdditionalOptions>,
422422
) -> Result<Vec<Expr>> {
423+
dbg!(&schema);
423424
let qualified_indices = schema.fields_indices_with_qualified(qualifier);
424425
let projected_func_dependencies = schema
425426
.functional_dependencies()

datafusion/optimizer/src/analyzer/expand_wildcard_rule.rs

Lines changed: 76 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@
1818
use crate::AnalyzerRule;
1919
use datafusion_common::config::ConfigOptions;
2020
use datafusion_common::tree_node::{Transformed, TransformedResult};
21-
use datafusion_common::{plan_err, Result};
21+
use datafusion_common::{Column, DataFusionError, Result};
2222
use datafusion_expr::utils::{expand_qualified_wildcard, expand_wildcard};
23-
use datafusion_expr::{Expr, LogicalPlan, Projection};
23+
use datafusion_expr::{Expr, LogicalPlan, Projection, SubqueryAlias};
24+
use std::collections::hash_map::Entry;
2425
use std::collections::HashMap;
2526
use std::sync::Arc;
2627

@@ -37,15 +38,15 @@ impl AnalyzerRule for ExpandWildcardRule {
3738
fn analyze(&self, plan: LogicalPlan, _: &ConfigOptions) -> Result<LogicalPlan> {
3839
// Because the wildcard expansion is based on the schema of the input plan,
3940
// using `transform_up_with_subqueries` here.
40-
plan.transform_up_with_subqueries(analyzer_internal).data()
41+
plan.transform_up_with_subqueries(expand_internal).data()
4142
}
4243

4344
fn name(&self) -> &str {
4445
"expand_wildcard_rule"
4546
}
4647
}
4748

48-
fn analyzer_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
49+
fn expand_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
4950
match plan {
5051
LogicalPlan::Projection(Projection { expr, input, .. }) => {
5152
let mut projected_expr = vec![];
@@ -66,46 +67,76 @@ fn analyzer_internal(plan: LogicalPlan) -> Result<Transformed<LogicalPlan>> {
6667
)?);
6768
}
6869
}
70+
// A workaround to handle the case when the column name is "*".
71+
// We transform the expression to a Expr::Column through [Column::from_name] in many places.
72+
// It would also convert the wildcard expression to a column expression with name "*".
73+
Expr::Column(Column {
74+
ref relation,
75+
ref name,
76+
}) => {
77+
if name.eq("*") {
78+
if let Some(qualifier) = relation {
79+
projected_expr.extend(expand_qualified_wildcard(
80+
qualifier,
81+
input.schema(),
82+
None,
83+
)?);
84+
} else {
85+
projected_expr.extend(expand_wildcard(
86+
input.schema(),
87+
&input,
88+
None,
89+
)?);
90+
}
91+
} else {
92+
projected_expr.push(e.clone());
93+
}
94+
}
6995
_ => projected_expr.push(e),
7096
}
7197
}
72-
validate_unique_names("Projections", projected_expr.iter())?;
7398
Ok(Transformed::yes(
74-
Projection::try_new(projected_expr, Arc::clone(&input))
75-
.map(LogicalPlan::Projection)?,
99+
Projection::try_new(
100+
to_unique_names(projected_expr.iter())?,
101+
Arc::clone(&input),
102+
)
103+
.map(LogicalPlan::Projection)?,
104+
))
105+
}
106+
// Teh schema of the plan should also be updated if the child plan is transformed.
107+
LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => {
108+
Ok(Transformed::yes(
109+
SubqueryAlias::try_new(input, alias).map(LogicalPlan::SubqueryAlias)?,
76110
))
77111
}
78112
_ => Ok(Transformed::no(plan)),
79113
}
80114
}
81115

82-
fn validate_unique_names<'a>(
83-
node_name: &str,
116+
fn to_unique_names<'a>(
84117
expressions: impl IntoIterator<Item = &'a Expr>,
85-
) -> Result<()> {
118+
) -> Result<Vec<Expr>> {
86119
let mut unique_names = HashMap::new();
87-
88-
expressions.into_iter().enumerate().try_for_each(|(position, expr)| {
89-
let name = expr.display_name()?;
90-
match unique_names.get(&name) {
91-
None => {
92-
unique_names.insert(name, (position, expr));
93-
Ok(())
94-
},
95-
Some((existing_position, existing_expr)) => {
96-
plan_err!("{node_name} require unique expression names \
97-
but the expression \"{existing_expr}\" at position {existing_position} and \"{expr}\" \
98-
at position {position} have the same name. Consider aliasing (\"AS\") one of them."
99-
)
120+
let mut unique_expr = vec![];
121+
expressions
122+
.into_iter()
123+
.enumerate()
124+
.try_for_each(|(position, expr)| {
125+
let name = expr.display_name()?;
126+
if let Entry::Vacant(e) = unique_names.entry(name) {
127+
e.insert((position, expr));
128+
unique_expr.push(expr.to_owned());
100129
}
101-
}
102-
})
130+
Ok::<(), DataFusionError>(())
131+
})?;
132+
Ok(unique_expr)
103133
}
104134

105135
#[cfg(test)]
106136
mod tests {
107137
use super::*;
108138
use crate::test::{assert_analyzed_plan_eq_display_indent, test_table_scan};
139+
use crate::Analyzer;
109140
use datafusion_common::TableReference;
110141
use datafusion_expr::{
111142
col, in_subquery, qualified_wildcard, wildcard, LogicalPlanBuilder,
@@ -181,4 +212,24 @@ mod tests {
181212
\n TableScan: test [a:UInt32, b:UInt32, c:UInt32]";
182213
assert_plan_eq(plan, expected)
183214
}
215+
216+
#[test]
217+
fn test_subquery_schema() -> Result<()> {
218+
let analyzer = Analyzer::with_rules(vec![Arc::new(ExpandWildcardRule::new())]);
219+
let options = ConfigOptions::default();
220+
let subquery = LogicalPlanBuilder::from(test_table_scan()?)
221+
.project(vec![wildcard()])?
222+
.build()?;
223+
let plan = LogicalPlanBuilder::from(subquery)
224+
.alias("sub")?
225+
.project(vec![wildcard()])?
226+
.build()?;
227+
let analyzed_plan = analyzer.execute_and_check(plan, &options, |_, _| {})?;
228+
for x in analyzed_plan.inputs() {
229+
for field in x.schema().fields() {
230+
assert_ne!(field.name(), "*");
231+
}
232+
}
233+
Ok(())
234+
}
184235
}

datafusion/optimizer/src/analyzer/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,11 @@ impl Analyzer {
9191
pub fn new() -> Self {
9292
let rules: Vec<Arc<dyn AnalyzerRule + Send + Sync>> = vec![
9393
Arc::new(InlineTableScan::new()),
94+
// Every rule that will generate [Expr::Wildcard] should be placed in front of [ExpandWildcardRule].
95+
Arc::new(ExpandWildcardRule::new()),
96+
// [Expr::Wildcard] should be expanded before [TypeCoercion]
9497
Arc::new(TypeCoercion::new()),
9598
Arc::new(CountWildcardRule::new()),
96-
Arc::new(ExpandWildcardRule::new()),
9799
];
98100
Self::with_rules(rules)
99101
}

datafusion/optimizer/src/analyzer/type_coercion.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1348,7 +1348,6 @@ mod test {
13481348
.eq(cast(lit("1998-03-18"), DataType::Date32));
13491349
let empty = empty();
13501350
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?);
1351-
dbg!(&plan);
13521351
let expected =
13531352
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None)) = CAST(CAST(Utf8(\"1998-03-18\") AS Date32) AS Timestamp(Nanosecond, None))\n EmptyRelation";
13541353
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?;
@@ -1535,7 +1534,6 @@ mod test {
15351534
));
15361535
let empty = empty();
15371536
let plan = LogicalPlan::Projection(Projection::try_new(vec![expr], empty)?);
1538-
dbg!(&plan);
15391537
let expected =
15401538
"Projection: CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None)) - CAST(Utf8(\"1998-03-18\") AS Timestamp(Nanosecond, None))\n EmptyRelation";
15411539
assert_analyzed_plan_eq(Arc::new(TypeCoercion::new()), plan, expected)?;

0 commit comments

Comments
 (0)