Skip to content

Commit

Permalink
Implement recursive query execution
Browse files Browse the repository at this point in the history
  • Loading branch information
isidentical committed Oct 4, 2022
1 parent e89e1fc commit 4b8c5e6
Show file tree
Hide file tree
Showing 15 changed files with 690 additions and 41 deletions.
38 changes: 38 additions & 0 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use std::{
};

use arrow::datatypes::{DataType, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;

use crate::catalog::{
Expand Down Expand Up @@ -118,7 +119,9 @@ use datafusion_sql::{
parser::DFParser,
planner::{ContextProvider, SqlToRel},
};
use parking_lot::Mutex;
use parquet::file::properties::WriterProperties;
use tokio::sync::mpsc::Receiver as SingleChannelReceiver;
use uuid::Uuid;

use super::options::{
Expand Down Expand Up @@ -1736,6 +1739,8 @@ pub enum TaskProperties {
KVPairs(HashMap<String, String>),
}

/// Receiving end of a channel that yields `ArrowResult<RecordBatch>` items. This is
/// the handle type stored per relation name in `TaskContext::relation_handlers`.
type RelationHandler = SingleChannelReceiver<ArrowResult<RecordBatch>>;

/// Task Execution Context
pub struct TaskContext {
/// Session Id
Expand All @@ -1750,6 +1755,8 @@ pub struct TaskContext {
aggregate_functions: HashMap<String, Arc<AggregateUDF>>,
/// Runtime environment associated with this task context
runtime: Arc<RuntimeEnv>,
/// Registered relation handlers
relation_handlers: Mutex<HashMap<String, RelationHandler>>,
}

impl TaskContext {
Expand All @@ -1769,6 +1776,7 @@ impl TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}

Expand Down Expand Up @@ -1824,6 +1832,34 @@ impl TaskContext {
pub fn runtime_env(&self) -> Arc<RuntimeEnv> {
self.runtime.clone()
}

/// Register a new relation handler under `name`.
///
/// The handler is stored until it is taken out again with
/// `pop_relation_handler`.
///
/// # Errors
///
/// Returns `DataFusionError::Internal` if a handler with the same name is
/// already registered. In that case the existing handler is left untouched.
pub fn push_relation_handler(
    &self,
    name: String,
    handler: RelationHandler,
) -> Result<()> {
    let mut handlers = self.relation_handlers.lock();
    // Use the entry API so the key is hashed and looked up only once,
    // instead of a `contains_key` check followed by a separate `insert`.
    match handlers.entry(name) {
        std::collections::hash_map::Entry::Occupied(existing) => {
            Err(DataFusionError::Internal(format!(
                "Relation handler {} already registered",
                existing.key()
            )))
        }
        std::collections::hash_map::Entry::Vacant(slot) => {
            slot.insert(handler);
            Ok(())
        }
    }
}

/// Take the relation handler registered under `name` out of the storage.
///
/// On success the handler is removed from the map and handed to the caller
/// unchanged. If no handler is registered under `name`, a
/// `DataFusionError::Internal` is returned.
pub fn pop_relation_handler(&self, name: String) -> Result<RelationHandler> {
    match self.relation_handlers.lock().remove(&name) {
        Some(handler) => Ok(handler),
        None => Err(DataFusionError::Internal(format!(
            "Relation handler {} not registered",
            name
        ))),
    }
}
}

/// Create a new task context instance from SessionContext
Expand All @@ -1846,6 +1882,7 @@ impl From<&SessionContext> for TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}
}
Expand All @@ -1865,6 +1902,7 @@ impl From<&SessionState> for TaskContext {
scalar_functions,
aggregate_functions,
runtime,
relation_handlers: Mutex::new(HashMap::new()),
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions datafusion/core/src/logical_plan/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ pub use datafusion_expr::{
Aggregate, Analyze, CreateCatalog, CreateCatalogSchema, CreateExternalTable,
CreateMemoryTable, CreateView, CrossJoin, Distinct, DropTable, EmptyRelation,
Explain, Extension, Filter, Join, JoinConstraint, JoinType, Limit, LogicalPlan,
Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery, Repartition,
Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan, ToStringifiedPlan,
Union, UserDefinedLogicalNode, Values, Window,
NamedRelation, Partitioning, PlanType, PlanVisitor, Projection, RecursiveQuery,
Repartition, Sort, StringifiedPlan, Subquery, SubqueryAlias, TableScan,
ToStringifiedPlan, Union, UserDefinedLogicalNode, Values, Window,
},
TableProviderFilterPushDown, TableSource,
};
162 changes: 162 additions & 0 deletions datafusion/core/src/physical_plan/continuance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines the continuance query plan

use std::any::Any;
use std::sync::Arc;

use crate::error::{DataFusionError, Result};
use crate::physical_plan::{
DisplayFormatType, Distribution, ExecutionPlan, Partitioning,
};
use arrow::datatypes::SchemaRef;

use super::expressions::PhysicalSortExpr;
use super::stream::RecordBatchReceiverStream;
use super::{
metrics::{ExecutionPlanMetricsSet, MetricsSet},
SendableRecordBatchStream, Statistics,
};

use crate::execution::context::TaskContext;

/// A temporary "working table" operation where the input data will be
/// taken from the named handle during the execution and will be re-published
/// as is (kind of like a mirror).
///
/// Most notably used in the implementation of recursive queries where the
/// underlying relation does not exist yet but the data will come as the previous
/// term is evaluated.
#[derive(Debug)]
pub struct ContinuanceExec {
    /// Name of the relation handler
    name: String,
    /// The schema of the stream
    schema: SchemaRef,
    /// Execution metrics
    metrics: ExecutionPlanMetricsSet,
}

impl ContinuanceExec {
    /// Build a continuance plan that reads from the relation handler called
    /// `name` and reports `schema` as its output schema.
    ///
    /// The handler is looked up in the task context when the plan is executed,
    /// so it has to be registered there before [`execute`] is called.
    pub fn new(name: String, schema: SchemaRef) -> Self {
        // Every new plan instance starts with its own empty metrics set.
        let metrics = ExecutionPlanMetricsSet::new();
        Self {
            name,
            schema,
            metrics,
        }
    }
}

impl ExecutionPlan for ContinuanceExec {
    /// Expose this plan as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Schema this plan was constructed with.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// This plan has no child plans.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// No particular distribution is required.
    fn required_child_distribution(&self) -> Distribution {
        Distribution::UnspecifiedDistribution
    }

    /// Output is reported as a single partition with unknown partitioning.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    fn relies_on_input_order(&self) -> bool {
        false
    }

    fn maintains_input_order(&self) -> bool {
        false
    }

    fn benefits_from_input_partitioning(&self) -> bool {
        false
    }

    /// No output ordering is reported.
    fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
        None
    }

    /// Returns a fresh plan with the same name and schema; the supplied
    /// children are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let replacement = ContinuanceExec::new(self.name.clone(), self.schema.clone());
        Ok(Arc::new(replacement))
    }

    /// Takes the relation handler registered under this plan's name out of
    /// the task context and wraps it in a [`RecordBatchReceiverStream`].
    ///
    /// Only partition `0` is valid; any other partition yields an internal
    /// error. An error is also returned when no handler is registered under
    /// the plan's name.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        match partition {
            0 => {
                // The parent operator is expected to have registered the
                // handler before this plan is executed.
                let receiver = context.pop_relation_handler(self.name.clone())?;
                let stream = RecordBatchReceiverStream::create_without_handle(
                    &self.schema,
                    receiver,
                );
                Ok(stream)
            }
            // Continuance streams must be the plan base.
            invalid => Err(DataFusionError::Internal(format!(
                "ContinuanceExec got an invalid partition {} (expected 0)",
                invalid
            ))),
        }
    }

    /// Writes the one-line plan description used when displaying the plan.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default => {
                write!(f, "ContinuanceExec: name={}", self.name)
            }
        }
    }

    /// Snapshot of the metrics collected by this plan.
    fn metrics(&self) -> Option<MetricsSet> {
        let snapshot = self.metrics.clone_inner();
        Some(snapshot)
    }

    /// Returns default statistics.
    fn statistics(&self) -> Statistics {
        Statistics::default()
    }
}

// Placeholder: no unit tests have been written for this module yet.
#[cfg(test)]
mod tests {}
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,7 @@ pub mod analyze;
pub mod coalesce_batches;
pub mod coalesce_partitions;
pub mod common;
pub mod continuance;
pub mod cross_join;
pub mod display;
pub mod empty;
Expand All @@ -563,6 +564,7 @@ pub mod memory;
pub mod metrics;
pub mod planner;
pub mod projection;
pub mod recursive_query;
pub mod repartition;
pub mod sort_merge_join;
pub mod sorts;
Expand Down
30 changes: 25 additions & 5 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use crate::datasource::source_as_provider;
use crate::execution::context::{ExecutionProps, SessionState};
use crate::logical_expr::utils::generate_sort_key;
use crate::logical_plan::plan::{
Aggregate, Distinct, EmptyRelation, Filter, Join, Projection, Sort, SubqueryAlias,
TableScan, Window,
Aggregate, Distinct, EmptyRelation, Filter, Join, NamedRelation, Projection, Sort,
SubqueryAlias, TableScan, Window,
};
use crate::logical_plan::{
unalias, unnormalize_cols, CrossJoin, DFSchema, Expr, LogicalPlan,
Expand All @@ -40,13 +40,15 @@ use crate::logical_plan::{Limit, Values};
use crate::physical_expr::create_physical_expr;
use crate::physical_optimizer::optimizer::PhysicalOptimizerRule;
use crate::physical_plan::aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy};
use crate::physical_plan::continuance::ContinuanceExec;
use crate::physical_plan::cross_join::CrossJoinExec;
use crate::physical_plan::explain::ExplainExec;
use crate::physical_plan::expressions::{Column, PhysicalSortExpr};
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::hash_join::HashJoinExec;
use crate::physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
use crate::physical_plan::projection::ProjectionExec;
use crate::physical_plan::recursive_query::RecursiveQueryExec;
use crate::physical_plan::repartition::RepartitionExec;
use crate::physical_plan::sorts::sort::SortExec;
use crate::physical_plan::windows::WindowAggExec;
Expand Down Expand Up @@ -1028,9 +1030,27 @@ impl DefaultPhysicalPlanner {

Ok(Arc::new(GlobalLimitExec::new(input, *skip, *fetch)))
}
LogicalPlan::RecursiveQuery(RecursiveQuery { .. }) => {
dbg!(logical_plan);
todo!();
LogicalPlan::RecursiveQuery(RecursiveQuery { name, static_term, recursive_term, is_distinct }) => {
let static_term = self.create_initial_plan(static_term, session_state).await?;
let recursive_term = self.create_initial_plan(recursive_term, session_state).await?;

Ok(Arc::new(RecursiveQueryExec::new(name.clone(), static_term, recursive_term, *is_distinct)))
}
LogicalPlan::NamedRelation(NamedRelation {name, schema}) => {
// Named relations are how we represent access to any sort of dynamic data provider. They
// differ from tables in the sense that they can start existing dynamically during the
// execution of a query and then disappear before it even finishes.
//
// This system allows us to replicate the tricky behavior of classical databases where a
// temporary "working table" (as it is called in Postgres) can be used when dealing with
// complex operations (such as recursive CTEs) and then can be dropped. Since DataFusion
// at its core is heavily stream-based and vectorized, we try to avoid using 'real' tables
// and let the streams take care of the data flow in this as well.

// Since the actual inputs will only be available to us at runtime (through the task context),
// we can't really do any sort of meaningful validation here.
let schema = SchemaRef::new(schema.as_ref().to_owned().into());
Ok(Arc::new(ContinuanceExec::new(name.clone(), schema)))
}
LogicalPlan::CreateExternalTable(_) => {
// There is no default plan for "CREATE EXTERNAL
Expand Down
Loading

0 comments on commit 4b8c5e6

Please sign in to comment.