Skip to content

Commit 92110dd

Browse files
authored
Add assertion for invariant in create_physical_expression and fix ViewTable projection (#3242)
1 parent 82da46d commit 92110dd

File tree

4 files changed

+69
-13
lines changed

4 files changed

+69
-13
lines changed

datafusion/core/src/datasource/view.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::{any::Any, sync::Arc};
2121

2222
use arrow::datatypes::SchemaRef;
2323
use async_trait::async_trait;
24+
use datafusion_expr::LogicalPlanBuilder;
2425

2526
use crate::{
2627
error::Result,
@@ -81,14 +82,36 @@ impl TableProvider for ViewTable {
8182
async fn scan(
8283
&self,
8384
state: &SessionState,
84-
_projection: &Option<Vec<usize>>,
85+
projection: &Option<Vec<usize>>,
8586
_filters: &[Expr],
8687
_limit: Option<usize>,
8788
) -> Result<Arc<dyn ExecutionPlan>> {
8889
// clone state and start_execution so that now() works in views
8990
let mut state_cloned = state.clone();
9091
state_cloned.execution_props.start_execution();
91-
state_cloned.create_physical_plan(&self.logical_plan).await
92+
if let Some(projection) = projection {
93+
// avoiding adding a redundant projection (e.g. SELECT * FROM view)
94+
let current_projection =
95+
(0..self.logical_plan.schema().fields().len()).collect::<Vec<usize>>();
96+
if projection == &current_projection {
97+
state_cloned.create_physical_plan(&self.logical_plan).await
98+
} else {
99+
let fields: Vec<Expr> = projection
100+
.iter()
101+
.map(|i| {
102+
Expr::Column(
103+
self.logical_plan.schema().field(*i).qualified_column(),
104+
)
105+
})
106+
.collect();
107+
let plan = LogicalPlanBuilder::from(self.logical_plan.clone())
108+
.project(fields)?
109+
.build()?;
110+
state_cloned.create_physical_plan(&plan).await
111+
}
112+
} else {
113+
state_cloned.create_physical_plan(&self.logical_plan).await
114+
}
92115
}
93116
}
94117

@@ -99,6 +122,32 @@ mod tests {
99122

100123
use super::*;
101124

125+
#[tokio::test]
126+
async fn issue_3242() -> Result<()> {
127+
// regression test for https://github.com/apache/arrow-datafusion/pull/3242
128+
let session_ctx = SessionContext::with_config(
129+
SessionConfig::new().with_information_schema(true),
130+
);
131+
132+
session_ctx
133+
.sql("create view v as select 1 as a, 2 as b, 3 as c")
134+
.await?
135+
.collect()
136+
.await?;
137+
138+
let results = session_ctx
139+
.sql("select * from (select b from v)")
140+
.await?
141+
.collect()
142+
.await?;
143+
144+
let expected = vec!["+---+", "| b |", "+---+", "| 2 |", "+---+"];
145+
146+
assert_batches_eq!(expected, &results);
147+
148+
Ok(())
149+
}
150+
102151
#[tokio::test]
103152
async fn create_view_return_empty_dataframe() -> Result<()> {
104153
let session_ctx = SessionContext::new();

datafusion/optimizer/src/expr_simplifier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ impl ExprSimplifiable for Expr {
8888
/// ```
8989
fn simplify<S: SimplifyInfo>(self, info: &S) -> Result<Self> {
9090
let mut rewriter = Simplifier::new(info);
91-
let mut const_evaluator = ConstEvaluator::new(info.execution_props());
91+
let mut const_evaluator = ConstEvaluator::try_new(info.execution_props())?;
9292

9393
// TODO iterate until no changes are made during rewrite
9494
// (evaluating constants can enable new simplifications and

datafusion/optimizer/src/simplify_expressions.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ impl SimplifyExpressions {
332332
/// # use datafusion_expr::expr_rewriter::ExprRewritable;
333333
///
334334
/// let execution_props = ExecutionProps::new();
335-
/// let mut const_evaluator = ConstEvaluator::new(&execution_props);
335+
/// let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
336336
///
337337
/// // (1 + 2) + a
338338
/// let expr = (lit(1) + lit(2)) + col("a");
@@ -403,25 +403,23 @@ impl<'a> ConstEvaluator<'a> {
403403
/// Create a new `ConstantEvaluator`. Session constants (such as
404404
/// the time for `now()` are taken from the passed
405405
/// `execution_props`.
406-
pub fn new(execution_props: &'a ExecutionProps) -> Self {
407-
let input_schema = DFSchema::empty();
408-
406+
pub fn try_new(execution_props: &'a ExecutionProps) -> Result<Self> {
409407
// The dummy column name is unused and doesn't matter as only
410408
// expressions without column references can be evaluated
411409
static DUMMY_COL_NAME: &str = ".";
412410
let schema = Schema::new(vec![Field::new(DUMMY_COL_NAME, DataType::Null, true)]);
411+
let input_schema = DFSchema::try_from(schema.clone())?;
413412

414413
// Need a single "input" row to produce a single output row
415414
let col = new_null_array(&DataType::Null, 1);
416-
let input_batch =
417-
RecordBatch::try_new(std::sync::Arc::new(schema), vec![col]).unwrap();
415+
let input_batch = RecordBatch::try_new(std::sync::Arc::new(schema), vec![col])?;
418416

419-
Self {
417+
Ok(Self {
420418
can_evaluate: vec![],
421419
execution_props,
422420
input_schema,
423421
input_batch,
424-
}
422+
})
425423
}
426424

427425
/// Can a function of the specified volatility be evaluated?
@@ -1273,7 +1271,7 @@ mod tests {
12731271
var_providers: None,
12741272
};
12751273

1276-
let mut const_evaluator = ConstEvaluator::new(&execution_props);
1274+
let mut const_evaluator = ConstEvaluator::try_new(&execution_props).unwrap();
12771275
let evaluated_expr = input_expr
12781276
.clone()
12791277
.rewrite(&mut const_evaluator)

datafusion/physical-expr/src/planner.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,22 @@ use datafusion_expr::binary_rule::comparison_coercion;
3232
use datafusion_expr::{Expr, Operator};
3333
use std::sync::Arc;
3434

35-
/// Create a physical expression from a logical expression ([Expr])
35+
/// Create a physical expression from a logical expression ([Expr]).
36+
///
37+
/// # Arguments
38+
///
39+
/// * `e` - The logical expression
40+
/// * `input_dfschema` - The DataFusion schema for the input, used to resolve `Column` references
41+
/// to qualified or unqualified fields by name.
42+
/// * `input_schema` - The Arrow schema for the input, used for determining expression data types
43+
/// when performing type coercion.
3644
pub fn create_physical_expr(
3745
e: &Expr,
3846
input_dfschema: &DFSchema,
3947
input_schema: &Schema,
4048
execution_props: &ExecutionProps,
4149
) -> Result<Arc<dyn PhysicalExpr>> {
50+
assert_eq!(input_schema.fields.len(), input_dfschema.fields().len());
4251
match e {
4352
Expr::Alias(expr, ..) => Ok(create_physical_expr(
4453
expr,

0 commit comments

Comments
 (0)