Skip to content

Commit

Permalink
cleanup in enforce_distribution.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
matthewgapp committed Sep 20, 2023
1 parent b084809 commit 9527ed0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
8 changes: 6 additions & 2 deletions datafusion/core/src/physical_optimizer/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,17 @@ struct CoalesceContext {
}

impl CoalesceContext {
/// Builds the context for the root of a plan.
///
/// The `has_recursive_ancestor` flag is seeded by calling
/// `is_recursive_query` on `plan` itself. Only use this constructor at the
/// root of the plan; every other context should be created through
/// `new_descendent` so that the flag is inherited from its ancestor.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
    // Evaluate the flag while `plan` is only borrowed, then move `plan` in.
    let has_recursive_ancestor = is_recursive_query(&plan);
    Self {
        has_recursive_ancestor,
        plan,
    }
}

/// Creates a new context for a descendent of this context.
/// The descendent will inherit the `has_recursive_ancestor` flag from this context.
fn new_descendent(&self, descendent_plan: Arc<dyn ExecutionPlan>) -> Self {
Self {
has_recursive_ancestor: self.has_recursive_ancestor
Expand Down Expand Up @@ -142,14 +146,14 @@ impl PhysicalOptimizerRule for CoalesceBatches {
config: &ConfigOptions,
) -> Result<Arc<dyn crate::physical_plan::ExecutionPlan>> {
if !config.execution.coalesce_batches {
// return Ok(plan);
return Ok(plan);
}

let target_batch_size = config.execution.batch_size;
let ctx = CoalesceContext::new(plan);
let CoalesceContext { plan, .. } = ctx.transform_up(&|ctx| {
if ctx.has_recursive_ancestor {
// return Ok(Transformed::No(ctx));
return Ok(Transformed::No(ctx));
}
let plan_any = ctx.plan.as_any();
// The goal here is to detect operators that could produce small batches and only
Expand Down
49 changes: 23 additions & 26 deletions datafusion/core/src/physical_optimizer/enforce_distribution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1241,10 +1241,6 @@ fn ensure_distribution(
dist_context: DistributionContext,
config: &ConfigOptions,
) -> Result<Transformed<DistributionContext>> {
let mut dist_context = dist_context;
if is_recursive_query(&dist_context.plan) {
dist_context.has_recursive_ancestor = true;
}
if dist_context.has_recursive_ancestor {
return Ok(Transformed::No(dist_context));
}
Expand Down Expand Up @@ -1436,7 +1432,7 @@ fn ensure_distribution(
plan.clone().with_new_children(new_children)?
},
distribution_onwards,
has_recursive_ancestor: has_recursive_ancestor || is_recursive_query(&plan),
has_recursive_ancestor,
};
Ok(Transformed::Yes(new_distribution_context))
}
Expand All @@ -1457,6 +1453,8 @@ struct DistributionContext {

impl DistributionContext {
/// Creates an empty context.
/// Only use this method at the root of the plan.
/// All other contexts should be created using `new_descendent`.
fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
let length = plan.children().len();
DistributionContext {
Expand All @@ -1466,13 +1464,19 @@ impl DistributionContext {
}
}

fn new_from_plan_with_parent(
parent: Arc<dyn ExecutionPlan>,
cur_plan: Arc<dyn ExecutionPlan>,
) -> Self {
let mut ctx = Self::new(cur_plan);
ctx.has_recursive_ancestor =
is_recursive_query(&parent) || ctx.has_recursive_ancestor;
/// Builds the context for `descendent_plan`, a descendent of this context.
///
/// The new context starts out as `Self::new(descendent_plan)`. If this
/// context has `has_recursive_ancestor` set, the flag is also set on the
/// result, so it propagates down the tree; a flag already set by
/// `Self::new` is never cleared.
fn new_descendent(&self, descendent_plan: Arc<dyn ExecutionPlan>) -> Self {
    let mut descendent = Self::new(descendent_plan);
    if self.has_recursive_ancestor {
        // Inherit the marker from the ancestor context.
        descendent.has_recursive_ancestor = true;
    }
    descendent
}

/// Adopts an already-built context `ctx` as a descendent of this context.
///
/// Returns `ctx` unchanged except for `has_recursive_ancestor`, which ends
/// up true when it is set on either `ctx` or this context. This is how the
/// flag is propagated onto contexts that were constructed elsewhere.
fn new_descendent_from_ctx(&self, ctx: Self) -> Self {
    // Merge the two flags first, then write the result back onto `ctx`.
    let inherited = self.has_recursive_ancestor || ctx.has_recursive_ancestor;
    let mut descendent = ctx;
    descendent.has_recursive_ancestor = inherited;
    descendent
}

Expand Down Expand Up @@ -1547,7 +1551,6 @@ impl DistributionContext {
}
})
.collect();

Ok(DistributionContext {
has_recursive_ancestor: is_recursive_query(&parent_plan),
plan: with_new_children_if_necessary(parent_plan, children_plans)?.into(),
Expand All @@ -1560,15 +1563,7 @@ impl DistributionContext {
self.plan
.children()
.into_iter()
.map(|child| {
let mut ctx = DistributionContext::new_from_plan_with_parent(
self.plan.clone(),
child,
);
ctx.has_recursive_ancestor =
self.has_recursive_ancestor || ctx.has_recursive_ancestor;
ctx
})
.map(|child| self.new_descendent(child))
.collect()
}
}
Expand Down Expand Up @@ -1600,10 +1595,12 @@ impl TreeNode for DistributionContext {
.into_iter()
.map(transform)
.collect::<Result<Vec<_>>>()?;
let mut ctx =
DistributionContext::new_from_children_nodes(children_nodes, self.plan)?;
ctx.has_recursive_ancestor |= self.has_recursive_ancestor;
Ok(ctx)

DistributionContext::new_from_children_nodes(
children_nodes,
self.plan.clone(),
)
.map(|ctx| self.new_descendent_from_ctx(ctx))
}
}
}
Expand Down

0 comments on commit 9527ed0

Please sign in to comment.