Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Matt/recursive cte eliminate distribution and coalesce in recursive children #2

Draft
wants to merge 15 commits into
base: matt/recursive-cte
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions datafusion/common/src/dfschema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,6 +897,11 @@ impl DFField {
self.field = f.into();
self
}

pub fn with_qualifier(mut self, qualifier: impl Into<OwnedTableReference>) -> Self {
self.qualifier = Some(qualifier.into());
self
}
}

impl From<FieldRef> for DFField {
Expand Down
119 changes: 111 additions & 8 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
//! in bigger batches to avoid overhead with small batches

use crate::config::ConfigOptions;
use crate::physical_optimizer::utils::get_plan_string;
use crate::{
error::Result,
physical_optimizer::PhysicalOptimizerRule,
Expand All @@ -27,9 +28,14 @@ use crate::{
repartition::RepartitionExec, Partitioning,
},
};
use datafusion_common::tree_node::{Transformed, TreeNode};
use datafusion_common::tree_node::{DynTreeNode, Transformed, TreeNode, VisitRecursion};
use datafusion_physical_plan::ExecutionPlan;
use itertools::Itertools;
use std::fmt::{self, Formatter};
use std::sync::Arc;

use super::utils::is_recursive_query;

/// Optimizer rule that introduces CoalesceBatchesExec to avoid overhead with small batches that
/// are produced by highly selective filters
#[derive(Default)]
Expand All @@ -41,6 +47,98 @@ impl CoalesceBatches {
Self::default()
}
}

struct CoalesceContext {
plan: Arc<dyn ExecutionPlan>,
// keep track of whether we've encountered a RecursiveQuery
has_recursive_ancestor: bool,
}

impl CoalesceContext {
/// Only use this method at the root of the plan.
/// All other contexts should be created using `new_descendent`.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
has_recursive_ancestor: is_recursive_query(&plan),
plan,
}
}

/// Creates a new context for a descendent of this context.
/// The descendent will inherit the `has_recursive_ancestor` flag from this context.
fn new_descendent(&self, descendent_plan: Arc<dyn ExecutionPlan>) -> Self {
let ancestor = self;
Self {
has_recursive_ancestor: ancestor.has_recursive_ancestor
|| is_recursive_query(&descendent_plan),
plan: descendent_plan,
}
}

/// Computes distribution tracking contexts for every child of the plan.
fn children(&self) -> Vec<CoalesceContext> {
self.plan
.children()
.into_iter()
.map(|child| self.new_descendent(child))
.collect()
}
}

impl TreeNode for CoalesceContext {
fn apply_children<F>(&self, op: &mut F) -> Result<VisitRecursion>
where
F: FnMut(&Self) -> Result<VisitRecursion>,
{
for child in self.children() {
match op(&child)? {
VisitRecursion::Continue => {}
VisitRecursion::Skip => return Ok(VisitRecursion::Continue),
VisitRecursion::Stop => return Ok(VisitRecursion::Stop),
}
}
Ok(VisitRecursion::Continue)
}

fn map_children<F>(self, transform: F) -> Result<Self>
where
F: FnMut(Self) -> Result<Self>,
{
let children = self.children();
if children.is_empty() {
Ok(self)
} else {
let new_children = children
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;

Ok(self.new_descendent(
self.plan.with_new_arc_children(
self.plan.clone(),
new_children
.into_iter()
.map(|CoalesceContext { plan, .. }| plan)
.collect(),
)?,
))
}
}
}

impl fmt::Display for CoalesceContext {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let plan_string = get_plan_string(&self.plan);
write!(f, "plan: {:?}", plan_string)?;
write!(
f,
"has_recursive_ancestor: {:?}",
self.has_recursive_ancestor
)?;
write!(f, "")
}
}

impl PhysicalOptimizerRule for CoalesceBatches {
fn optimize(
&self,
Expand All @@ -52,8 +150,12 @@ impl PhysicalOptimizerRule for CoalesceBatches {
}

let target_batch_size = config.execution.batch_size;
plan.transform_up(&|plan| {
let plan_any = plan.as_any();
let ctx = CoalesceContext::new(plan);
let CoalesceContext { plan, .. } = ctx.transform_up(&|ctx| {
if ctx.has_recursive_ancestor {
return Ok(Transformed::No(ctx));
}
let plan_any = ctx.plan.as_any();
// The goal here is to detect operators that could produce small batches and only
// wrap those ones with a CoalesceBatchesExec operator. An alternate approach here
// would be to build the coalescing logic directly into the operators
Expand All @@ -71,14 +173,15 @@ impl PhysicalOptimizerRule for CoalesceBatches {
})
.unwrap_or(false);
if wrap_in_coalesce {
Ok(Transformed::Yes(Arc::new(CoalesceBatchesExec::new(
plan,
target_batch_size,
Ok(Transformed::Yes(ctx.new_descendent(Arc::new(
CoalesceBatchesExec::new(ctx.plan.clone(), target_batch_size),
))))
} else {
Ok(Transformed::No(plan))
Ok(Transformed::No(ctx))
}
})
})?;

Ok(plan)
}

fn name(&self) -> &str {
Expand Down
50 changes: 48 additions & 2 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ use datafusion_physical_plan::windows::{get_best_fitting_window, BoundedWindowAg

use itertools::izip;

use super::utils::is_recursive_query;

/// The `EnforceDistribution` rule ensures that distribution requirements are
/// met. In doing so, this rule will increase the parallelism in the plan by
/// introducing repartitioning operators to the physical plan.
Expand Down Expand Up @@ -1088,6 +1090,7 @@ fn remove_dist_changing_operators(
let DistributionContext {
mut plan,
mut distribution_onwards,
has_recursive_ancestor,
} = distribution_context;

// Remove any distribution changing operators at the beginning:
Expand All @@ -1107,6 +1110,7 @@ fn remove_dist_changing_operators(
Ok(DistributionContext {
plan,
distribution_onwards,
has_recursive_ancestor,
})
}

Expand Down Expand Up @@ -1176,6 +1180,9 @@ fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
if dist_context.has_recursive_ancestor {
return Ok(Transformed::No(dist_context));
}
let target_partitions = config.execution.target_partitions;
// When `false`, round robin repartition will not be added to increase parallelism
let enable_round_robin = config.optimizer.enable_round_robin_repartition;
Expand All @@ -1196,6 +1203,7 @@ fn ensure_distribution(
let DistributionContext {
mut plan,
mut distribution_onwards,
has_recursive_ancestor,
} = remove_dist_changing_operators(dist_context)?;

if let Some(exec) = plan.as_any().downcast_ref::<WindowAggExec>() {
Expand Down Expand Up @@ -1365,6 +1373,7 @@ fn ensure_distribution(
plan.with_new_children(new_children)?
},
distribution_onwards,
has_recursive_ancestor,
};
Ok(Transformed::Yes(new_distribution_context))
}
Expand All @@ -1379,18 +1388,43 @@ struct DistributionContext {
/// Keep track of associations for each child of the plan. If `None`,
/// there is no distribution changing operator in its descendants.
distribution_onwards: Vec<Option<ExecTree>>,
// keep track of whether we've encountered a RecursiveQuery
has_recursive_ancestor: bool,
}

impl DistributionContext {
/// Creates an empty context.
/// Only use this method at the root of the plan.
/// All other contexts should be created using `new_descendent`.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
DistributionContext {
has_recursive_ancestor: is_recursive_query(&plan),
plan,
distribution_onwards: vec![None; length],
}
}

/// Creates a new context from a descendent plan.
/// Importantly, this function propagates the `has_recursive_ancestor` flag.
fn new_descendent(&self, descendent_plan: Arc<dyn ExecutionPlan>) -> Self {
let ancestor = self;

let mut new_ctx = Self::new(descendent_plan);
new_ctx.has_recursive_ancestor |= ancestor.has_recursive_ancestor;
new_ctx
}

/// Creates a new context from a descendent context.
/// Importantly, this function propagates the `has_recursive_ancestor` flag.
fn new_descendent_from_ctx(&self, ctx: Self) -> Self {
let ancestor = self;

let mut ctx = ctx;
ctx.has_recursive_ancestor |= ancestor.has_recursive_ancestor;
ctx
}

/// Constructs a new context from children contexts.
fn new_from_children_nodes(
children_nodes: Vec<DistributionContext>,
Expand All @@ -1410,6 +1444,7 @@ impl DistributionContext {
// that change distribution, or preserves the existing
// distribution (starting from an operator that change distribution).
distribution_onwards,
..
} = context;
if plan.children().is_empty() {
// Plan has no children, there is nothing to propagate.
Expand Down Expand Up @@ -1462,6 +1497,7 @@ impl DistributionContext {
})
.collect();
Ok(DistributionContext {
has_recursive_ancestor: is_recursive_query(&parent_plan),
plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(),
distribution_onwards,
})
Expand All @@ -1472,7 +1508,7 @@ impl DistributionContext {
self.plan
.children()
.into_iter()
.map(DistributionContext::new)
.map(|child| self.new_descendent(child))
.collect()
}
}
Expand Down Expand Up @@ -1504,7 +1540,12 @@ impl TreeNode for DistributionContext {
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
DistributionContext::new_from_children_nodes(children_nodes, self.plan)

DistributionContext::new_from_children_nodes(
children_nodes,
self.plan.clone(),
)
.map(|ctx| self.new_descendent_from_ctx(ctx))
}
}
}
Expand All @@ -1514,6 +1555,11 @@ impl fmt::Display for DistributionContext {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let plan_string = get_plan_string(&self.plan);
write!(f, "plan: {:?}", plan_string)?;
write!(
f,
"has_recursive_ancestor: {:?}",
self.has_recursive_ancestor,
)?;
for (idx, child) in self.distribution_onwards.iter().enumerate() {
if let Some(child) = child {
write!(f, "idx:{:?}, exec_tree:{}", idx, child)?;
Expand Down
9 changes: 9 additions & 0 deletions datafusion/core/src/physical_optimizer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@

//! Collection of utility functions that are leveraged by the query optimizer rules

use datafusion_expr::RecursiveQuery;
use datafusion_physical_plan::recursive_query::RecursiveQueryExec;
use itertools::concat;
use std::borrow::Borrow;
use std::collections::HashSet;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
Expand Down Expand Up @@ -155,6 +160,10 @@ pub fn is_repartition(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RepartitionExec>()
}

pub fn is_recursive_query(plan: &Arc<dyn ExecutionPlan>) -> bool {
plan.as_any().is::<RecursiveQueryExec>()
}

/// Utility function yielding a string representation of the given [`ExecutionPlan`].
pub fn get_plan_string(plan: &Arc<dyn ExecutionPlan>) -> Vec<String> {
let formatted = displayable(plan.as_ref()).indent(true).to_string();
Expand Down
Loading