Skip to content

Commit cfd4440

Browse files
committed
Add SessionContext::create_physical_expr() and SessionState::create_physical_expr()
1 parent 44d1dda commit cfd4440

File tree

4 files changed

+113
-29
lines changed

4 files changed

+113
-29
lines changed

datafusion-examples/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ cargo run --example csv_sql
6262
- [`memtable.rs`](examples/memtable.rs): Create an query data in memory using SQL and `RecordBatch`es
6363
- [`parquet_sql.rs`](examples/parquet_sql.rs): Build and run a query plan from a SQL statement against a local Parquet file
6464
- [`parquet_sql_multiple_files.rs`](examples/parquet_sql_multiple_files.rs): Build and run a query plan from a SQL statement against multiple local Parquet files
65-
- ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after execution
65+
- ['parquet_exec_visitor.rs'](examples/parquet_exec_visitor.rs): Extract statistics by visiting an ExecutionPlan after executionØo
6666
- [`pruning.rs`](examples/parquet_sql.rs): Use pruning to rule out files based on statistics
6767
- [`query-aws-s3.rs`](examples/external_dependency/query-aws-s3.rs): Configure `object_store` and run a query against files stored in AWS S3
6868
- [`query-http-csv.rs`](examples/query-http-csv.rs): Configure `object_store` and run a query against files vi HTTP

datafusion-examples/examples/expr_api.rs

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
2525
use datafusion::common::DFSchema;
2626
use datafusion::error::Result;
2727
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
28-
use datafusion::physical_expr::{
29-
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
30-
};
28+
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
3129
use datafusion::prelude::*;
3230
use datafusion_common::{ScalarValue, ToDFSchema};
3331
use datafusion_expr::execution_props::ExecutionProps;
@@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> {
9290
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));
9391

9492
// First, you make a "physical expression" from the logical `Expr`
95-
let physical_expr = physical_expr(&batch.schema(), expr)?;
93+
let df_schema = DFSchema::try_from(batch.schema())?;
94+
let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?;
9695

9796
// Now, you can evaluate the expression against the RecordBatch
9897
let result = physical_expr.evaluate(&batch)?;
@@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> {
213212
// `date < '2020-10-01' AND date > '2020-09-01'`
214213

215214
// As always, we need to tell DataFusion the type of column "date"
216-
let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
215+
let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)]));
217216

218217
// You can provide DataFusion any known boundaries on the values of `date`
219218
// (for example, maybe you know you only have data up to `2020-09-15`), but
@@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> {
222221
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;
223222

224223
// Now, we invoke the analysis code to perform the range analysis
225-
let physical_expr = physical_expr(&schema, expr)?;
226-
let analysis_result =
227-
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
224+
let df_schema = DFSchema::try_from(schema)?;
225+
let physical_expr = SessionContext::new().create_physical_expr(&df_schema, expr)?;
226+
let analysis_result = analyze(
227+
&physical_expr,
228+
AnalysisContext::new(boundaries),
229+
df_schema.as_ref(),
230+
)?;
228231

229232
// The results of the analysis is an range, encoded as an `Interval`, for
230233
// each column in the schema, that must be true in order for the predicate
@@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field {
248251
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
249252
}
250253

251-
/// Build a physical expression from a logical one, after applying simplification and type coercion
252-
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
253-
let df_schema = schema.clone().to_dfschema_ref()?;
254-
255-
// Simplify
256-
let props = ExecutionProps::new();
257-
let simplifier =
258-
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));
259-
260-
// apply type coercion here to ensure types match
261-
let expr = simplifier.coerce(expr, &df_schema)?;
262-
263-
create_physical_expr(&expr, df_schema.as_ref(), &props)
264-
}
265-
266254
/// This function shows how to use `Expr::get_type` to retrieve the DataType
267255
/// of an expression
268256
fn expression_type_demo() -> Result<()> {

datafusion/common/src/dfschema.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,20 @@ impl DFSchema {
125125
}
126126
}
127127

128+
/// Return a reference to the inner Arrow [`Schema`]
129+
///
130+
/// Note this does not have the qualifier information
131+
pub fn as_arrow(&self) -> &Schema {
132+
self.inner.as_ref()
133+
}
134+
135+
/// Return a reference to the inner Arrow [`SchemaRef`]
136+
///
137+
/// Note this does not have the qualifier information
138+
pub fn inner(&self) -> &SchemaRef {
139+
&self.inner
140+
}
141+
128142
/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
129143
pub fn new_with_metadata(
130144
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
@@ -806,6 +820,12 @@ impl From<&DFSchema> for Schema {
806820
}
807821
}
808822

823+
impl AsRef<Schema> for DFSchema {
824+
fn as_ref(&self) -> &Schema {
825+
self.as_arrow()
826+
}
827+
}
828+
809829
/// Create a `DFSchema` from an Arrow schema
810830
impl TryFrom<Schema> for DFSchema {
811831
type Error = DataFusionError;

datafusion/core/src/execution/context/mod.rs

Lines changed: 81 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ use datafusion_common::{
7070
config::{ConfigExtension, TableOptions},
7171
exec_err, not_impl_err, plan_datafusion_err, plan_err,
7272
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
73-
SchemaReference, TableReference,
73+
DFSchema, SchemaReference, TableReference,
7474
};
7575
use datafusion_execution::registry::SerializerRegistry;
7676
use datafusion_expr::{
7777
logical_plan::{DdlStatement, Statement},
7878
var_provider::is_system_variables,
79-
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
79+
Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
8080
};
8181
use datafusion_sql::{
8282
parser::{CopyToSource, CopyToStatement, DFParser},
@@ -91,10 +91,14 @@ use sqlparser::dialect::dialect_from_str;
9191
use url::Url;
9292
use uuid::Uuid;
9393

94+
use crate::physical_expr::PhysicalExpr;
9495
pub use datafusion_execution::config::SessionConfig;
9596
pub use datafusion_execution::TaskContext;
9697
pub use datafusion_expr::execution_props::ExecutionProps;
9798
use datafusion_expr::expr_rewriter::FunctionRewrite;
99+
use datafusion_expr::simplify::SimplifyInfo;
100+
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
101+
use datafusion_physical_expr::create_physical_expr;
98102

99103
mod avro;
100104
mod csv;
@@ -510,6 +514,18 @@ impl SessionContext {
510514
}
511515
}
512516

517+
/// Creates a [`PhysicalExpr`] from an [`Expr`] after applying
518+
///
519+
/// See [`SessionState::create_physical_expr`] for more details and
520+
/// examples.
521+
pub fn create_physical_expr(
522+
&self,
523+
df_schema: &DFSchema,
524+
expr: Expr,
525+
) -> Result<Arc<dyn PhysicalExpr>> {
526+
self.state.read().create_physical_expr(df_schema, expr)
527+
}
528+
513529
// return an empty dataframe
514530
fn return_empty_dataframe(&self) -> Result<DataFrame> {
515531
let plan = LogicalPlanBuilder::empty(false).build()?;
@@ -1320,6 +1336,7 @@ pub enum RegisterFunction {
13201336
/// Table user defined function
13211337
Table(String, Arc<dyn TableFunctionImpl>),
13221338
}
1339+
13231340
/// Execution context for registering data sources and executing queries.
13241341
/// See [`SessionContext`] for a higher level API.
13251342
///
@@ -1930,13 +1947,14 @@ impl SessionState {
19301947
}
19311948
}
19321949

1933-
/// Creates a physical plan from a logical plan.
1950+
/// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
19341951
///
19351952
/// Note: this first calls [`Self::optimize`] on the provided
19361953
/// plan.
19371954
///
1938-
/// This function will error for [`LogicalPlan`]s such as catalog
1939-
/// DDL `CREATE TABLE` must be handled by another layer.
1955+
/// This function will error for [`LogicalPlan`]s such as catalog DDL like
1956+
/// `CREATE TABLE`, which do not have corresponding physical plans and must
1957+
/// be handled by another layer, typically [`SessionContext`].
19401958
pub async fn create_physical_plan(
19411959
&self,
19421960
logical_plan: &LogicalPlan,
@@ -1947,6 +1965,35 @@ impl SessionState {
19471965
.await
19481966
}
19491967

1968+
/// Creates a [`PhysicalExpr`] from an [`Expr`] after applying type
1969+
/// coercion, and function rewrites.
1970+
///
1971+
/// Note that no simplification (TODO link) is applied.
1972+
///
1973+
/// TODO links to coercsion, simplificiation, and rewrites
1974+
///
1975+
/// # Example:
1976+
/// ```
1977+
///TODO
1978+
/// ```
1979+
pub fn create_physical_expr(
1980+
&self,
1981+
// todo make this schema
1982+
df_schema: &DFSchema,
1983+
expr: Expr,
1984+
) -> Result<Arc<dyn PhysicalExpr>> {
1985+
// Simplify
1986+
let simplifier =
1987+
ExprSimplifier::new(SessionSimpifyProvider::new(self, df_schema));
1988+
1989+
// apply type coercion here to ensure types match
1990+
let expr = simplifier.coerce(expr, df_schema)?;
1991+
// TODO should we also simplify the expression?
1992+
// simplifier.simplify()
1993+
1994+
create_physical_expr(&expr, df_schema, self.execution_props())
1995+
}
1996+
19501997
/// Return the session ID
19511998
pub fn session_id(&self) -> &str {
19521999
&self.session_id
@@ -2024,6 +2071,35 @@ impl SessionState {
20242071
}
20252072
}
20262073

2074+
struct SessionSimpifyProvider<'a> {
2075+
state: &'a SessionState,
2076+
df_schema: &'a DFSchema,
2077+
}
2078+
2079+
impl<'a> SessionSimpifyProvider<'a> {
2080+
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
2081+
Self { state, df_schema }
2082+
}
2083+
}
2084+
2085+
impl<'a> SimplifyInfo for SessionSimpifyProvider<'a> {
2086+
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
2087+
Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
2088+
}
2089+
2090+
fn nullable(&self, expr: &Expr) -> Result<bool> {
2091+
expr.nullable(self.df_schema)
2092+
}
2093+
2094+
fn execution_props(&self) -> &ExecutionProps {
2095+
self.state.execution_props()
2096+
}
2097+
2098+
fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
2099+
expr.get_type(self.df_schema)
2100+
}
2101+
}
2102+
20272103
struct SessionContextProvider<'a> {
20282104
state: &'a SessionState,
20292105
tables: HashMap<String, Arc<dyn TableSource>>,

0 commit comments

Comments
 (0)