-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add SessionContext
/SessionState::create_physical_expr()
to create PhysicalExpressions
from Expr
s
#10330
Add SessionContext
/SessionState::create_physical_expr()
to create PhysicalExpressions
from Expr
s
#10330
Changes from 2 commits
37bb5dc
e896763
88d0545
46880f6
95d6739
77b27cc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; | |
use datafusion::common::DFSchema; | ||
use datafusion::error::Result; | ||
use datafusion::optimizer::simplify_expressions::ExprSimplifier; | ||
use datafusion::physical_expr::{ | ||
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr, | ||
}; | ||
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries}; | ||
use datafusion::prelude::*; | ||
use datafusion_common::{ScalarValue, ToDFSchema}; | ||
use datafusion_expr::execution_props::ExecutionProps; | ||
|
@@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> { | |
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8))); | ||
|
||
// First, you make a "physical expression" from the logical `Expr` | ||
let physical_expr = physical_expr(&batch.schema(), expr)?; | ||
let df_schema = DFSchema::try_from(batch.schema())?; | ||
let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; | ||
|
||
// Now, you can evaluate the expression against the RecordBatch | ||
let result = physical_expr.evaluate(&batch)?; | ||
|
@@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> { | |
// `date < '2020-10-01' AND date > '2020-09-01'` | ||
|
||
// As always, we need to tell DataFusion the type of column "date" | ||
let schema = Schema::new(vec![make_field("date", DataType::Date32)]); | ||
let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)])); | ||
|
||
// You can provide DataFusion any known boundaries on the values of `date` | ||
// (for example, maybe you know you only have data up to `2020-09-15`), but | ||
|
@@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> { | |
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?; | ||
|
||
// Now, we invoke the analysis code to perform the range analysis | ||
let physical_expr = physical_expr(&schema, expr)?; | ||
let analysis_result = | ||
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?; | ||
let df_schema = DFSchema::try_from(schema)?; | ||
let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?; | ||
let analysis_result = analyze( | ||
&physical_expr, | ||
AnalysisContext::new(boundaries), | ||
df_schema.as_ref(), | ||
)?; | ||
|
||
// The results of the analysis is an range, encoded as an `Interval`, for | ||
// each column in the schema, that must be true in order for the predicate | ||
|
@@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field { | |
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz)) | ||
} | ||
|
||
/// Build a physical expression from a logical one, after applying simplification and type coercion | ||
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This PR basically ports this code out of the examples and into |
||
let df_schema = schema.clone().to_dfschema_ref()?; | ||
|
||
// Simplify | ||
let props = ExecutionProps::new(); | ||
let simplifier = | ||
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone())); | ||
|
||
// apply type coercion here to ensure types match | ||
let expr = simplifier.coerce(expr, df_schema.clone())?; | ||
|
||
create_physical_expr(&expr, df_schema.as_ref(), &props) | ||
} | ||
|
||
/// This function shows how to use `Expr::get_type` to retrieve the DataType | ||
/// of an expression | ||
fn expression_type_demo() -> Result<()> { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,6 +125,20 @@ impl DFSchema { | |
} | ||
} | ||
|
||
/// Return a reference to the inner Arrow [`Schema`] | ||
/// | ||
/// Note this does not have the qualifier information | ||
pub fn as_arrow(&self) -> &Schema { | ||
self.inner.as_ref() | ||
} | ||
|
||
/// Return a reference to the inner Arrow [`SchemaRef`] | ||
/// | ||
/// Note this does not have the qualifier information | ||
pub fn inner(&self) -> &SchemaRef { | ||
&self.inner | ||
} | ||
|
||
/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier | ||
pub fn new_with_metadata( | ||
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>, | ||
|
@@ -806,6 +820,21 @@ impl From<&DFSchema> for Schema { | |
} | ||
} | ||
|
||
/// Allow DFSchema to be converted into an Arrow `&Schema` | ||
impl AsRef<Schema> for DFSchema { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This allows DFSchema to be treated like a &Schema, which is now possible after #9595 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤯 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Very nice! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, #9595 was epic -- kudos to @haohuaijin and @matthewmturner for making that happen |
||
fn as_ref(&self) -> &Schema { | ||
self.as_arrow() | ||
} | ||
} | ||
|
||
/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for | ||
/// example) | ||
impl AsRef<SchemaRef> for DFSchema { | ||
fn as_ref(&self) -> &SchemaRef { | ||
self.inner() | ||
} | ||
} | ||
|
||
/// Create a `DFSchema` from an Arrow schema | ||
impl TryFrom<Schema> for DFSchema { | ||
type Error = DataFusionError; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -70,13 +70,13 @@ use datafusion_common::{ | |
config::{ConfigExtension, TableOptions}, | ||
exec_err, not_impl_err, plan_datafusion_err, plan_err, | ||
tree_node::{TreeNodeRecursion, TreeNodeVisitor}, | ||
SchemaReference, TableReference, | ||
DFSchema, SchemaReference, TableReference, | ||
}; | ||
use datafusion_execution::registry::SerializerRegistry; | ||
use datafusion_expr::{ | ||
logical_plan::{DdlStatement, Statement}, | ||
var_provider::is_system_variables, | ||
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, | ||
Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF, | ||
}; | ||
use datafusion_sql::{ | ||
parser::{CopyToSource, CopyToStatement, DFParser}, | ||
|
@@ -86,15 +86,20 @@ use datafusion_sql::{ | |
|
||
use async_trait::async_trait; | ||
use chrono::{DateTime, Utc}; | ||
use datafusion_common::tree_node::TreeNode; | ||
use parking_lot::RwLock; | ||
use sqlparser::dialect::dialect_from_str; | ||
use url::Url; | ||
use uuid::Uuid; | ||
|
||
use crate::physical_expr::PhysicalExpr; | ||
pub use datafusion_execution::config::SessionConfig; | ||
pub use datafusion_execution::TaskContext; | ||
pub use datafusion_expr::execution_props::ExecutionProps; | ||
use datafusion_expr::expr_rewriter::FunctionRewrite; | ||
use datafusion_expr::simplify::SimplifyInfo; | ||
use datafusion_optimizer::simplify_expressions::ExprSimplifier; | ||
use datafusion_physical_expr::create_physical_expr; | ||
|
||
mod avro; | ||
mod csv; | ||
|
@@ -510,6 +515,34 @@ impl SessionContext { | |
} | ||
} | ||
|
||
/// Create a [`PhysicalExpr`] from an [`Expr`] after applying type | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. New public API with example |
||
/// coercion, and function rewrites. | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
/// | ||
/// # Example | ||
/// ``` | ||
/// # use std::sync::Arc; | ||
/// # use arrow::datatypes::{DataType, Field, Schema}; | ||
/// # use datafusion::prelude::*; | ||
/// # use datafusion_common::DFSchema; | ||
/// // a = 1 (i64) | ||
/// let expr = col("a").eq(lit(1i64)); | ||
/// // provide type information that `a` is an Int32 | ||
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]); | ||
/// let df_schema = DFSchema::try_from(schema).unwrap(); | ||
/// // Create a PhysicalExpr. Note DataFusion automatically coerces (casts) `1i64` to `1i32` | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice, we have some (probably duplicated) type coercion logic on our side we might be able to replace. |
||
/// let physical_expr = SessionContext::new() | ||
/// .create_physical_expr(expr, &df_schema).unwrap(); | ||
/// ``` | ||
/// # See Also | ||
/// * [`SessionState::create_physical_expr`] for a lower level API | ||
pub fn create_physical_expr( | ||
&self, | ||
expr: Expr, | ||
df_schema: &DFSchema, | ||
) -> Result<Arc<dyn PhysicalExpr>> { | ||
self.state.read().create_physical_expr(expr, df_schema) | ||
} | ||
|
||
// return an empty dataframe | ||
fn return_empty_dataframe(&self) -> Result<DataFrame> { | ||
let plan = LogicalPlanBuilder::empty(false).build()?; | ||
|
@@ -1320,6 +1353,7 @@ pub enum RegisterFunction { | |
/// Table user defined function | ||
Table(String, Arc<dyn TableFunctionImpl>), | ||
} | ||
|
||
/// Execution context for registering data sources and executing queries. | ||
/// See [`SessionContext`] for a higher level API. | ||
/// | ||
|
@@ -1930,13 +1964,14 @@ impl SessionState { | |
} | ||
} | ||
|
||
/// Creates a physical plan from a logical plan. | ||
/// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`]. | ||
/// | ||
/// Note: this first calls [`Self::optimize`] on the provided | ||
/// plan. | ||
/// | ||
/// This function will error for [`LogicalPlan`]s such as catalog | ||
/// DDL `CREATE TABLE` must be handled by another layer. | ||
/// This function will error for [`LogicalPlan`]s such as catalog DDL like | ||
/// `CREATE TABLE`, which do not have corresponding physical plans and must | ||
/// be handled by another layer, typically [`SessionContext`]. | ||
pub async fn create_physical_plan( | ||
&self, | ||
logical_plan: &LogicalPlan, | ||
|
@@ -1947,6 +1982,36 @@ impl SessionState { | |
.await | ||
} | ||
|
||
/// Create a [`PhysicalExpr`] from an [`Expr`] after applying type | ||
/// coercion, and function rewrites. | ||
/// | ||
/// Note: The expression is not [simplified] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. My rationale was that simplification is substantially more expensive due to the rewrites required and is not strictly necessary for evaluation, so I felt not simplifying would be less surprising Also, if we are going to do simplification, it might also be reasonable to ask "why not other optimization like comparison cast unwrapping" too I think it is important to apply coercion as is very hard to create the right expression to get the type exactly aligned resulting in hard to fix errors. Same thing for this function rewriting. I'll add this context to the comments in this PR |
||
/// | ||
/// # See Also: | ||
/// * [`SessionContext::create_physical_expr`] for a higher-level API | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is somewaht unrelated to this PR but It's news to me that "SessionContext" is higher level than "SessionState". The only comparison between the two that I can find is https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#sessioncontext-sessionstate-and-taskcontext which says:
From this it is not obvious that a "SessionContext" contains a "SessionState". It's also very confusing when I would use "SessionContext" and when I would use "SessionState" in my user application. Currently, I've been going off the philosophy of "if an API method needs a SessionState I'd better create one and if an API method needs a SessionContext I'd better create one." Some kind of long form design documentation on these two types would be interesting (or if something already exists that I've missed or is in a blog post somewhere that would be cool too). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree this is confusing -- I will write up some documentation about this There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Made #10350 to hopefully clarify |
||
/// * [`create_physical_expr`] for a lower-level API | ||
/// | ||
/// [simplified]: datafusion_optimizer::simplify_expressions | ||
pub fn create_physical_expr( | ||
&self, | ||
expr: Expr, | ||
df_schema: &DFSchema, | ||
) -> Result<Arc<dyn PhysicalExpr>> { | ||
let simplifier = | ||
ExprSimplifier::new(SessionSimpifyProvider::new(self, df_schema)); | ||
// apply type coercion here to ensure types match | ||
let mut expr = simplifier.coerce(expr, df_schema)?; | ||
|
||
// rewrite Exprs to functions if necessary | ||
let config_options = self.config_options(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Applying these rewrites is a new feature (and what actually closes #10181) |
||
for rewrite in self.analyzer.function_rewrites() { | ||
expr = expr | ||
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))? | ||
.data; | ||
} | ||
create_physical_expr(&expr, df_schema, self.execution_props()) | ||
} | ||
|
||
/// Return the session ID | ||
pub fn session_id(&self) -> &str { | ||
&self.session_id | ||
|
@@ -2024,6 +2089,35 @@ impl SessionState { | |
} | ||
} | ||
|
||
struct SessionSimpifyProvider<'a> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This avoids cloning schema / schema refs |
||
state: &'a SessionState, | ||
df_schema: &'a DFSchema, | ||
} | ||
|
||
impl<'a> SessionSimpifyProvider<'a> { | ||
alamb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self { | ||
Self { state, df_schema } | ||
} | ||
} | ||
|
||
impl<'a> SimplifyInfo for SessionSimpifyProvider<'a> { | ||
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> { | ||
Ok(expr.get_type(self.df_schema)? == DataType::Boolean) | ||
} | ||
|
||
fn nullable(&self, expr: &Expr) -> Result<bool> { | ||
expr.nullable(self.df_schema) | ||
} | ||
|
||
fn execution_props(&self) -> &ExecutionProps { | ||
self.state.execution_props() | ||
} | ||
|
||
fn get_data_type(&self, expr: &Expr) -> Result<DataType> { | ||
expr.get_type(self.df_schema) | ||
} | ||
} | ||
|
||
struct SessionContextProvider<'a> { | ||
state: &'a SessionState, | ||
tables: HashMap<String, Arc<dyn TableSource>>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This shows how the new API is used -
SessionContext::new().create_physical_expr