Skip to content

Commit bb71637

Browse files
author
Jiayu Liu
committed
allow window aggr to be parallelizable
1 parent e6bda7d commit bb71637

File tree

5 files changed

+26
-64
lines changed

5 files changed

+26
-64
lines changed

datafusion/src/execution/context.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ pub struct ExecutionConfig {
640640
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
641641
/// using the provided `concurrency` level
642642
pub repartition_aggregations: bool,
643+
/// Should DataFusion repartition data using the partition keys to execute window functions in
644+
/// parallel using the provided `concurrency` level
645+
pub repartition_windows: bool,
643646
}
644647

645648
impl Default for ExecutionConfig {
@@ -668,6 +671,7 @@ impl Default for ExecutionConfig {
668671
information_schema: false,
669672
repartition_joins: true,
670673
repartition_aggregations: true,
674+
repartition_windows: true,
671675
}
672676
}
673677
}
@@ -758,11 +762,18 @@ impl ExecutionConfig {
758762
self.repartition_joins = enabled;
759763
self
760764
}
765+
761766
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
762767
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
763768
self.repartition_aggregations = enabled;
764769
self
765770
}
771+
772+
/// Enables or disables the use of repartitioning for window functions to improve parallelism
773+
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
774+
self.repartition_windows = enabled;
775+
self
776+
}
766777
}
767778

768779
/// Holds per-execution properties and data (such as starting timestamps, etc).

datafusion/src/logical_plan/builder.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -289,14 +289,6 @@ impl LogicalPlanBuilder {
289289
}
290290

291291
/// Apply a window
292-
///
293-
/// NOTE: this feature is under development and this API will be changing
294-
///
295-
/// - https://github.com/apache/arrow-datafusion/issues/359 basic structure
296-
/// - https://github.com/apache/arrow-datafusion/issues/298 empty over clause
297-
/// - https://github.com/apache/arrow-datafusion/issues/299 with partition clause
298-
/// - https://github.com/apache/arrow-datafusion/issues/360 with order by
299-
/// - https://github.com/apache/arrow-datafusion/issues/361 with window frame
300292
pub fn window(&self, window_expr: Vec<Expr>) -> Result<Self> {
301293
let all_expr = window_expr.iter();
302294
validate_unique_names("Windows", all_expr.clone(), self.plan.schema())?;

datafusion/src/physical_plan/planner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl DefaultPhysicalPlanner {
158158

159159
let can_repartition = !partition_keys.is_empty()
160160
&& ctx_state.config.concurrency > 1
161-
&& ctx_state.config.repartition_aggregations;
161+
&& ctx_state.config.repartition_windows;
162162

163163
let input_exec = if can_repartition {
164164
let partition_keys = partition_keys
@@ -221,7 +221,7 @@ impl DefaultPhysicalPlanner {
221221
.map(|e| {
222222
self.create_window_expr(
223223
e,
224-
&logical_input_schema,
224+
logical_input_schema,
225225
&physical_input_schema,
226226
ctx_state,
227227
)

datafusion/src/physical_plan/windows.rs

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -412,11 +412,14 @@ impl ExecutionPlan for WindowAggExec {
412412

413413
/// Get the output partitioning of this plan
414414
fn output_partitioning(&self) -> Partitioning {
415-
Partitioning::UnknownPartitioning(1)
415+
// because we can have repartitioning using the partition keys
416+
// this would be either 1 or more than 1 depending on the presense of
417+
// repartitioning
418+
self.input.output_partitioning()
416419
}
417420

418421
fn required_child_distribution(&self) -> Distribution {
419-
Distribution::SinglePartition
422+
Distribution::UnspecifiedDistribution
420423
}
421424

422425
fn with_new_children(
@@ -436,22 +439,7 @@ impl ExecutionPlan for WindowAggExec {
436439
}
437440

438441
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
439-
if 0 != partition {
440-
return Err(DataFusionError::Internal(format!(
441-
"WindowAggExec invalid partition {}",
442-
partition
443-
)));
444-
}
445-
446-
// window needs to operate on a single partition currently
447-
if 1 != self.input.output_partitioning().partition_count() {
448-
return Err(DataFusionError::Internal(
449-
"WindowAggExec requires a single input partition".to_owned(),
450-
));
451-
}
452-
453442
let input = self.input.execute(partition).await?;
454-
455443
let stream = Box::pin(WindowAggStream::new(
456444
self.schema.clone(),
457445
self.window_expr.clone(),
@@ -591,38 +579,6 @@ mod tests {
591579
Ok((input, schema))
592580
}
593581

594-
#[tokio::test]
595-
async fn window_function_input_partition() -> Result<()> {
596-
let (input, schema) = create_test_schema(4)?;
597-
598-
let window_exec = Arc::new(WindowAggExec::try_new(
599-
vec![create_window_expr(
600-
&WindowFunction::AggregateFunction(AggregateFunction::Count),
601-
"count".to_owned(),
602-
&[col("c3")],
603-
&[],
604-
&[],
605-
Some(WindowFrame::default()),
606-
schema.as_ref(),
607-
)?],
608-
input,
609-
schema.clone(),
610-
)?);
611-
612-
let result = collect(window_exec).await;
613-
614-
assert!(result.is_err());
615-
if let Some(DataFusionError::Internal(msg)) = result.err() {
616-
assert_eq!(
617-
msg,
618-
"WindowAggExec requires a single input partition".to_owned()
619-
);
620-
} else {
621-
unreachable!("Expect an internal error to happen");
622-
}
623-
Ok(())
624-
}
625-
626582
#[tokio::test]
627583
async fn window_function() -> Result<()> {
628584
let (input, schema) = create_test_schema(1)?;

datafusion/src/sql/utils.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,7 @@ pub(crate) fn generate_sort_key(
463463
/// keys.
464464
pub(crate) fn window_expr_common_partition_keys(
465465
window_exprs: &[Expr],
466-
) -> Result<Vec<Expr>> {
466+
) -> Result<&[Expr]> {
467467
let all_partition_keys = window_exprs
468468
.iter()
469469
.map(|expr| match expr {
@@ -474,10 +474,13 @@ pub(crate) fn window_expr_common_partition_keys(
474474
))),
475475
})
476476
.collect::<Result<Vec<_>>>()?;
477-
let result = all_partition_keys.iter().min_by_key(|s| s.len()).ok_or(
478-
DataFusionError::Execution("No window expressions found".to_owned()),
479-
)?;
480-
Ok(result.to_vec())
477+
let result = all_partition_keys
478+
.iter()
479+
.min_by_key(|s| s.len())
480+
.ok_or_else(|| {
481+
DataFusionError::Execution("No window expressions found".to_owned())
482+
})?;
483+
Ok(result)
481484
}
482485

483486
/// group a slice of window expression expr by their order by expressions

0 commit comments

Comments
 (0)