-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Use TableReference for TableScan #5615
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a9dfd29
54b0d8a
201d115
2c3b6d7
feac21f
6aeff12
ef1bc06
c234489
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -41,7 +41,7 @@ use crate::{ | |
| use arrow::datatypes::{DataType, Schema, SchemaRef}; | ||
| use datafusion_common::{ | ||
| Column, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result, | ||
| ScalarValue, ToDFSchema, | ||
| ScalarValue, TableReference, ToDFSchema, | ||
| }; | ||
| use std::any::Any; | ||
| use std::cmp::Ordering; | ||
|
|
@@ -192,8 +192,39 @@ impl LogicalPlanBuilder { | |
| } | ||
|
|
||
| /// Convert a table provider into a builder with a TableScan | ||
| /// | ||
| /// Note that if you pass a string as `table_name`, it is treated | ||
| /// as a SQL identifier, as described on [`TableReference`] and | ||
| /// thus is normalized | ||
| /// | ||
| /// # Example: | ||
| /// ``` | ||
| /// # use datafusion_expr::{lit, col, LogicalPlanBuilder, | ||
| /// # logical_plan::builder::LogicalTableSource, logical_plan::table_scan | ||
| /// # }; | ||
| /// # use std::sync::Arc; | ||
| /// # use arrow::datatypes::{Schema, DataType, Field}; | ||
| /// # use datafusion_common::TableReference; | ||
| /// # | ||
| /// # let employee_schema = Arc::new(Schema::new(vec![ | ||
| /// # Field::new("id", DataType::Int32, false), | ||
| /// # ])) as _; | ||
| /// # let table_source = Arc::new(LogicalTableSource::new(employee_schema)); | ||
| /// // Scan table_source with the name "mytable" (after normalization) | ||
| /// # let table = table_source.clone(); | ||
| /// let scan = LogicalPlanBuilder::scan("MyTable", table, None); | ||
| /// | ||
| /// // Scan table_source with the name "MyTable" by enclosing in quotes | ||
| /// # let table = table_source.clone(); | ||
| /// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None); | ||
| /// | ||
| /// // Scan table_source with the name "MyTable" by forming the table reference | ||
| /// # let table = table_source.clone(); | ||
| /// let table_reference = TableReference::bare("MyTable"); | ||
| /// let scan = LogicalPlanBuilder::scan(table_reference, table, None); | ||
| /// ``` | ||
| pub fn scan( | ||
| table_name: impl Into<String>, | ||
| table_name: impl Into<OwnedTableReference>, | ||
| table_source: Arc<dyn TableSource>, | ||
| projection: Option<Vec<usize>>, | ||
| ) -> Result<Self> { | ||
|
|
@@ -217,14 +248,14 @@ impl LogicalPlanBuilder { | |
|
|
||
| /// Convert a table provider into a builder with a TableScan | ||
| pub fn scan_with_filters( | ||
| table_name: impl Into<String>, | ||
| table_name: impl Into<OwnedTableReference>, | ||
| table_source: Arc<dyn TableSource>, | ||
| projection: Option<Vec<usize>>, | ||
| filters: Vec<Expr>, | ||
| ) -> Result<Self> { | ||
| let table_name = table_name.into(); | ||
|
|
||
| if table_name.is_empty() { | ||
| if table_name.table().is_empty() { | ||
| return Err(DataFusionError::Plan( | ||
| "table_name cannot be empty".to_string(), | ||
| )); | ||
|
|
@@ -239,7 +270,7 @@ impl LogicalPlanBuilder { | |
| p.iter() | ||
| .map(|i| { | ||
| DFField::from_qualified( | ||
| table_name.to_string(), | ||
| table_name.clone(), | ||
| schema.field(*i).clone(), | ||
| ) | ||
| }) | ||
|
|
@@ -248,7 +279,7 @@ impl LogicalPlanBuilder { | |
| ) | ||
| }) | ||
| .unwrap_or_else(|| { | ||
| DFSchema::try_from_qualified_schema(&table_name, &schema) | ||
| DFSchema::try_from_qualified_schema(table_name.clone(), &schema) | ||
| })?; | ||
|
|
||
| let table_scan = LogicalPlan::TableScan(TableScan { | ||
|
|
@@ -1196,14 +1227,22 @@ pub fn subquery_alias( | |
|
|
||
| /// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema. | ||
| /// This is mostly used for testing and documentation. | ||
| pub fn table_scan( | ||
| name: Option<&str>, | ||
| pub fn table_scan<'a>( | ||
| name: Option<impl Into<TableReference<'a>>>, | ||
| table_schema: &Schema, | ||
| projection: Option<Vec<usize>>, | ||
| ) -> Result<LogicalPlanBuilder> { | ||
| let table_source = table_source(table_schema); | ||
| let name = name | ||
| .map(|n| n.into()) | ||
| .unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe |
||
| .to_owned_reference(); | ||
| LogicalPlanBuilder::scan(name, table_source, projection) | ||
| } | ||
|
|
||
| fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> { | ||
| let table_schema = Arc::new(table_schema.clone()); | ||
| let table_source = Arc::new(LogicalTableSource { table_schema }); | ||
| LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection) | ||
| Arc::new(LogicalTableSource { table_schema }) | ||
| } | ||
|
|
||
| /// Wrap projection for a plan, if the join keys contains normal expression. | ||
|
|
@@ -1361,12 +1400,36 @@ mod tests { | |
| #[test] | ||
| fn plan_builder_schema() { | ||
| let schema = employee_schema(); | ||
| let plan = table_scan(Some("employee_csv"), &schema, None).unwrap(); | ||
| let projection = None; | ||
| let plan = | ||
| LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection) | ||
| .unwrap(); | ||
| let expected = DFSchema::try_from_qualified_schema( | ||
| TableReference::bare("employee_csv"), | ||
| &schema, | ||
| ) | ||
| .unwrap(); | ||
| assert_eq!(&expected, plan.schema().as_ref()); | ||
|
|
||
| let expected = | ||
| DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap(); | ||
| // Note scan of "EMPLOYEE_CSV" is treated as a SQL identifer | ||
| // (and thus normalized to "employee"csv") as well | ||
| let projection = None; | ||
| let plan = | ||
| LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection) | ||
| .unwrap(); | ||
| assert_eq!(&expected, plan.schema().as_ref()); | ||
| } | ||
|
|
||
| assert_eq!(&expected, plan.schema().as_ref()) | ||
| #[test] | ||
| fn plan_builder_empty_name() { | ||
| let schema = employee_schema(); | ||
| let projection = None; | ||
| let err = | ||
| LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err(); | ||
| assert_eq!( | ||
| err.to_string(), | ||
| "Error during planning: table_name cannot be empty" | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -1481,9 +1544,10 @@ mod tests { | |
|
|
||
| #[test] | ||
| fn plan_builder_union_different_num_columns_error() -> Result<()> { | ||
| let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?; | ||
|
|
||
| let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?; | ||
| let plan1 = | ||
| table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; | ||
| let plan2 = | ||
| table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; | ||
|
|
||
| let expected = "Error during planning: Union queries must have the same number of columns, (left is 1, right is 2)"; | ||
| let err_msg1 = plan1.clone().union(plan2.clone().build()?).unwrap_err(); | ||
|
|
@@ -1707,9 +1771,10 @@ mod tests { | |
|
|
||
| #[test] | ||
| fn plan_builder_intersect_different_num_columns_error() -> Result<()> { | ||
| let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?; | ||
|
|
||
| let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?; | ||
| let plan1 = | ||
| table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?; | ||
| let plan2 = | ||
| table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?; | ||
|
|
||
| let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \ | ||
| Left is 1 and right is 2."; | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,7 +57,16 @@ impl OptimizerRule for InlineTableScan { | |
| generate_projection_expr(projection, sub_plan)?; | ||
| let plan = LogicalPlanBuilder::from(sub_plan.clone()) | ||
| .project(projection_exprs)? | ||
| .alias(table_name)?; | ||
| // Since this This is creating a subquery like: | ||
| //```sql | ||
| // ... | ||
| // FROM <view definition> as "table_name" | ||
| // ``` | ||
| // | ||
| // it doesn't make sense to have a qualified | ||
| // reference (e.g. "foo"."bar") -- this convert to | ||
| // string | ||
| .alias(table_name.to_string())?; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will subquery_alias need similar treatment to have an OwnedTableReference instead of String? or String itself should be just fine?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it is actually important that the subquery alias is a non parsed string here (otherwise tests fail). This rewrite is effectively inlining the view defition as a named subquery (that doesn't make sense to be in a schema): ...
FROM <view definition> as "table_name"I will add a comment here. |
||
| Ok(Some(plan.build()?)) | ||
| } else { | ||
| Ok(None) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
im curious about why explicit lifetime is needed here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Basically because the compiler told me to do so 😆
But seriously, I think the reason is that
impldoesn't support anonymous lifetimes -- I would have to change this to use the explicit generic syntaxHere is what happens if I remove
<'a>: