-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Detect when filters on unique constraints make subqueries scalar #8312
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,7 @@ use crate::logical_plan::{DmlStatement, Statement}; | |
| use crate::utils::{ | ||
| enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, | ||
| grouping_set_expr_count, grouping_set_to_exprlist, inspect_expr_pre, | ||
| split_conjunction, | ||
| }; | ||
| use crate::{ | ||
| build_join_schema, expr_vec_fmt, BinaryExpr, CreateMemoryTable, CreateView, Expr, | ||
|
|
@@ -47,7 +48,7 @@ use datafusion_common::tree_node::{ | |
| }; | ||
| use datafusion_common::{ | ||
| aggregate_functional_dependencies, internal_err, plan_err, Column, Constraints, | ||
| DFField, DFSchema, DFSchemaRef, DataFusionError, FunctionalDependencies, | ||
| DFField, DFSchema, DFSchemaRef, DataFusionError, Dependency, FunctionalDependencies, | ||
| OwnedTableReference, ParamValues, Result, UnnestOptions, | ||
| }; | ||
| // backwards compatibility | ||
|
|
@@ -1032,7 +1033,13 @@ impl LogicalPlan { | |
| pub fn max_rows(self: &LogicalPlan) -> Option<usize> { | ||
| match self { | ||
| LogicalPlan::Projection(Projection { input, .. }) => input.max_rows(), | ||
| LogicalPlan::Filter(Filter { input, .. }) => input.max_rows(), | ||
| LogicalPlan::Filter(filter) => { | ||
| if filter.is_scalar() { | ||
| Some(1) | ||
| } else { | ||
| filter.input.max_rows() | ||
| } | ||
| } | ||
| LogicalPlan::Window(Window { input, .. }) => input.max_rows(), | ||
| LogicalPlan::Aggregate(Aggregate { | ||
| input, group_expr, .. | ||
|
|
@@ -1913,6 +1920,73 @@ impl Filter { | |
|
|
||
| Ok(Self { predicate, input }) | ||
| } | ||
|
|
||
| /// Is this filter guaranteed to return 0 or 1 row in a given instantiation? | ||
| /// | ||
| /// This function will return `true` if its predicate contains a conjunction of | ||
| /// `col(a) = <expr>`, where its schema has a unique filter that is covered | ||
| /// by this conjunction. | ||
| /// | ||
| /// For example, for the table: | ||
| /// ```sql | ||
| /// CREATE TABLE t (a INTEGER PRIMARY KEY, b INTEGER); | ||
| /// ``` | ||
| /// `Filter(a = 2).is_scalar() == true` | ||
| /// , whereas | ||
| /// `Filter(b = 2).is_scalar() == false` | ||
| /// and | ||
| /// `Filter(a = 2 OR b = 2).is_scalar() == false` | ||
| fn is_scalar(&self) -> bool { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I suggest use Commonly used Functional dependencies : including
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do I understand correctly that the suggestion is to rename this method to
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, and add these definition into doc/ |
||
| let schema = self.input.schema(); | ||
|
|
||
| let functional_dependencies = self.input.schema().functional_dependencies(); | ||
| let unique_keys = functional_dependencies.iter().filter(|dep| { | ||
| let nullable = dep.nullable | ||
| && dep | ||
| .source_indices | ||
| .iter() | ||
| .any(|&source| schema.field(source).is_nullable()); | ||
| !nullable | ||
| && dep.mode == Dependency::Single | ||
| && dep.target_indices.len() == schema.fields().len() | ||
| }); | ||
|
|
||
| let exprs = split_conjunction(&self.predicate); | ||
| let eq_pred_cols: HashSet<_> = exprs | ||
| .iter() | ||
| .filter_map(|expr| { | ||
| let Expr::BinaryExpr(BinaryExpr { | ||
| left, | ||
| op: Operator::Eq, | ||
| right, | ||
| }) = expr | ||
| else { | ||
| return None; | ||
| }; | ||
| // This is a no-op filter expression | ||
| if left == right { | ||
| return None; | ||
| } | ||
|
|
||
| match (left.as_ref(), right.as_ref()) { | ||
| (Expr::Column(_), Expr::Column(_)) => None, | ||
| (Expr::Column(c), _) | (_, Expr::Column(c)) => { | ||
| Some(schema.index_of_column(c).unwrap()) | ||
jackwener marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| _ => None, | ||
| } | ||
| }) | ||
| .collect(); | ||
|
|
||
| // If we have a functional dependence that is a subset of our predicate, | ||
| // this filter is scalar | ||
| for key in unique_keys { | ||
| if key.source_indices.iter().all(|c| eq_pred_cols.contains(c)) { | ||
| return true; | ||
| } | ||
| } | ||
| false | ||
| } | ||
| } | ||
|
|
||
| /// Window its input based on a set of window spec and window function (e.g. SUM or RANK) | ||
|
|
@@ -2554,12 +2628,16 @@ pub struct Unnest { | |
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::builder::LogicalTableSource; | ||
| use crate::logical_plan::table_scan; | ||
| use crate::{col, count, exists, in_subquery, lit, placeholder, GroupingSet}; | ||
| use arrow::datatypes::{DataType, Field, Schema}; | ||
| use datafusion_common::tree_node::TreeNodeVisitor; | ||
| use datafusion_common::{not_impl_err, DFSchema, ScalarValue, TableReference}; | ||
| use datafusion_common::{ | ||
| not_impl_err, Constraint, DFSchema, ScalarValue, TableReference, | ||
| }; | ||
| use std::collections::HashMap; | ||
| use std::sync::Arc; | ||
|
|
||
| fn employee_schema() -> Schema { | ||
| Schema::new(vec![ | ||
|
|
@@ -3056,6 +3134,63 @@ digraph { | |
| .is_nullable()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_filter_is_scalar() { | ||
| // test empty placeholder | ||
| let schema = | ||
| Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)])); | ||
|
|
||
| let source = Arc::new(LogicalTableSource::new(schema)); | ||
| let schema = Arc::new( | ||
| DFSchema::try_from_qualified_schema( | ||
| TableReference::bare("tab"), | ||
| &source.schema(), | ||
| ) | ||
| .unwrap(), | ||
| ); | ||
| let scan = Arc::new(LogicalPlan::TableScan(TableScan { | ||
| table_name: TableReference::bare("tab"), | ||
| source: source.clone(), | ||
| projection: None, | ||
| projected_schema: schema.clone(), | ||
| filters: vec![], | ||
| fetch: None, | ||
| })); | ||
| let col = schema.field(0).qualified_column(); | ||
|
|
||
| let filter = Filter::try_new( | ||
| Expr::Column(col).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), | ||
| scan, | ||
| ) | ||
| .unwrap(); | ||
| assert!(!filter.is_scalar()); | ||
| let unique_schema = | ||
| Arc::new(schema.as_ref().clone().with_functional_dependencies( | ||
| FunctionalDependencies::new_from_constraints( | ||
| Some(&Constraints::new_unverified(vec![Constraint::Unique( | ||
| vec![0], | ||
| )])), | ||
| 1, | ||
| ), | ||
| )); | ||
| let scan = Arc::new(LogicalPlan::TableScan(TableScan { | ||
| table_name: TableReference::bare("tab"), | ||
| source, | ||
| projection: None, | ||
| projected_schema: unique_schema.clone(), | ||
| filters: vec![], | ||
| fetch: None, | ||
| })); | ||
| let col = schema.field(0).qualified_column(); | ||
|
|
||
| let filter = Filter::try_new( | ||
| Expr::Column(col).eq(Expr::Literal(ScalarValue::Int32(Some(1)))), | ||
| scan, | ||
| ) | ||
| .unwrap(); | ||
| assert!(filter.is_scalar()); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_transform_explain() { | ||
| let schema = Schema::new(vec![ | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.