Skip to content

Commit

Permalink
Impl NamedRelation as CteWorkTable
Browse files Browse the repository at this point in the history
* impl cte as work table

* move SharedState to continuance

* impl WorkTableState

wip: readying pr to implement only logical plan
  • Loading branch information
matthewgapp committed Jan 12, 2024
1 parent 128b2c6 commit 98f77ff
Show file tree
Hide file tree
Showing 15 changed files with 323 additions and 29 deletions.
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,11 @@ impl DFField {
self.field = f.into();
self
}

/// Returns this field with its qualifier set to `qualifier`.
///
/// Consumes `self`; whatever qualifier was stored before is overwritten.
pub fn with_qualifier(mut self, qualifier: impl Into<OwnedTableReference>) -> Self {
    let table_ref: OwnedTableReference = qualifier.into();
    self.qualifier = Some(table_ref);
    self
}
}

impl From<FieldRef> for DFField {
Expand Down
89 changes: 89 additions & 0 deletions datafusion/core/src/datasource/cte.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! CteWorkTable implementation used for recursive queries

use std::any::Any;
use std::sync::Arc;

use arrow::datatypes::SchemaRef;
use async_trait::async_trait;
use datafusion_common::not_impl_err;

use crate::{
error::Result,
logical_expr::{Expr, LogicalPlan, TableProviderFilterPushDown},
physical_plan::ExecutionPlan,
};

use datafusion_common::DataFusionError;

use crate::datasource::{TableProvider, TableType};
use crate::execution::context::SessionState;

/// Table provider used as the work table of a recursive query (recursive CTE).
///
/// It records only the table's name and schema. It reports itself as a
/// temporary table, and scanning it is not implemented yet.
#[derive(Debug)]
pub struct CteWorkTable {
    /// Name the work table was created with.
    name: String,
    /// Schema reported by [`TableProvider::schema`] for this table.
    table_schema: SchemaRef,
}

impl CteWorkTable {
    /// Creates a work table called `name` whose schema is `table_schema`.
    ///
    /// The name is copied into an owned `String`; the schema handle is stored
    /// as given.
    pub fn new(name: &str, table_schema: SchemaRef) -> Self {
        Self {
            name: name.to_owned(),
            table_schema,
        }
    }

    /// Returns the name this work table was created with.
    ///
    /// Without this accessor the stored name could never be read back.
    pub fn name(&self) -> &str {
        &self.name
    }
}

#[async_trait]
impl TableProvider for CteWorkTable {
    /// Exposes the provider as [`Any`] so callers can downcast it.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Always `None`: no logical plan is attached to the work table.
    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
        None
    }

    /// Returns a new handle to the schema supplied at construction.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.table_schema)
    }

    /// The work table is reported as a temporary table.
    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    /// Scanning is not available yet; every call yields a "not implemented"
    /// error, whatever the projection, filters or limit.
    async fn scan(
        &self,
        _state: &SessionState,
        _projection: Option<&Vec<usize>>,
        _filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        not_impl_err!("scan not implemented for CteWorkTable yet")
    }

    /// Reports every filter as unsupported for pushdown.
    fn supports_filter_pushdown(
        &self,
        _filter: &Expr,
    ) -> Result<TableProviderFilterPushDown> {
        // TODO: should we support filter pushdown?
        let decision = TableProviderFilterPushDown::Unsupported;
        Ok(decision)
    }
}
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
//! [`ListingTable`]: crate::datasource::listing::ListingTable

pub mod avro_to_arrow;
pub mod cte;
pub mod default_table_source;
pub mod empty;
pub mod file_format;
Expand Down
10 changes: 10 additions & 0 deletions datafusion/core/src/execution/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod parquet;
use crate::{
catalog::{CatalogList, MemoryCatalogList},
datasource::{
cte::CteWorkTable,
function::{TableFunction, TableFunctionImpl},
listing::{ListingOptions, ListingTable},
provider::TableProviderFactory,
Expand Down Expand Up @@ -1899,6 +1900,15 @@ impl<'a> ContextProvider for SessionContextProvider<'a> {
Ok(provider_as_source(provider))
}

/// Builds a [`CteWorkTable`] from `name` and `schema` and returns it wrapped
/// as a table source.
fn create_cte_work_table(
    &self,
    name: &str,
    schema: SchemaRef,
) -> Result<Arc<dyn TableSource>> {
    let work_table = CteWorkTable::new(name, schema);
    Ok(provider_as_source(Arc::new(work_table)))
}

fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
self.state.scalar_functions().get(name).cloned()
}
Expand Down
7 changes: 5 additions & 2 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ use datafusion_expr::expr::{
use datafusion_expr::expr_rewriter::unnormalize_cols;
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
use datafusion_expr::{
DescribeTable, DmlStatement, ScalarFunctionDefinition, StringifiedPlan, WindowFrame,
WindowFrameBound, WriteOp,
DescribeTable, DmlStatement, RecursiveQuery, ScalarFunctionDefinition,
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
};
use datafusion_physical_expr::expressions::Literal;
use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
Expand Down Expand Up @@ -1311,6 +1311,9 @@ impl DefaultPhysicalPlanner {
Ok(plan)
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { name: _, static_term: _, recursive_term: _, is_distinct: _,.. }) => {
not_impl_err!("Physical counterpart of RecursiveQuery is not implemented yet")
}
};
exec_plan
}.boxed()
Expand Down
23 changes: 23 additions & 0 deletions datafusion/expr/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ use datafusion_common::{
ScalarValue, TableReference, ToDFSchema, UnnestOptions,
};

use super::plan::RecursiveQuery;

/// Default table name for unnamed table
pub const UNNAMED_TABLE: &str = "?table?";

Expand Down Expand Up @@ -121,6 +123,27 @@ impl LogicalPlanBuilder {
}))
}

/// Wraps this builder's plan and `recursive_term` in a
/// [`LogicalPlan::RecursiveQuery`] named `name`.
///
/// The builder's current plan is cloned and becomes the static term.
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` when `is_distinct` is `true`.
pub fn to_recursive_query(
    &self,
    name: String,
    recursive_term: LogicalPlan,
    is_distinct: bool,
) -> Result<Self> {
    // TODO: we need to do a bunch of validation here. Maybe more.
    if is_distinct {
        let message = String::from("Recursive queries with distinct is not supported");
        return Err(DataFusionError::NotImplemented(message));
    }

    let static_term = Arc::new(self.plan.clone());
    let recursive_term = Arc::new(recursive_term);
    let query = RecursiveQuery {
        name,
        static_term,
        recursive_term,
        is_distinct,
    };
    Ok(Self::from(LogicalPlan::RecursiveQuery(query)))
}

/// Create a values list based relation, and the schema is inferred from data, consuming
/// `value`. See the [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down
4 changes: 2 additions & 2 deletions datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub use plan::{
projection_schema, Aggregate, Analyze, CrossJoin, DescribeTable, Distinct,
DistinctOn, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint,
JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare, Projection,
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, Unnest, Values, Window,
RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias,
TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
};
pub use statement::{
SetVariable, Statement, TransactionAccessMode, TransactionConclusion, TransactionEnd,
Expand Down
46 changes: 46 additions & 0 deletions datafusion/expr/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ pub enum LogicalPlan {
/// Unnest a column that contains a nested list type such as an
/// ARRAY. This is used to implement SQL `UNNEST`
Unnest(Unnest),
/// A variadic query (e.g. "Recursive CTEs")
RecursiveQuery(RecursiveQuery),
}

impl LogicalPlan {
Expand Down Expand Up @@ -191,6 +193,10 @@ impl LogicalPlan {
LogicalPlan::Copy(CopyTo { input, .. }) => input.schema(),
LogicalPlan::Ddl(ddl) => ddl.schema(),
LogicalPlan::Unnest(Unnest { schema, .. }) => schema,
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
// we take the schema of the static term as the schema of the entire recursive query
static_term.schema()
}
}
}

Expand Down Expand Up @@ -243,6 +249,10 @@ impl LogicalPlan {
| LogicalPlan::TableScan(_) => {
vec![self.schema()]
}
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
// return only the schema of the static term
static_term.all_schemas()
}
// return children schemas
LogicalPlan::Limit(_)
| LogicalPlan::Subquery(_)
Expand Down Expand Up @@ -384,6 +394,7 @@ impl LogicalPlan {
.try_for_each(f),
// plans without expressions
LogicalPlan::EmptyRelation(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Subquery(_)
| LogicalPlan::SubqueryAlias(_)
| LogicalPlan::Limit(_)
Expand Down Expand Up @@ -430,6 +441,11 @@ impl LogicalPlan {
LogicalPlan::Ddl(ddl) => ddl.inputs(),
LogicalPlan::Unnest(Unnest { input, .. }) => vec![input],
LogicalPlan::Prepare(Prepare { input, .. }) => vec![input],
LogicalPlan::RecursiveQuery(RecursiveQuery {
static_term,
recursive_term,
..
}) => vec![static_term, recursive_term],
// plans without inputs
LogicalPlan::TableScan { .. }
| LogicalPlan::Statement { .. }
Expand Down Expand Up @@ -510,6 +526,9 @@ impl LogicalPlan {
cross.left.head_output_expr()
}
}
LogicalPlan::RecursiveQuery(RecursiveQuery { static_term, .. }) => {
static_term.head_output_expr()
}
LogicalPlan::Union(union) => Ok(Some(Expr::Column(
union.schema.fields()[0].qualified_column(),
))),
Expand Down Expand Up @@ -835,6 +854,14 @@ impl LogicalPlan {
};
Ok(LogicalPlan::Distinct(distinct))
}
LogicalPlan::RecursiveQuery(RecursiveQuery {
name, is_distinct, ..
}) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery {
name: name.clone(),
static_term: Arc::new(inputs[0].clone()),
recursive_term: Arc::new(inputs[1].clone()),
is_distinct: *is_distinct,
})),
LogicalPlan::Analyze(a) => {
assert!(expr.is_empty());
assert_eq!(inputs.len(), 1);
Expand Down Expand Up @@ -1073,6 +1100,7 @@ impl LogicalPlan {
}),
LogicalPlan::TableScan(TableScan { fetch, .. }) => *fetch,
LogicalPlan::EmptyRelation(_) => Some(0),
LogicalPlan::RecursiveQuery(_) => None,
LogicalPlan::Subquery(_) => None,
LogicalPlan::SubqueryAlias(SubqueryAlias { input, .. }) => input.max_rows(),
LogicalPlan::Limit(Limit { fetch, .. }) => *fetch,
Expand Down Expand Up @@ -1408,6 +1436,11 @@ impl LogicalPlan {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self.0 {
LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"),
LogicalPlan::RecursiveQuery(RecursiveQuery {
is_distinct, ..
}) => {
write!(f, "RecursiveQuery: is_distinct={}", is_distinct)
}
LogicalPlan::Values(Values { ref values, .. }) => {
let str_values: Vec<_> = values
.iter()
Expand Down Expand Up @@ -1718,6 +1751,19 @@ pub struct EmptyRelation {
pub schema: DFSchemaRef,
}

/// A recursive query plan node (e.g. a recursive CTE).
///
/// The node has two inputs, in this order: the static term and the
/// recursive term. Its output schema is taken from the static term.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct RecursiveQuery {
/// Name given to the query (presumably the CTE's name; confirm against
/// the SQL planner, which is not visible here).
pub name: String,
/// The static term: first input of the node, and the plan whose schema
/// is used as the schema of the whole recursive query.
pub static_term: Arc<LogicalPlan>,
/// The recursive term: second input of the node.
pub recursive_term: Arc<LogicalPlan>,
/// Whether the query is flagged as distinct. Note that
/// `LogicalPlanBuilder::to_recursive_query` currently rejects `true`
/// with a "not implemented" error.
pub is_distinct: bool,
}

/// Values expression. See
/// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html)
/// documentation for more details.
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/common_subexpr_eliminate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,7 @@ impl OptimizerRule for CommonSubexprEliminate {
| LogicalPlan::Dml(_)
| LogicalPlan::Copy(_)
| LogicalPlan::Unnest(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Prepare(_) => {
// apply the optimization to all inputs of the plan
utils::optimize_children(self, plan, config)?
Expand Down
1 change: 1 addition & 0 deletions datafusion/optimizer/src/optimize_projections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ fn optimize_projections(
.collect::<Vec<_>>()
}
LogicalPlan::EmptyRelation(_)
| LogicalPlan::RecursiveQuery(_)
| LogicalPlan::Statement(_)
| LogicalPlan::Values(_)
| LogicalPlan::Extension(_)
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1702,6 +1702,9 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlan::DescribeTable(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for DescribeTable",
)),
LogicalPlan::RecursiveQuery(_) => Err(proto_error(
"LogicalPlan serde is not yet implemented for RecursiveQuery",
)),
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions datafusion/sql/src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ pub trait ContextProvider {
not_impl_err!("Table Functions are not supported")
}

/// Creates the table source that serves as the work table of a recursive
/// CTE, given the table's name and schema.
///
/// The default implementation ignores both arguments and returns a
/// "not implemented" error, so a provider must override this method to
/// support recursive CTEs.
fn create_cte_work_table(
&self,
_name: &str,
_schema: SchemaRef,
) -> Result<Arc<dyn TableSource>> {
not_impl_err!("Recursive CTE is not supported")
}

/// Getter for a UDF description
fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>>;
/// Getter for a UDAF description
Expand Down
Loading

0 comments on commit 98f77ff

Please sign in to comment.