Skip to content

Commit 0ac4a3a

Browse files
author
Jiayu Liu
committed
allow window aggr to be parallelizable
1 parent 64d2bf3 commit 0ac4a3a

File tree

5 files changed

+22
-21
lines changed

5 files changed

+22
-21
lines changed

datafusion/src/execution/context.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ pub struct ExecutionConfig {
640640
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
641641
/// using the provided `concurrency` level
642642
pub repartition_aggregations: bool,
643+
/// Should DataFusion repartition data using the partition keys to execute window functions in
644+
/// parallel using the provided `concurrency` level
645+
pub repartition_windows: bool,
643646
}
644647

645648
impl ExecutionConfig {
@@ -669,6 +672,7 @@ impl ExecutionConfig {
669672
information_schema: false,
670673
repartition_joins: true,
671674
repartition_aggregations: true,
675+
repartition_windows: true,
672676
}
673677
}
674678

@@ -752,11 +756,18 @@ impl ExecutionConfig {
752756
self.repartition_joins = enabled;
753757
self
754758
}
759+
755760
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
756761
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
757762
self.repartition_aggregations = enabled;
758763
self
759764
}
765+
766+
/// Enables or disables the use of repartitioning for window functions to improve parallelism
767+
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
768+
self.repartition_windows = enabled;
769+
self
770+
}
760771
}
761772

762773
/// Holds per-execution properties and data (such as starting timestamps, etc).

datafusion/src/physical_plan/planner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ impl DefaultPhysicalPlanner {
159159

160160
let can_repartition = !partition_keys.is_empty()
161161
&& ctx_state.config.concurrency > 1
162-
&& ctx_state.config.repartition_aggregations;
162+
&& ctx_state.config.repartition_windows;
163163

164164
let input_exec = if can_repartition {
165165
let partition_keys = partition_keys

datafusion/src/physical_plan/windows.rs

Lines changed: 5 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec {
412412

413413
/// Get the output partitioning of this plan
414414
fn output_partitioning(&self) -> Partitioning {
415-
Partitioning::UnknownPartitioning(1)
415+
// because we can have repartitioning using the partition keys
416+
// this would be either 1 or more than 1 depending on the presense of
417+
// repartitioning
418+
self.input.output_partitioning()
416419
}
417420

418421
fn required_child_distribution(&self) -> Distribution {
419-
Distribution::SinglePartition
422+
Distribution::UnspecifiedDistribution
420423
}
421424

422425
fn with_new_children(
@@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
436439
}
437440

438441
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
439-
if 0 != partition {
440-
return Err(DataFusionError::Internal(format!(
441-
"WindowAggExec invalid partition {}",
442-
partition
443-
)));
444-
}
445-
446-
// window needs to operate on a single partition currently
447-
if 1 != self.input.output_partitioning().partition_count() {
448-
return Err(DataFusionError::Internal(
449-
"WindowAggExec requires a single input partition".to_owned(),
450-
));
451-
}
452-
453442
let input = self.input.execute(partition).await?;
454-
455443
let stream = Box::pin(WindowAggStream::new(
456444
self.schema.clone(),
457445
self.window_expr.clone(),

datafusion/src/sql/planner.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -709,7 +709,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
709709
groups.sort_by(|(key_a, _), (key_b, _)| key_a.len().cmp(&key_b.len()));
710710
groups.reverse();
711711
for (_, exprs) in groups {
712-
let window_exprs: Vec<Expr> = exprs.into_iter().cloned().collect();
712+
let window_exprs = exprs.into_iter().cloned().collect();
713+
// the partition and sort itself is done at physical level, see physical_planner's
714+
// fn create_initial_plan
713715
plan = LogicalPlanBuilder::from(&plan)
714716
.window(window_exprs)?
715717
.build()?;

datafusion/src/sql/utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key(
463463
/// keys.
464464
pub(crate) fn window_expr_common_partition_keys(
465465
window_exprs: &[Expr],
466-
) -> Result<Vec<Expr>> {
466+
) -> Result<&[Expr]> {
467467
let all_partition_keys = window_exprs
468468
.iter()
469469
.map(|expr| match expr {
@@ -477,7 +477,7 @@ pub(crate) fn window_expr_common_partition_keys(
477477
let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or(
478478
DataFusionError::Execution("No window expressions found".to_owned()),
479479
)?;
480-
Ok(result.to_vec())
480+
Ok(result)
481481
}
482482

483483
/// group a slice of window expression expr by their order by expressions

0 commit comments

Comments
 (0)