Skip to content

Commit 809cd80

Browse files
author
Jiayu Liu
committed
allow window aggr to be parallelizable
1 parent 13daed7 commit 809cd80

File tree

5 files changed

+32
-65
lines changed

5 files changed

+32
-65
lines changed

datafusion/src/execution/context.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,9 @@ pub struct ExecutionConfig {
624624
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
625625
/// using the provided `concurrency` level
626626
pub repartition_aggregations: bool,
627+
/// Should DataFusion repartition data using the partition keys to execute window functions in
628+
/// parallel using the provided `concurrency` level
629+
pub repartition_windows: bool,
627630
}
628631

629632
impl Default for ExecutionConfig {
@@ -652,6 +655,7 @@ impl Default for ExecutionConfig {
652655
information_schema: false,
653656
repartition_joins: true,
654657
repartition_aggregations: true,
658+
repartition_windows: true,
655659
}
656660
}
657661
}
@@ -742,11 +746,18 @@ impl ExecutionConfig {
742746
self.repartition_joins = enabled;
743747
self
744748
}
749+
745750
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
746751
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
747752
self.repartition_aggregations = enabled;
748753
self
749754
}
755+
756+
/// Enables or disables the use of repartitioning for window functions to improve parallelism
757+
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
758+
self.repartition_windows = enabled;
759+
self
760+
}
750761
}
751762

752763
/// Holds per-execution properties and data (such as starting timestamps, etc).

datafusion/src/logical_plan/builder.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -352,14 +352,6 @@ impl LogicalPlanBuilder {
352352
}
353353

354354
/// Apply a window
355-
///
356-
/// NOTE: this feature is under development and this API will be changing
357-
///
358-
/// - https://github.com/apache/arrow-datafusion/issues/359 basic structure
359-
/// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause
360-
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
361-
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
362-
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
363355
pub fn window(&self, window_expr: Vec<Expr>) -> Result<Self> {
364356
let all_expr = window_expr.iter();
365357
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;

datafusion/src/physical_plan/planner.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -273,13 +273,18 @@ impl DefaultPhysicalPlanner {
273273

274274
let can_repartition = !partition_keys.is_empty()
275275
&& ctx_state.config.concurrency > 1
276-
&& ctx_state.config.repartition_aggregations;
276+
&& ctx_state.config.repartition_windows;
277277

278278
let input_exec = if can_repartition {
279279
let partition_keys = partition_keys
280280
.iter()
281281
.map(|e| {
282-
self.create_physical_expr(e, &input_exec.schema(), ctx_state)
282+
self.create_physical_expr(
283+
e,
284+
input.schema(),
285+
&input_exec.schema(),
286+
ctx_state,
287+
)
283288
})
284289
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
285290
Arc::new(RepartitionExec::try_new(
@@ -337,7 +342,7 @@ impl DefaultPhysicalPlanner {
337342
.map(|e| {
338343
self.create_window_expr(
339344
e,
340-
&logical_input_schema,
345+
logical_input_schema,
341346
&physical_input_schema,
342347
ctx_state,
343348
)

datafusion/src/physical_plan/windows.rs

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec {
412412

413413
/// Get the output partitioning of this plan
414414
fn output_partitioning(&self) -> Partitioning {
415-
Partitioning::UnknownPartitioning(1)
415+
// because we can have repartitioning using the partition keys
416+
// this would be either 1 or more than 1 depending on the presense of
417+
// repartitioning
418+
self.input.output_partitioning()
416419
}
417420

418421
fn required_child_distribution(&self) -> Distribution {
419-
Distribution::SinglePartition
422+
Distribution::UnspecifiedDistribution
420423
}
421424

422425
fn with_new_children(
@@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
436439
}
437440

438441
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
439-
if 0 != partition {
440-
return Err(DataFusionError::Internal(format!(
441-
"WindowAggExec invalid partition {}",
442-
partition
443-
)));
444-
}
445-
446-
// window needs to operate on a single partition currently
447-
if 1 != self.input.output_partitioning().partition_count() {
448-
return Err(DataFusionError::Internal(
449-
"WindowAggExec requires a single input partition".to_owned(),
450-
));
451-
}
452-
453442
let input = self.input.execute(partition).await?;
454-
455443
let stream = Box::pin(WindowAggStream::new(
456444
self.schema.clone(),
457445
self.window_expr.clone(),
@@ -591,38 +579,6 @@ mod tests {
591579
Ok((input, schema))
592580
}
593581

594-
#[tokio::test]
595-
async fn window_function_input_partition() -> Result<()> {
596-
let (input, schema) = create_test_schema(4)?;
597-
598-
let window_exec = Arc::new(WindowAggExec::try_new(
599-
vec![create_window_expr(
600-
&WindowFunction::AggregateFunction(AggregateFunction::Count),
601-
"count".to_owned(),
602-
&[col("c3", &schema)?],
603-
&[],
604-
&[],
605-
Some(WindowFrame::default()),
606-
schema.as_ref(),
607-
)?],
608-
input,
609-
schema.clone(),
610-
)?);
611-
612-
let result = collect(window_exec).await;
613-
614-
assert!(result.is_err());
615-
if let Some(DataFusionError::Internal(msg)) = result.err() {
616-
assert_eq!(
617-
msg,
618-
"WindowAggExec requires a single input partition".to_owned()
619-
);
620-
} else {
621-
unreachable!("Expect an internal error to happen");
622-
}
623-
Ok(())
624-
}
625-
626582
#[tokio::test]
627583
async fn window_function() -> Result<()> {
628584
let (input, schema) = create_test_schema(1)?;

datafusion/src/sql/utils.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key(
463463
/// keys.
464464
pub(crate) fn window_expr_common_partition_keys(
465465
window_exprs: &[Expr],
466-
) -> Result<Vec<Expr>> {
466+
) -> Result<&[Expr]> {
467467
let all_partition_keys = window_exprs
468468
.iter()
469469
.map(|expr| match expr {
@@ -474,10 +474,13 @@ pub(crate) fn window_expr_common_partition_keys(
474474
))),
475475
})
476476
.collect::<Result<Vec<_>>>()?;
477-
let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or(
478-
DataFusionError::Execution("No window expressions found".to_owned()),
479-
)?;
480-
Ok(result.to_vec())
477+
let result = all_partition_keys
478+
.iter()
479+
.min_by_key(|s| s.len())
480+
.ok_or_else(|| {
481+
DataFusionError::Execution("No window expressions found".to_owned())
482+
})?;
483+
Ok(result)
481484
}
482485

483486
/// group a slice of window expression expr by their order by expressions

0 commit comments

Comments
 (0)