Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SessionContext/SessionState::create_physical_expr() to create PhysicalExpressions from Exprs #10330

Merged
merged 6 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 11 additions & 23 deletions datafusion-examples/examples/expr_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use datafusion::common::DFSchema;
use datafusion::error::Result;
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
use datafusion::physical_expr::{
analyze, create_physical_expr, AnalysisContext, ExprBoundaries, PhysicalExpr,
};
use datafusion::physical_expr::{analyze, AnalysisContext, ExprBoundaries};
use datafusion::prelude::*;
use datafusion_common::{ScalarValue, ToDFSchema};
use datafusion_expr::execution_props::ExecutionProps;
Expand Down Expand Up @@ -92,7 +90,8 @@ fn evaluate_demo() -> Result<()> {
let expr = col("a").lt(lit(5)).or(col("a").eq(lit(8)));

// First, you make a "physical expression" from the logical `Expr`
let physical_expr = physical_expr(&batch.schema(), expr)?;
let df_schema = DFSchema::try_from(batch.schema())?;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows how the new API is used - SessionContext::new().create_physical_expr

let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;

// Now, you can evaluate the expression against the RecordBatch
let result = physical_expr.evaluate(&batch)?;
Expand Down Expand Up @@ -213,7 +212,7 @@ fn range_analysis_demo() -> Result<()> {
// `date < '2020-10-01' AND date > '2020-09-01'`

// As always, we need to tell DataFusion the type of column "date"
let schema = Schema::new(vec![make_field("date", DataType::Date32)]);
let schema = Arc::new(Schema::new(vec![make_field("date", DataType::Date32)]));

// You can provide DataFusion any known boundaries on the values of `date`
// (for example, maybe you know you only have data up to `2020-09-15`), but
Expand All @@ -222,9 +221,13 @@ fn range_analysis_demo() -> Result<()> {
let boundaries = ExprBoundaries::try_new_unbounded(&schema)?;

// Now, we invoke the analysis code to perform the range analysis
let physical_expr = physical_expr(&schema, expr)?;
let analysis_result =
analyze(&physical_expr, AnalysisContext::new(boundaries), &schema)?;
let df_schema = DFSchema::try_from(schema)?;
let physical_expr = SessionContext::new().create_physical_expr(expr, &df_schema)?;
let analysis_result = analyze(
&physical_expr,
AnalysisContext::new(boundaries),
df_schema.as_ref(),
)?;

// The results of the analysis is an range, encoded as an `Interval`, for
// each column in the schema, that must be true in order for the predicate
Expand All @@ -248,21 +251,6 @@ fn make_ts_field(name: &str) -> Field {
make_field(name, DataType::Timestamp(TimeUnit::Nanosecond, tz))
}

/// Build a physical expression from a logical one, after applying simplification and type coercion
pub fn physical_expr(schema: &Schema, expr: Expr) -> Result<Arc<dyn PhysicalExpr>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR basically ports this code out of the examples and into SessionState and adds documentation and tests

let df_schema = schema.clone().to_dfschema_ref()?;

// Simplify
let props = ExecutionProps::new();
let simplifier =
ExprSimplifier::new(SimplifyContext::new(&props).with_schema(df_schema.clone()));

// apply type coercion here to ensure types match
let expr = simplifier.coerce(expr, df_schema.clone())?;

create_physical_expr(&expr, df_schema.as_ref(), &props)
}

/// This function shows how to use `Expr::get_type` to retrieve the DataType
/// of an expression
fn expression_type_demo() -> Result<()> {
Expand Down
29 changes: 29 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,20 @@ impl DFSchema {
}
}

/// Return a reference to the inner Arrow [`Schema`]
///
/// Note this does not have the qualifier information
pub fn as_arrow(&self) -> &Schema {
self.inner.as_ref()
}

/// Return a reference to the inner Arrow [`SchemaRef`]
///
/// Note this does not have the qualifier information
pub fn inner(&self) -> &SchemaRef {
&self.inner
}

/// Create a `DFSchema` from an Arrow schema where all the fields have a given qualifier
pub fn new_with_metadata(
qualified_fields: Vec<(Option<TableReference>, Arc<Field>)>,
Expand Down Expand Up @@ -806,6 +820,21 @@ impl From<&DFSchema> for Schema {
}
}

/// Allow DFSchema to be converted into an Arrow `&Schema`
impl AsRef<Schema> for DFSchema {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows DFSchema to be treated like a &Schema, which is now possible after #9595

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤯

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very nice!

Copy link
Contributor Author

@alamb alamb May 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, #9595 was epic -- kudos to @haohuaijin and @matthewmturner for making that happen

fn as_ref(&self) -> &Schema {
self.as_arrow()
}
}

/// Allow DFSchema to be converted into an Arrow `&SchemaRef` (to clone, for
/// example)
impl AsRef<SchemaRef> for DFSchema {
fn as_ref(&self) -> &SchemaRef {
self.inner()
}
}

/// Create a `DFSchema` from an Arrow schema
impl TryFrom<Schema> for DFSchema {
type Error = DataFusionError;
Expand Down
104 changes: 99 additions & 5 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ use datafusion_common::{
config::{ConfigExtension, TableOptions},
exec_err, not_impl_err, plan_datafusion_err, plan_err,
tree_node::{TreeNodeRecursion, TreeNodeVisitor},
SchemaReference, TableReference,
DFSchema, SchemaReference, TableReference,
};
use datafusion_execution::registry::SerializerRegistry;
use datafusion_expr::{
logical_plan::{DdlStatement, Statement},
var_provider::is_system_variables,
Expr, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
Expr, ExprSchemable, StringifiedPlan, UserDefinedLogicalNode, WindowUDF,
};
use datafusion_sql::{
parser::{CopyToSource, CopyToStatement, DFParser},
Expand All @@ -86,15 +86,20 @@ use datafusion_sql::{

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::tree_node::TreeNode;
use parking_lot::RwLock;
use sqlparser::dialect::dialect_from_str;
use url::Url;
use uuid::Uuid;

use crate::physical_expr::PhysicalExpr;
pub use datafusion_execution::config::SessionConfig;
pub use datafusion_execution::TaskContext;
pub use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::expr_rewriter::FunctionRewrite;
use datafusion_expr::simplify::SimplifyInfo;
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use datafusion_physical_expr::create_physical_expr;

mod avro;
mod csv;
Expand Down Expand Up @@ -510,6 +515,34 @@ impl SessionContext {
}
}

/// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New public API with example

/// coercion, and function rewrites.
alamb marked this conversation as resolved.
Show resolved Hide resolved
///
/// # Example
/// ```
/// # use std::sync::Arc;
/// # use arrow::datatypes::{DataType, Field, Schema};
/// # use datafusion::prelude::*;
/// # use datafusion_common::DFSchema;
/// // a = 1 (i64)
/// let expr = col("a").eq(lit(1i64));
/// // provide type information that `a` is an Int32
/// let schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
/// let df_schema = DFSchema::try_from(schema).unwrap();
/// // Create a PhysicalExpr. Note DataFusion automatically coerces (casts) `1i64` to `1i32`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice, we have some (probably duplicated) type coercion logic on our side we might be able to replace.

/// let physical_expr = SessionContext::new()
/// .create_physical_expr(expr, &df_schema).unwrap();
/// ```
/// # See Also
/// * [`SessionState::create_physical_expr`] for a lower level API
pub fn create_physical_expr(
&self,
expr: Expr,
df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
self.state.read().create_physical_expr(expr, df_schema)
}

// return an empty dataframe
fn return_empty_dataframe(&self) -> Result<DataFrame> {
let plan = LogicalPlanBuilder::empty(false).build()?;
Expand Down Expand Up @@ -1320,6 +1353,7 @@ pub enum RegisterFunction {
/// Table user defined function
Table(String, Arc<dyn TableFunctionImpl>),
}

/// Execution context for registering data sources and executing queries.
/// See [`SessionContext`] for a higher level API.
///
Expand Down Expand Up @@ -1930,13 +1964,14 @@ impl SessionState {
}
}

/// Creates a physical plan from a logical plan.
/// Creates a physical [`ExecutionPlan`] plan from a [`LogicalPlan`].
///
/// Note: this first calls [`Self::optimize`] on the provided
/// plan.
///
/// This function will error for [`LogicalPlan`]s such as catalog
/// DDL `CREATE TABLE` must be handled by another layer.
/// This function will error for [`LogicalPlan`]s such as catalog DDL like
/// `CREATE TABLE`, which do not have corresponding physical plans and must
/// be handled by another layer, typically [`SessionContext`].
pub async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
Expand All @@ -1947,6 +1982,36 @@ impl SessionState {
.await
}

/// Create a [`PhysicalExpr`] from an [`Expr`] after applying type
/// coercion, and function rewrites.
///
/// Note: The expression is not [simplified]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My rationale was that simplification is substantially more expensive due to the rewrites required and is not strictly necessary for evaluation, so I felt not simplifying would be less surprising

Also, if we are going to do simplification, it might also be reasonable to ask "why not other optimization like comparison cast unwrapping" too

I think it is important to apply coercion as is very hard to create the right expression to get the type exactly aligned resulting in hard to fix errors. Same thing for this function rewriting.

I'll add this context to the comments in this PR

///
/// # See Also:
/// * [`SessionContext::create_physical_expr`] for a higher-level API
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is somewaht unrelated to this PR but It's news to me that "SessionContext" is higher level than "SessionState". The only comparison between the two that I can find is https://docs.rs/datafusion/latest/datafusion/execution/context/struct.SessionContext.html#sessioncontext-sessionstate-and-taskcontext which says:

A SessionContext can be created from a SessionConfig and stores the state for a particular query session. A single SessionContext can run multiple queries.

SessionState contains information available during query planning (creating LogicalPlans and ExecutionPlans).

From this it is not obvious that a "SessionContext" contains a "SessionState". It's also very confusing when I would use "SessionContext" and when I would use "SessionState" in my user application. Currently, I've been going off the philosophy of "if an API method needs a SessionState I'd better create one and if an API method needs a SessionContext I'd better create one."

Some kind of long form design documentation on these two types would be interesting (or if something already exists that I've missed or is in a blog post somewhere that would be cool too).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this is confusing -- I will write up some documentation about this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made #10350 to hopefully clarify

/// * [`create_physical_expr`] for a lower-level API
///
/// [simplified]: datafusion_optimizer::simplify_expressions
pub fn create_physical_expr(
&self,
expr: Expr,
df_schema: &DFSchema,
) -> Result<Arc<dyn PhysicalExpr>> {
let simplifier =
ExprSimplifier::new(SessionSimpifyProvider::new(self, df_schema));
// apply type coercion here to ensure types match
let mut expr = simplifier.coerce(expr, df_schema)?;

// rewrite Exprs to functions if necessary
let config_options = self.config_options();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applying these rewrites is a new feature (and what actually closes #10181)

for rewrite in self.analyzer.function_rewrites() {
expr = expr
.transform_up(|expr| rewrite.rewrite(expr, df_schema, config_options))?
.data;
}
create_physical_expr(&expr, df_schema, self.execution_props())
}

/// Return the session ID
pub fn session_id(&self) -> &str {
&self.session_id
Expand Down Expand Up @@ -2024,6 +2089,35 @@ impl SessionState {
}
}

struct SessionSimpifyProvider<'a> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This avoids cloning schema / schema refs

state: &'a SessionState,
df_schema: &'a DFSchema,
}

impl<'a> SessionSimpifyProvider<'a> {
alamb marked this conversation as resolved.
Show resolved Hide resolved
fn new(state: &'a SessionState, df_schema: &'a DFSchema) -> Self {
Self { state, df_schema }
}
}

impl<'a> SimplifyInfo for SessionSimpifyProvider<'a> {
fn is_boolean_type(&self, expr: &Expr) -> Result<bool> {
Ok(expr.get_type(self.df_schema)? == DataType::Boolean)
}

fn nullable(&self, expr: &Expr) -> Result<bool> {
expr.nullable(self.df_schema)
}

fn execution_props(&self) -> &ExecutionProps {
self.state.execution_props()
}

fn get_data_type(&self, expr: &Expr) -> Result<DataType> {
expr.get_type(self.df_schema)
}
}

struct SessionContextProvider<'a> {
state: &'a SessionState,
tables: HashMap<String, Arc<dyn TableSource>>,
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/test_util/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl TestParquetFile {
let parquet_options = ctx.copied_table_options().parquet;
if let Some(filter) = maybe_filter {
let simplifier = ExprSimplifier::new(context);
let filter = simplifier.coerce(filter, df_schema.clone()).unwrap();
let filter = simplifier.coerce(filter, &df_schema).unwrap();
let physical_filter_expr =
create_physical_expr(&filter, &df_schema, &ExecutionProps::default())?;
let parquet_exec = Arc::new(ParquetExec::new(
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/tests/core_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ mod dataframe;
/// Run all tests that are found in the `macro_hygiene` directory
mod macro_hygiene;

/// Run all tests that are found in the `expr_api` directory
mod expr_api;

#[cfg(test)]
#[ctor::ctor]
fn init() {
Expand Down
Loading