From 4b8c5e67f894e7c5e2df9d3a4ced2276a930f3d7 Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Sun, 2 Oct 2022 23:06:39 +0300 Subject: [PATCH] Implement recursive query execution --- datafusion/core/src/execution/context.rs | 38 ++ datafusion/core/src/logical_plan/plan.rs | 6 +- .../core/src/physical_plan/continuance.rs | 162 ++++++++ datafusion/core/src/physical_plan/mod.rs | 2 + datafusion/core/src/physical_plan/planner.rs | 30 +- .../core/src/physical_plan/recursive_query.rs | 357 ++++++++++++++++++ datafusion/core/src/physical_plan/stream.rs | 23 +- datafusion/expr/src/logical_plan/builder.rs | 24 +- datafusion/expr/src/logical_plan/mod.rs | 6 +- datafusion/expr/src/logical_plan/plan.rs | 21 ++ datafusion/expr/src/utils.rs | 16 +- .../optimizer/src/common_subexpr_eliminate.rs | 1 + .../optimizer/src/projection_push_down.rs | 1 + datafusion/proto/src/logical_plan.rs | 3 + datafusion/sql/src/planner.rs | 41 +- 15 files changed, 690 insertions(+), 41 deletions(-) create mode 100644 datafusion/core/src/physical_plan/continuance.rs create mode 100644 datafusion/core/src/physical_plan/recursive_query.rs diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs index 2a805a5fc0e8..5f239b8a60e4 100644 --- a/datafusion/core/src/execution/context.rs +++ b/datafusion/core/src/execution/context.rs @@ -57,6 +57,7 @@ use std::{ }; use arrow::datatypes::{DataType, SchemaRef}; +use arrow::error::Result as ArrowResult; use arrow::record_batch::RecordBatch; use crate::catalog::{ @@ -118,7 +119,9 @@ use datafusion_sql::{ parser::DFParser, planner::{ContextProvider, SqlToRel}, }; +use parking_lot::Mutex; use parquet::file::properties::WriterProperties; +use tokio::sync::mpsc::Receiver as SingleChannelReceiver; use uuid::Uuid; use super::options::{ @@ -1736,6 +1739,8 @@ pub enum TaskProperties { KVPairs(HashMap), } +type RelationHandler = SingleChannelReceiver>; + /// Task Execution Context pub struct TaskContext { /// Session Id @@ 
-1750,6 +1755,8 @@ pub struct TaskContext { aggregate_functions: HashMap>, /// Runtime environment associated with this task context runtime: Arc, + /// Registered relation handlers + relation_handlers: Mutex>, } impl TaskContext { @@ -1769,6 +1776,7 @@ impl TaskContext { scalar_functions, aggregate_functions, runtime, + relation_handlers: Mutex::new(HashMap::new()), } } @@ -1824,6 +1832,34 @@ impl TaskContext { pub fn runtime_env(&self) -> Arc { self.runtime.clone() } + + /// Register a new relation handler. If a handler with the same name already exists + /// this function will return an error. + pub fn push_relation_handler( + &self, + name: String, + handler: RelationHandler, + ) -> Result<()> { + let mut handlers = self.relation_handlers.lock(); + if handlers.contains_key(&name) { + return Err(DataFusionError::Internal(format!( + "Relation handler {} already registered", + name + ))); + } + handlers.insert(name, handler); + Ok(()) + } + + /// Retrieve the relation handler for the given name. It will remove the handler from + /// the storage if it exists, and return it as is. 
+ pub fn pop_relation_handler(&self, name: String) -> Result { + let mut handlers = self.relation_handlers.lock(); + + handlers.remove(name.as_str()).ok_or_else(|| { + DataFusionError::Internal(format!("Relation handler {} not registered", name)) + }) + } } /// Create a new task context instance from SessionContext @@ -1846,6 +1882,7 @@ impl From<&SessionContext> for TaskContext { scalar_functions, aggregate_functions, runtime, + relation_handlers: Mutex::new(HashMap::new()), } } } @@ -1865,6 +1902,7 @@ impl From<&SessionState> for TaskContext { scalar_functions, aggregate_functions, runtime, + relation_handlers: Mutex::new(HashMap::new()), } } } diff --git a/datafusion/core/src/logical_plan/plan.rs b/datafusion/core/src/logical_plan/plan.rs index d03b9d43ff5c..5100b95753b3 100644 --- a/datafusion/core/src/logical_plan/plan.rs +++ b/datafusion/core/src/logical_plan/plan.rs @@ -24,9 +24,9 @@ pub use datafusion_expr::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan, - Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery, Repartition, - Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan, - Union, UserDefinedLogicalNode, Values, Window, + NamedRelation, Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery, + Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, + ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window, }, TableProviderFilterPushDown, TableSource, }; diff --git a/datafusion/core/src/physical_plan/continuance.rs b/datafusion/core/src/physical_plan/continuance.rs new file mode 100644 index 000000000000..9004360016e6 --- /dev/null +++ b/datafusion/core/src/physical_plan/continuance.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the continuance query plan + +use std::any::Any; +use std::sync::Arc; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, +}; +use arrow::datatypes::SchemaRef; + +use super::expressions::PhysicalSortExpr; +use super::stream::RecordBatchReceiverStream; +use super::{ + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + SendableRecordBatchStream, Statistics, +}; + +use crate::execution::context::TaskContext; + +/// A temporary "working table" operation where the input data will be +/// taken from the named handle during the execution and will be re-published +/// as is (kind of like a mirror). +/// +/// Most notably used in the implementation of recursive queries where the +/// underlying relation does not exist yet but the data will come as the previous +/// term is evaluated. +#[derive(Debug)] +pub struct ContinuanceExec { + /// Name of the relation handler + name: String, + /// The schema of the stream + schema: SchemaRef, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +impl ContinuanceExec { + /// Create a new execution plan for a continuance stream.
The given relation + /// handler must exist in the task context before calling [`execute`] on this + /// plan. + pub fn new(name: String, schema: SchemaRef) -> Self { + Self { + name, + schema, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl ExecutionPlan for ContinuanceExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.schema.clone() + } + + fn children(&self) -> Vec> { + vec![] + } + + fn required_child_distribution(&self) -> Distribution { + Distribution::UnspecifiedDistribution + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + fn relies_on_input_order(&self) -> bool { + false + } + + fn maintains_input_order(&self) -> bool { + false + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + Ok(Arc::new(ContinuanceExec::new( + self.name.clone(), + self.schema.clone(), + ))) + } + + /// This plan does not come with any special streams, but rather we use + /// the existing [`RecordBatchReceiverStream`] to receive the data from + /// the registered handle. + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // Continuance streams must be the plan base. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "ContinuanceExec got an invalid partition {} (expected 0)", + partition + ))); + } + + // The relation handler must be already registered by the + // parent op. 
+ let receiver = context.pop_relation_handler(self.name.clone())?; + Ok(RecordBatchReceiverStream::create_without_handle( + &self.schema, + receiver, + )) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "ContinuanceExec: name={}", self.name) + } + } + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/core/src/physical_plan/mod.rs b/datafusion/core/src/physical_plan/mod.rs index 4613be7cdcbc..b40e39367b26 100644 --- a/datafusion/core/src/physical_plan/mod.rs +++ b/datafusion/core/src/physical_plan/mod.rs @@ -549,6 +549,7 @@ pub mod analyze; pub mod coalesce_batches; pub mod coalesce_partitions; pub mod common; +pub mod continuance; pub mod cross_join; pub mod display; pub mod empty; @@ -563,6 +564,7 @@ pub mod memory; pub mod metrics; pub mod planner; pub mod projection; +pub mod recursive_query; pub mod repartition; pub mod sort_merge_join; pub mod sorts; diff --git a/datafusion/core/src/physical_plan/planner.rs b/datafusion/core/src/physical_plan/planner.rs index 07c9e424503a..4bae6116e7a4 100644 --- a/datafusion/core/src/physical_plan/planner.rs +++ b/datafusion/core/src/physical_plan/planner.rs @@ -28,8 +28,8 @@ use crate::datasource::source_as_provider; use crate::execution::context::{ExecutionProps, SessionState}; use crate::logical_expr::utils::generate_sort_key; use crate::logical_plan::plan::{ - Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias, - TableScan, Window, + Aggregate, Distinct, EmptyRelation, Filter, Join, NamedRelation, Projection, Sort, + SubqueryAlias, TableScan, Window, }; use crate::logical_plan::{ unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan, @@ -40,6 +40,7 @@ use crate::logical_plan::{Limit, Values}; use 
crate::physical_expr::create_physical_expr; use crate::physical_optimizer::optimizer::PhysicalOptimizerRule; use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy}; +use crate::physical_plan::continuance::ContinuanceExec; use crate::physical_plan::cross_join::CrossJoinExec; use crate::physical_plan::explain::ExplainExec; use crate::physical_plan::expressions::{Column, PhysicalSortExpr}; @@ -47,6 +48,7 @@ use crate::physical_plan::filter::FilterExec; use crate::physical_plan::hash_join::HashJoinExec; use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec}; use crate::physical_plan::projection::ProjectionExec; +use crate::physical_plan::recursive_query::RecursiveQueryExec; use crate::physical_plan::repartition::RepartitionExec; use crate::physical_plan::sorts::sort::SortExec; use crate::physical_plan::windows::WindowAggExec; @@ -1028,9 +1030,27 @@ impl DefaultPhysicalPlanner { Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch))) } - LogicalPlan::RecursiveQuery(RecursiveQuery { .. }) => { - dbg!(logical_plan); - todo!(); + LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => { + let static_term = self.create_initial_plan(static_term, session_state).await?; + let recursive_term = self.create_initial_plan(recursive_term, session_state).await?; + + Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct))) + } + LogicalPlan::NamedRelation(NamedRelation {name, schema}) => { + // Named relations is how we represent access to any sort of dynamic data provider. They + // differ from tables in the sense that they can start existing dynamically during the + // execution of a query and then disappear before it even finishes. 
+ // + // This system allows us to replicate the tricky behavior of classical databases where a + // temporary "working table" (as it is called in Postgres) can be used when dealing with + // complex operations (such as recursive CTEs) and then can be dropped. Since DataFusion + // at its core is heavily stream-based and vectorized, we try to avoid using 'real' tables + // and let the streams take care of the data flow in this as well. + + // Since the actual "input"'s will be only available to us at runtime (through task context) + // we can't really do any sort of meaningful validation here. + let schema = SchemaRef::new(schema.as_ref().to_owned().into()); + Ok(Arc::new(ContinuanceExec::new(name.clone(), schema))) } LogicalPlan::CreateExternalTable(_) => { // There is no default plan for "CREATE EXTERNAL diff --git a/datafusion/core/src/physical_plan/recursive_query.rs b/datafusion/core/src/physical_plan/recursive_query.rs new file mode 100644 index 000000000000..c2adeb0bcae6 --- /dev/null +++ b/datafusion/core/src/physical_plan/recursive_query.rs @@ -0,0 +1,357 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! 
Defines the recursive query plan + +use std::any::Any; +use std::sync::Arc; +use std::task::{Context, Poll}; + +use crate::error::{DataFusionError, Result}; +use crate::physical_plan::{ + DisplayFormatType, Distribution, ExecutionPlan, Partitioning, +}; +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use futures::{Stream, StreamExt}; +use tokio::sync::mpsc; + +use super::expressions::PhysicalSortExpr; +use super::metrics::BaselineMetrics; +use super::RecordBatchStream; +use super::{ + metrics::{ExecutionPlanMetricsSet, MetricsSet}, + SendableRecordBatchStream, Statistics, +}; +use arrow::error::{ArrowError, Result as ArrowResult}; +use tokio::sync::mpsc::{Receiver, Sender}; + +use crate::execution::context::TaskContext; + +/// Recursive query execution plan. +/// +/// This plan has two components: a base part (the static term) and +/// a dynamic part (the recursive term). The execution will start from +/// the base, and as long as the previous iteration produced at least +/// a single new row (taking care of the distinction) the recursive +/// part will be continuously executed. +/// +/// Before each execution of the dynamic part, the rows from the previous +/// iteration will be available in a "working table" (not a real table, +/// can be only accessed using a continuance operation). +/// +/// Note that there won't be any limit or checks applied to detect +/// an infinite recursion, so it is up to the planner to ensure that +/// it won't happen.
+#[derive(Debug)] +pub struct RecursiveQueryExec { + /// Name of the query handler + name: String, + /// The base part (static term) + static_term: Arc, + /// The dynamic part (recursive term) + recursive_term: Arc, + /// Distinction + is_distinct: bool, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, +} + +impl RecursiveQueryExec { + /// Create a new RecursiveQueryExec + pub fn new( + name: String, + static_term: Arc, + recursive_term: Arc, + is_distinct: bool, + ) -> Self { + RecursiveQueryExec { + name, + static_term, + recursive_term, + is_distinct, + metrics: ExecutionPlanMetricsSet::new(), + } + } +} + +impl ExecutionPlan for RecursiveQueryExec { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + self.static_term.schema() + } + + fn children(&self) -> Vec> { + vec![self.static_term.clone(), self.recursive_term.clone()] + } + + // Distribution on a recursive query is really tricky to handle. + // For now, we are going to use a single partition but in the + // future we might find a better way to handle this. + fn required_child_distribution(&self) -> Distribution { + Distribution::SinglePartition + } + + fn output_partitioning(&self) -> Partitioning { + Partitioning::UnknownPartitioning(1) + } + + // TODO: control these hints and see whether we can + // infer some from the child plans (static/recursive terms).
+ + fn relies_on_input_order(&self) -> bool { + false + } + + fn maintains_input_order(&self) -> bool { + false + } + + fn benefits_from_input_partitioning(&self) -> bool { + false + } + + fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { + None + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + Ok(Arc::new(RecursiveQueryExec::new( + self.name.clone(), + children[0].clone(), + children[1].clone(), + self.is_distinct, + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + // All partitions must be coalesced before coming to RecursiveQueryExec. + // TODO: we might be able to handle multiple partitions in the future. + if partition != 0 { + return Err(DataFusionError::Internal(format!( + "RecursiveQueryExec got an invalid partition {} (expected 0)", + partition + ))); + } + + let static_stream = self.static_term.execute(partition, context.clone())?; + let baseline_metrics = BaselineMetrics::new(&self.metrics, partition); + Ok(Box::pin(RecursiveQueryStream::new( + context, + self.name.clone(), + self.recursive_term.clone(), + static_stream, + baseline_metrics, + ))) + } + + fn fmt_as( + &self, + t: DisplayFormatType, + f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + match t { + DisplayFormatType::Default => { + write!(f, "RecursiveQueryExec: is_distinct={}", self.is_distinct) + } + } + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn statistics(&self) -> Statistics { + Statistics::default() + } +} + +/// The actual logic of the recursive queries happens during the streaming +/// process. 
A simplified version of the algorithm is the following: +/// +/// buffer = [] +/// +/// while batch := static_stream.next(): +/// buffer.push(batch) +/// yield buffer +/// +/// while buffer.len() > 0: +/// sender, receiver = Channel() +/// register_continuation(handle_name, receiver) +/// sender.send(buffer.drain()) +/// recursive_stream = recursive_term.execute() +/// while batch := recursive_stream.next(): +/// buffer.append(batch) +/// yield buffer +/// +struct RecursiveQueryStream { + /// The context to be used for managing handlers & executing new tasks + task_context: Arc, + /// Name of the relation handler to be used by the recursive term + name: String, + /// The dynamic part (recursive term) as is (without being executed) + recursive_term: Arc, + /// The static part (static term) as a stream. If the processing of this + /// part is completed, then it will be None. + static_stream: Option, + /// The dynamic part (recursive term) as a stream. If the processing of this + /// part has not started yet, or has been completed, then it will be None. + recursive_stream: Option, + /// The schema of the output. + schema: SchemaRef, + /// In-memory buffer for storing a copy of the current results. Will be + /// cleared after each iteration. + buffer: Vec, + /// Metrics. + _baseline_metrics: BaselineMetrics, +} + +impl RecursiveQueryStream { + /// Create a new recursive query stream + fn new( + task_context: Arc, + name: String, + recursive_term: Arc, + static_stream: SendableRecordBatchStream, + baseline_metrics: BaselineMetrics, + ) -> Self { + let schema = static_stream.schema(); + Self { + task_context, + name, + recursive_term, + static_stream: Some(static_stream), + recursive_stream: None, + schema, + buffer: vec![], + _baseline_metrics: baseline_metrics, + } + } + + /// Push a clone of the given batch to the in memory buffer, and then return + /// a poll with it. 
+ fn push_batch( + mut self: std::pin::Pin<&mut Self>, + batch: RecordBatch, + ) -> Poll>> { + self.buffer.push(batch.clone()); + Poll::Ready(Some(Ok(batch))) + } + + /// Start polling for the next iteration, will be called either after the static term + /// is completed or another term is completed. It will follow the algorithm above + /// to check whether the recursion has ended. + fn poll_next_iteration( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + let total_length = self + .buffer + .iter() + .fold(0, |acc, batch| acc + batch.num_rows()); + + if total_length == 0 { + return Poll::Ready(None); + } + + // The capacity of the channel is the number of batches we currently hold + // in the buffer, plus one. + let (sender, receiver): ( + Sender>, + Receiver>, + ) = mpsc::channel(self.buffer.len() + 1); + + // There shouldn't be any handlers with this name, since the execution of recursive + // term will immediately consume the relation handler. + self.task_context + .push_relation_handler(self.name.clone(), receiver)?; + + // This part heavily assumes that the buffer is not going to change. Maybe we + // should use a mutex? + for batch in self.buffer.drain(..) { + match sender.try_send(Ok(batch)) { + Ok(_) => {} + Err(e) => { + return Poll::Ready(Some(Err(ArrowError::from_external_error( + Box::new(e), + )))) + } + } + } + + self.recursive_stream = + Some(self.recursive_term.execute(0, self.task_context.clone())?); + self.poll_next(cx) + } +} + +impl Stream for RecursiveQueryStream { + type Item = ArrowResult; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + // TODO: we should use this poll to record some metrics! + if let Some(static_stream) = &mut self.static_stream { + // While the static term's stream is available, we'll be forwarding the batches from it (also + // saving them for the initial iteration of the recursive term).
+ let poll = static_stream.poll_next_unpin(cx); + match &poll { + Poll::Ready(None) => { + // Once this is done, we can start running the setup for the recursive term. + self.static_stream = None; + self.poll_next_iteration(cx) + } + Poll::Ready(Some(Ok(batch))) => self.push_batch(batch.clone()), + _ => poll, + } + } else if let Some(recursive_stream) = &mut self.recursive_stream { + let poll = recursive_stream.poll_next_unpin(cx); + match &poll { + Poll::Ready(None) => { + self.recursive_stream = None; + self.poll_next_iteration(cx) + } + Poll::Ready(Some(Ok(batch))) => self.push_batch(batch.clone()), + _ => poll, + } + } else { + Poll::Ready(None) + } + } +} + +impl RecordBatchStream for RecursiveQueryStream { + /// Get the schema + fn schema(&self) -> SchemaRef { + self.schema.clone() + } +} + +#[cfg(test)] +mod tests {} diff --git a/datafusion/core/src/physical_plan/stream.rs b/datafusion/core/src/physical_plan/stream.rs index 77be217e4cfa..63f786e341f5 100644 --- a/datafusion/core/src/physical_plan/stream.rs +++ b/datafusion/core/src/physical_plan/stream.rs @@ -37,12 +37,14 @@ pub struct RecordBatchReceiverStream { inner: ReceiverStream>, #[allow(dead_code)] - drop_helper: AbortOnDropSingle<()>, + drop_helper: Option>, } impl RecordBatchReceiverStream { /// Construct a new [`RecordBatchReceiverStream`] which will send - /// batches of the specified schema from `inner` + /// batches of the specified schema from `inner`. This API also expects + /// a `JoinHandle` to the task that is producing the batches. If there + /// isn't an accessible handle, see [`create_without_handle`]. 
pub fn create( schema: &SchemaRef, rx: tokio::sync::mpsc::Receiver>, @@ -53,7 +55,22 @@ impl RecordBatchReceiverStream { Box::pin(Self { schema, inner, - drop_helper: AbortOnDropSingle::new(join_handle), + drop_helper: Some(AbortOnDropSingle::new(join_handle)), + }) + } + + /// Construct a new [`RecordBatchReceiverStream`] which will send + /// batches of the specified schema from `inner`. + pub fn create_without_handle( + schema: &SchemaRef, + rx: tokio::sync::mpsc::Receiver>, + ) -> SendableRecordBatchStream { + let schema = schema.clone(); + let inner = ReceiverStream::new(rx); + Box::pin(Self { + schema, + inner, + drop_helper: None, }) } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index a58068a0d22b..2d0ccc567dac 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -28,9 +28,9 @@ use crate::{and, binary_expr, Operator}; use crate::{ logical_plan::{ Aggregate, Analyze, CrossJoin, Distinct, EmptyRelation, Explain, Filter, Join, - JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Projection, - RecursiveQuery, Repartition, Sort, SubqueryAlias, TableScan, ToStringifiedPlan, - Union, Values, Window, + JoinConstraint, JoinType, Limit, LogicalPlan, NamedRelation, Partitioning, + PlanType, Projection, RecursiveQuery, Repartition, Sort, SubqueryAlias, + TableScan, ToStringifiedPlan, Union, Values, Window, }, utils::{ can_hash, expand_qualified_wildcard, expand_wildcard, expr_to_columns, @@ -116,9 +116,14 @@ impl LogicalPlanBuilder { })) } - pub fn empty_with_schema(produce_one_row: bool, schema: DFSchemaRef) -> Self { - Self::from(LogicalPlan::EmptyRelation(EmptyRelation { - produce_one_row, + /// A named temporary relation with a schema. + /// + /// This is used to represent a relation that does not exist at the + /// planning stage, but will be created at execution time with the + /// given schema. 
+ pub fn named_relation(name: &str, schema: DFSchemaRef) -> Self { + Self::from(LogicalPlan::NamedRelation(NamedRelation { + name: name.to_string(), schema, })) } @@ -454,11 +459,18 @@ impl LogicalPlanBuilder { /// Convert a regular plan into a recursive query. pub fn to_recursive_query( &self, + name: String, recursive_term: LogicalPlan, is_distinct: bool, ) -> Result { // TODO: we need to do a bunch of validation here. Maybe more. + if is_distinct { + return Err(DataFusionError::NotImplemented( + "Recursive queries with distinct is not supported".to_string(), + )); + } Ok(Self::from(LogicalPlan::RecursiveQuery(RecursiveQuery { + name, static_term: Arc::new(self.plan.clone()), recursive_term: Arc::new(recursive_term), is_distinct, diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 8aac63cd5f45..c02f4bbf243a 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -25,9 +25,9 @@ pub use plan::{ Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable, CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, DropView, EmptyRelation, Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, - LogicalPlan, Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery, - Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, - ToStringifiedPlan, Union, Values, Window, + LogicalPlan, NamedRelation, Partitioning, PlanType, PlanVisitor, Projection, + RecursiveQuery, Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, + TableScan, ToStringifiedPlan, Union, Values, Window, }; pub use display::display_schema; diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 2fc61af95097..2dd5230fb278 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -68,6 +68,8 @@ pub enum LogicalPlan { TableScan(TableScan), /// Produces no rows: 
An empty relation with an empty schema EmptyRelation(EmptyRelation), + /// A named temporary relation with a schema. + NamedRelation(NamedRelation), /// Subquery Subquery(Subquery), /// Aliased relation provides, or changes, the name of a relation. @@ -111,6 +113,7 @@ impl LogicalPlan { pub fn schema(&self) -> &DFSchemaRef { match self { LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) => schema, + LogicalPlan::NamedRelation(NamedRelation { schema, .. }) => schema, LogicalPlan::Values(Values { schema, .. }) => schema, LogicalPlan::TableScan(TableScan { projected_schema, .. @@ -189,6 +192,7 @@ impl LogicalPlan { LogicalPlan::Explain(Explain { schema, .. }) | LogicalPlan::Analyze(Analyze { schema, .. }) | LogicalPlan::EmptyRelation(EmptyRelation { schema, .. }) + | LogicalPlan::NamedRelation(NamedRelation { schema, .. }) | LogicalPlan::CreateExternalTable(CreateExternalTable { schema, .. }) | LogicalPlan::CreateCatalogSchema(CreateCatalogSchema { schema, .. }) | LogicalPlan::CreateCatalog(CreateCatalog { schema, .. }) => { @@ -255,6 +259,7 @@ impl LogicalPlan { // plans without expressions LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) @@ -310,6 +315,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. } | LogicalPlan::EmptyRelation { .. } + | LogicalPlan::NamedRelation { .. } | LogicalPlan::Values { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateCatalogSchema(_) @@ -469,6 +475,7 @@ impl LogicalPlan { // plans without inputs LogicalPlan::TableScan { .. 
} | LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Values(_) | LogicalPlan::CreateExternalTable(_) | LogicalPlan::CreateCatalogSchema(_) @@ -709,6 +716,9 @@ impl LogicalPlan { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self.0 { LogicalPlan::EmptyRelation(_) => write!(f, "EmptyRelation"), + LogicalPlan::NamedRelation(NamedRelation { name, .. }) => { + write!(f, "NamedRelation: {}", name) + } LogicalPlan::Values(Values { ref values, .. }) => { let str_values: Vec<_> = values .iter() @@ -1082,6 +1092,15 @@ pub struct EmptyRelation { pub schema: DFSchemaRef, } +/// A named temporary relation with a known schema. +#[derive(Clone)] +pub struct NamedRelation { + /// The relation name + pub name: String, + /// The schema description + pub schema: DFSchemaRef, +} + /// Values expression. See /// [Postgres VALUES](https://www.postgresql.org/docs/current/queries-values.html) /// documentation for more details. @@ -1346,6 +1365,8 @@ pub struct Distinct { /// A variadic query operation #[derive(Clone)] pub struct RecursiveQuery { + /// Name of the query + pub name: String, /// The static term pub static_term: Arc, /// The recursive term diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs index 4d8617153c21..c9a13f658d8f 100644 --- a/datafusion/expr/src/utils.rs +++ b/datafusion/expr/src/utils.rs @@ -514,13 +514,14 @@ pub fn from_plan( LogicalPlan::Distinct(Distinct { .. }) => Ok(LogicalPlan::Distinct(Distinct { input: Arc::new(inputs[0].clone()), })), - LogicalPlan::RecursiveQuery(RecursiveQuery { is_distinct, .. }) => { - Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { - static_term: Arc::new(inputs[0].clone()), - recursive_term: Arc::new(inputs[1].clone()), - is_distinct: *is_distinct, - })) - } + LogicalPlan::RecursiveQuery(RecursiveQuery { + name, is_distinct, .. 
+ }) => Ok(LogicalPlan::RecursiveQuery(RecursiveQuery { + name: name.clone(), + static_term: Arc::new(inputs[0].clone()), + recursive_term: Arc::new(inputs[1].clone()), + is_distinct: *is_distinct, + })), LogicalPlan::Analyze(a) => { assert!(expr.is_empty()); assert_eq!(inputs.len(), 1); @@ -545,6 +546,7 @@ pub fn from_plan( Ok(plan.clone()) } LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::TableScan { .. } | LogicalPlan::CreateExternalTable(_) | LogicalPlan::DropTable(_) diff --git a/datafusion/optimizer/src/common_subexpr_eliminate.rs b/datafusion/optimizer/src/common_subexpr_eliminate.rs index 822ac6682d03..711cad0aed37 100644 --- a/datafusion/optimizer/src/common_subexpr_eliminate.rs +++ b/datafusion/optimizer/src/common_subexpr_eliminate.rs @@ -220,6 +220,7 @@ fn optimize( | LogicalPlan::TableScan { .. } | LogicalPlan::Values(_) | LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Subquery(_) | LogicalPlan::SubqueryAlias(_) | LogicalPlan::Limit(_) diff --git a/datafusion/optimizer/src/projection_push_down.rs b/datafusion/optimizer/src/projection_push_down.rs index 927d2967b805..e69557e40000 100644 --- a/datafusion/optimizer/src/projection_push_down.rs +++ b/datafusion/optimizer/src/projection_push_down.rs @@ -488,6 +488,7 @@ fn optimize_plan( | LogicalPlan::Filter { .. } | LogicalPlan::Repartition(_) | LogicalPlan::EmptyRelation(_) + | LogicalPlan::NamedRelation(_) | LogicalPlan::Subquery(_) | LogicalPlan::Values(_) | LogicalPlan::Sort { .. 
} diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs index 1134631468af..3824ad927a63 100644 --- a/datafusion/proto/src/logical_plan.rs +++ b/datafusion/proto/src/logical_plan.rs @@ -1186,6 +1186,9 @@ impl AsLogicalPlan for LogicalPlanNode { )), }) } + LogicalPlan::NamedRelation(_) => Err(proto_error( + "LogicalPlan serde is not yet implemented for NamedRelation", + )), LogicalPlan::RecursiveQuery(_) => Err(proto_error( "LogicalPlan serde is not yet implemented for RecursiveQuery", )), diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 526855735fbf..e0b74f56c73d 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -387,7 +387,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { outer_query_schema, )?; - // Since the recursive CTE includes a component that references a + // Since the recursive CTEs include a component that references a // table with its name, like the example below: // // WITH RECURSIVE values(n) AS ( @@ -398,17 +398,26 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { // WHERE n < 100 // ) // - // The planner for the recursive term will complain that 'values' (self - // reference) is not defined. For getting around this, while preserving - // the actual schema of the whole operation, we'll register an empty relation - // as a CTE with the name 'values'. This will allow the SQL planner to - // handle the more complex cases (like joins and what not), and when we'll - // get it to the proper form once we are compiling it into the final form. - let term_scan = LogicalPlanBuilder::empty_with_schema( - false, + // We need a temporary 'relation' to be referenced and used. PostgreSQL + // calls this a 'working table', but it is entirely an implementation + // detail and a 'real' table with that name might not even exist (as + // in the case of DataFusion). 
+ // + // Since we can't simply register a table during the planning stage (it is + // an execution problem), we'll use a relation object that preserves the + // schema of the input perfectly and also knows which recursive CTE it is + // bound to. + + let named_rel = LogicalPlanBuilder::named_relation( + cte_name.as_str(), static_plan.schema().clone(), - ); - ctes.insert(cte_name.clone(), term_scan.build()?); + ) + .build()?; + + // For all the self-references in the variadic term, we'll replace them + // with the temporary relation we created above by temporarily registering + // it as a CTE. + ctes.insert(cte_name.clone(), named_rel); let recursive_plan = self.set_expr_to_plan( *right, @@ -419,11 +428,15 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { )?; let final_plan = LogicalPlanBuilder::from(static_plan) - .to_recursive_query(recursive_plan, all)? + .to_recursive_query( + cte_name.clone(), + recursive_plan, + !all, + )? .build()?; - // This is where we are replacing the temporary CTE we registered with the actual - // one. + // Once everything is done, we can deregister our temporary relation and replace it + // with the actual CTE plan. ctes.insert(cte_name.clone(), final_plan); } _ => {