Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,17 @@ impl DFSchema {
Ok(Self { fields, metadata })
}

/// Create a `DFSchema` from an Arrow schema
pub fn try_from_qualified_schema(qualifier: &str, schema: &Schema) -> Result<Self> {
/// Create a `DFSchema` from an Arrow schema and a given qualifier
pub fn try_from_qualified_schema<'a>(
qualifier: impl Into<TableReference<'a>>,
schema: &Schema,
) -> Result<Self> {
let qualifier = qualifier.into();
Self::new_with_metadata(
schema
.fields()
.iter()
.map(|f| DFField::from_qualified(qualifier.to_string(), f.clone()))
.map(|f| DFField::from_qualified(qualifier.clone(), f.clone()))
.collect(),
schema.metadata().clone(),
)
Expand Down Expand Up @@ -662,12 +666,12 @@ impl DFField {
}

/// Create a qualified field from an existing Arrow field
pub fn from_qualified(
qualifier: impl Into<OwnedTableReference>,
pub fn from_qualified<'a>(
qualifier: impl Into<TableReference<'a>>,
Comment on lines +669 to +670
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

im curious about why explicit lifetime is needed here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically because the compiler told me to do so 😆

But seriously, I think the reason is that impl doesn't support anonymous lifetimes -- I would have to change this to use the explicit generic syntax

Here is what happens if I remove <'a>:

   --> datafusion/common/src/dfschema.rs:113:45
    |
113 |         qualifier: impl Into<TableReference<'_>>,
    |                                             ^^ expected named lifetime parameter
    |
help: consider introducing a named lifetime parameter
    |
112 ~     pub fn try_from_qualified_schema<'a>(
113 ~         qualifier: impl Into<TableReference<'a>>,
    |

field: Field,
) -> Self {
Self {
qualifier: Some(qualifier.into()),
qualifier: Some(qualifier.into().to_owned_reference()),
field,
}
}
Expand Down
5 changes: 5 additions & 0 deletions datafusion/common/src/table_reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ impl std::fmt::Display for TableReference<'_> {
}

impl<'a> TableReference<'a> {
/// Convenience method for creating a typed none `None`
pub fn none() -> Option<TableReference<'a>> {
None
}

/// Convenience method for creating a `Bare` variant of `TableReference`
pub fn bare(table: impl Into<Cow<'a, str>>) -> TableReference<'a> {
TableReference::Bare {
Expand Down
7 changes: 3 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,10 +1020,9 @@ impl SessionContext {
table_ref: impl Into<TableReference<'a>>,
) -> Result<DataFrame> {
let table_ref = table_ref.into();
let table = table_ref.table().to_owned();
let provider = self.table_provider(table_ref).await?;
let provider = self.table_provider(table_ref.to_owned_reference()).await?;
let plan = LogicalPlanBuilder::scan(
&table,
table_ref.to_owned_reference(),
provider_as_source(Arc::clone(&provider)),
None,
)?
Expand All @@ -1037,7 +1036,7 @@ impl SessionContext {
table_ref: impl Into<TableReference<'a>>,
) -> Result<Arc<dyn TableProvider>> {
let table_ref = table_ref.into();
let table = table_ref.table().to_owned();
let table = table_ref.table().to_string();
let schema = self.state.read().schema_for_ref(table_ref)?;
match schema.table(&table).await {
Some(ref provider) => Ok(Arc::clone(provider)),
Expand Down
5 changes: 3 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1876,7 +1876,7 @@ mod tests {
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion_common::assert_contains;
use datafusion_common::{assert_contains, TableReference};
use datafusion_common::{DFField, DFSchema, DFSchemaRef};
use datafusion_expr::{
col, lit, sum, Extension, GroupingSet, LogicalPlanBuilder,
Expand Down Expand Up @@ -2518,7 +2518,8 @@ Internal error: Optimizer rule 'type_coercion' failed due to unexpected error: E
match ctx.read_csv(path, options).await?.into_optimized_plan()? {
LogicalPlan::TableScan(ref scan) => {
let mut scan = scan.clone();
scan.table_name = name.to_string();
let table_reference = TableReference::from(name).to_owned_reference();
scan.table_name = table_reference;
let new_schema = scan
.projected_schema
.as_ref()
Expand Down
16 changes: 5 additions & 11 deletions datafusion/core/src/test_util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use crate::prelude::{CsvReadOptions, SessionContext};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::{DataFusionError, Statistics};
use datafusion_common::{DataFusionError, Statistics, TableReference};
use datafusion_expr::{CreateExternalTable, Expr, TableType};
use datafusion_physical_expr::PhysicalSortExpr;
use futures::Stream;
Expand Down Expand Up @@ -219,11 +219,8 @@ pub fn scan_empty(
) -> Result<LogicalPlanBuilder, DataFusionError> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema));
LogicalPlanBuilder::scan(
name.unwrap_or(UNNAMED_TABLE),
provider_as_source(provider),
projection,
)
let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}

/// Scan an empty data source with configured partition, mainly used in tests.
Expand All @@ -235,11 +232,8 @@ pub fn scan_empty_with_partitions(
) -> Result<LogicalPlanBuilder, DataFusionError> {
let table_schema = Arc::new(table_schema.clone());
let provider = Arc::new(EmptyTable::new(table_schema).with_partitions(partitions));
LogicalPlanBuilder::scan(
name.unwrap_or(UNNAMED_TABLE),
provider_as_source(provider),
projection,
)
let name = TableReference::bare(name.unwrap_or(UNNAMED_TABLE).to_string());
LogicalPlanBuilder::scan(name, provider_as_source(provider), projection)
}

/// Get the schema for the aggregate_test_* csv files
Expand Down
105 changes: 85 additions & 20 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::{
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_common::{
Column, DFField, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference, Result,
ScalarValue, ToDFSchema,
ScalarValue, TableReference, ToDFSchema,
};
use std::any::Any;
use std::cmp::Ordering;
Expand Down Expand Up @@ -192,8 +192,39 @@ impl LogicalPlanBuilder {
}

/// Convert a table provider into a builder with a TableScan
///
/// Note that if you pass a string as `table_name`, it is treated
/// as a SQL identifier, as described on [`TableReference`] and
/// thus is normalized
///
/// # Example:
/// ```
/// # use datafusion_expr::{lit, col, LogicalPlanBuilder,
/// # logical_plan::builder::LogicalTableSource, logical_plan::table_scan
/// # };
/// # use std::sync::Arc;
/// # use arrow::datatypes::{Schema, DataType, Field};
/// # use datafusion_common::TableReference;
/// #
/// # let employee_schema = Arc::new(Schema::new(vec![
/// # Field::new("id", DataType::Int32, false),
/// # ])) as _;
/// # let table_source = Arc::new(LogicalTableSource::new(employee_schema));
/// // Scan table_source with the name "mytable" (after normalization)
/// # let table = table_source.clone();
/// let scan = LogicalPlanBuilder::scan("MyTable", table, None);
///
/// // Scan table_source with the name "MyTable" by enclosing in quotes
/// # let table = table_source.clone();
/// let scan = LogicalPlanBuilder::scan(r#""MyTable""#, table, None);
///
/// // Scan table_source with the name "MyTable" by forming the table reference
/// # let table = table_source.clone();
/// let table_reference = TableReference::bare("MyTable");
/// let scan = LogicalPlanBuilder::scan(table_reference, table, None);
/// ```
pub fn scan(
table_name: impl Into<String>,
table_name: impl Into<OwnedTableReference>,
table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
) -> Result<Self> {
Expand All @@ -217,14 +248,14 @@ impl LogicalPlanBuilder {

/// Convert a table provider into a builder with a TableScan
pub fn scan_with_filters(
table_name: impl Into<String>,
table_name: impl Into<OwnedTableReference>,
table_source: Arc<dyn TableSource>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
) -> Result<Self> {
let table_name = table_name.into();

if table_name.is_empty() {
if table_name.table().is_empty() {
return Err(DataFusionError::Plan(
"table_name cannot be empty".to_string(),
));
Expand All @@ -239,7 +270,7 @@ impl LogicalPlanBuilder {
p.iter()
.map(|i| {
DFField::from_qualified(
table_name.to_string(),
table_name.clone(),
schema.field(*i).clone(),
)
})
Expand All @@ -248,7 +279,7 @@ impl LogicalPlanBuilder {
)
})
.unwrap_or_else(|| {
DFSchema::try_from_qualified_schema(&table_name, &schema)
DFSchema::try_from_qualified_schema(table_name.clone(), &schema)
})?;

let table_scan = LogicalPlan::TableScan(TableScan {
Expand Down Expand Up @@ -1196,14 +1227,22 @@ pub fn subquery_alias(

/// Create a LogicalPlanBuilder representing a scan of a table with the provided name and schema.
/// This is mostly used for testing and documentation.
pub fn table_scan(
name: Option<&str>,
pub fn table_scan<'a>(
name: Option<impl Into<TableReference<'a>>>,
table_schema: &Schema,
projection: Option<Vec<usize>>,
) -> Result<LogicalPlanBuilder> {
let table_source = table_source(table_schema);
let name = name
.map(|n| n.into())
.unwrap_or_else(|| OwnedTableReference::bare(UNNAMED_TABLE))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe TableReference::bare(UNNAMED_TABLE) instead since next line is to_owned_reference() anyway?

.to_owned_reference();
LogicalPlanBuilder::scan(name, table_source, projection)
}

fn table_source(table_schema: &Schema) -> Arc<dyn TableSource> {
let table_schema = Arc::new(table_schema.clone());
let table_source = Arc::new(LogicalTableSource { table_schema });
LogicalPlanBuilder::scan(name.unwrap_or(UNNAMED_TABLE), table_source, projection)
Arc::new(LogicalTableSource { table_schema })
}

/// Wrap projection for a plan, if the join keys contains normal expression.
Expand Down Expand Up @@ -1361,12 +1400,36 @@ mod tests {
#[test]
fn plan_builder_schema() {
let schema = employee_schema();
let plan = table_scan(Some("employee_csv"), &schema, None).unwrap();
let projection = None;
let plan =
LogicalPlanBuilder::scan("employee_csv", table_source(&schema), projection)
.unwrap();
let expected = DFSchema::try_from_qualified_schema(
TableReference::bare("employee_csv"),
&schema,
)
.unwrap();
assert_eq!(&expected, plan.schema().as_ref());

let expected =
DFSchema::try_from_qualified_schema("employee_csv", &schema).unwrap();
// Note scan of "EMPLOYEE_CSV" is treated as a SQL identifer
// (and thus normalized to "employee"csv") as well
let projection = None;
let plan =
LogicalPlanBuilder::scan("EMPLOYEE_CSV", table_source(&schema), projection)
.unwrap();
assert_eq!(&expected, plan.schema().as_ref());
}

assert_eq!(&expected, plan.schema().as_ref())
#[test]
fn plan_builder_empty_name() {
let schema = employee_schema();
let projection = None;
let err =
LogicalPlanBuilder::scan("", table_source(&schema), projection).unwrap_err();
assert_eq!(
err.to_string(),
"Error during planning: table_name cannot be empty"
);
}

#[test]
Expand Down Expand Up @@ -1481,9 +1544,10 @@ mod tests {

#[test]
fn plan_builder_union_different_num_columns_error() -> Result<()> {
let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;

let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
let plan1 =
table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
let plan2 =
table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;

let expected = "Error during planning: Union queries must have the same number of columns, (left is 1, right is 2)";
let err_msg1 = plan1.clone().union(plan2.clone().build()?).unwrap_err();
Expand Down Expand Up @@ -1707,9 +1771,10 @@ mod tests {

#[test]
fn plan_builder_intersect_different_num_columns_error() -> Result<()> {
let plan1 = table_scan(None, &employee_schema(), Some(vec![3]))?;

let plan2 = table_scan(None, &employee_schema(), Some(vec![3, 4]))?;
let plan1 =
table_scan(TableReference::none(), &employee_schema(), Some(vec![3]))?;
let plan2 =
table_scan(TableReference::none(), &employee_schema(), Some(vec![3, 4]))?;

let expected = "Error during planning: INTERSECT/EXCEPT query must have the same number of columns. \
Left is 1 and right is 2.";
Expand Down
15 changes: 8 additions & 7 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::{
plan_err, Column, DFSchema, DFSchemaRef, DataFusionError, OwnedTableReference,
ScalarValue,
ScalarValue, TableReference,
};
use std::collections::{HashMap, HashSet};
use std::fmt::{self, Debug, Display, Formatter};
Expand Down Expand Up @@ -1437,9 +1437,12 @@ impl SubqueryAlias {
alias: impl Into<String>,
) -> datafusion_common::Result<Self> {
let alias = alias.into();
let table_ref = TableReference::bare(&alias);
let schema: Schema = plan.schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(&alias, &schema)?);
let schema = DFSchemaRef::new(DFSchema::try_from_qualified_schema(
table_ref.to_owned_reference(),
&schema,
)?);
Ok(SubqueryAlias {
input: Arc::new(plan),
alias,
Expand Down Expand Up @@ -1521,9 +1524,7 @@ pub struct Window {
#[derive(Clone)]
pub struct TableScan {
/// The name of the table
// TODO: change to OwnedTableReference
// see: https://github.com/apache/arrow-datafusion/issues/5522
pub table_name: String,
pub table_name: OwnedTableReference,
/// The source of the table
pub source: Arc<dyn TableSource>,
/// Optional column indices to use as a projection
Expand Down Expand Up @@ -2393,7 +2394,7 @@ mod tests {
Field::new("state", DataType::Utf8, false),
]);

table_scan(None, &schema, Some(vec![0, 1]))
table_scan(TableReference::none(), &schema, Some(vec![0, 1]))
.unwrap()
.filter(col("state").eq(lit("CO")))
.unwrap()
Expand Down
12 changes: 4 additions & 8 deletions datafusion/expr/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,14 +644,10 @@ pub fn from_plan(
}))
}
LogicalPlan::SubqueryAlias(SubqueryAlias { alias, .. }) => {
let schema = inputs[0].schema().as_ref().clone().into();
let schema =
DFSchemaRef::new(DFSchema::try_from_qualified_schema(alias, &schema)?);
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias {
alias: alias.clone(),
input: Arc::new(inputs[0].clone()),
schema,
}))
Ok(LogicalPlan::SubqueryAlias(SubqueryAlias::try_new(
inputs[0].clone(),
alias.clone(),
)?))
}
LogicalPlan::Limit(Limit { skip, fetch, .. }) => Ok(LogicalPlan::Limit(Limit {
skip: *skip,
Expand Down
11 changes: 10 additions & 1 deletion datafusion/optimizer/src/inline_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,16 @@ impl OptimizerRule for InlineTableScan {
generate_projection_expr(projection, sub_plan)?;
let plan = LogicalPlanBuilder::from(sub_plan.clone())
.project(projection_exprs)?
.alias(table_name)?;
// Since this This is creating a subquery like:
//```sql
// ...
// FROM <view definition> as "table_name"
// ```
//
// it doesn't make sense to have a qualified
// reference (e.g. "foo"."bar") -- this convert to
// string
.alias(table_name.to_string())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will subquery_alias need similar treatment to have an OwnedTableReference instead of String? or String itself should be just fine?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is actually important that the subquery alias is a non parsed string here (otherwise tests fail).

This rewrite is effectively inlining the view defition as a named subquery (that doesn't make sense to be in a schema):

...
FROM <view definition> as "table_name"

I will add a comment here.

Ok(Some(plan.build()?))
} else {
Ok(None)
Expand Down
Loading