Skip to content

Commit 9f18613

Browse files
Jiayu Liujimexist
authored andcommitted
implement window functions with partition by
1 parent aead7f8 commit 9f18613

File tree

4 files changed

+78
-53
lines changed

4 files changed

+78
-53
lines changed

datafusion/src/execution/context.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,9 @@ pub struct ExecutionConfig {
624624
/// Should DataFusion repartition data using the aggregate keys to execute aggregates in parallel
625625
/// using the provided `concurrency` level
626626
pub repartition_aggregations: bool,
627+
/// Should DataFusion repartition data using the partition keys to execute window functions in
628+
/// parallel using the provided `concurrency` level
629+
pub repartition_windows: bool,
627630
}
628631

629632
impl Default for ExecutionConfig {
@@ -652,6 +655,7 @@ impl Default for ExecutionConfig {
652655
information_schema: false,
653656
repartition_joins: true,
654657
repartition_aggregations: true,
658+
repartition_windows: true,
655659
}
656660
}
657661
}
@@ -742,11 +746,18 @@ impl ExecutionConfig {
742746
self.repartition_joins = enabled;
743747
self
744748
}
749+
745750
/// Enables or disables the use of repartitioning for aggregations to improve parallelism
746751
pub fn with_repartition_aggregations(mut self, enabled: bool) -> Self {
747752
self.repartition_aggregations = enabled;
748753
self
749754
}
755+
756+
/// Enables or disables the use of repartitioning for window functions to improve parallelism
757+
pub fn with_repartition_windows(mut self, enabled: bool) -> Self {
758+
self.repartition_windows = enabled;
759+
self
760+
}
750761
}
751762

752763
/// Holds per-execution properties and data (such as starting timestamps, etc).

datafusion/src/physical_plan/planner.rs

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ use crate::physical_plan::{
4444
};
4545
use crate::prelude::JoinType;
4646
use crate::scalar::ScalarValue;
47-
use crate::sql::utils::generate_sort_key;
47+
use crate::sql::utils::{generate_sort_key, window_expr_common_partition_keys};
4848
use crate::variable::VarType;
4949
use crate::{
5050
error::{DataFusionError, Result},
@@ -264,6 +264,38 @@ impl DefaultPhysicalPlanner {
264264
"Impossibly got empty window expression".to_owned(),
265265
));
266266
}
267+
268+
let input_exec = self.create_initial_plan(input, ctx_state)?;
269+
270+
// at this moment we are guaranteed by the logical planner
271+
// to have all the window_expr to have equal sort key
272+
let partition_keys = window_expr_common_partition_keys(window_expr)?;
273+
274+
let can_repartition = !partition_keys.is_empty()
275+
&& ctx_state.config.concurrency > 1
276+
&& ctx_state.config.repartition_windows;
277+
278+
let input_exec = if can_repartition {
279+
let partition_keys = partition_keys
280+
.iter()
281+
.map(|e| {
282+
self.create_physical_expr(
283+
e,
284+
input.schema(),
285+
&input_exec.schema(),
286+
ctx_state,
287+
)
288+
})
289+
.collect::<Result<Vec<Arc<dyn PhysicalExpr>>>>()?;
290+
Arc::new(RepartitionExec::try_new(
291+
input_exec,
292+
Partitioning::Hash(partition_keys, ctx_state.config.concurrency),
293+
)?)
294+
} else {
295+
input_exec
296+
};
297+
298+
// add a sort phase
267299
let get_sort_keys = |expr: &Expr| match expr {
268300
Expr::WindowFunction {
269301
ref partition_by,
@@ -272,7 +304,6 @@ impl DefaultPhysicalPlanner {
272304
} => generate_sort_key(partition_by, order_by),
273305
_ => unreachable!(),
274306
};
275-
276307
let sort_keys = get_sort_keys(&window_expr[0]);
277308
if window_expr.len() > 1 {
278309
debug_assert!(
@@ -283,7 +314,6 @@ impl DefaultPhysicalPlanner {
283314
);
284315
}
285316

286-
let input_exec = self.create_initial_plan(input, ctx_state)?;
287317
let logical_input_schema = input.schema();
288318

289319
let input_exec = if sort_keys.is_empty() {
@@ -310,7 +340,11 @@ impl DefaultPhysicalPlanner {
310340
_ => unreachable!(),
311341
})
312342
.collect::<Result<Vec<_>>>()?;
313-
Arc::new(SortExec::try_new(sort_keys, input_exec)?)
343+
Arc::new(if can_repartition {
344+
SortExec::new_with_partitioning(sort_keys, input_exec, true)
345+
} else {
346+
SortExec::try_new(sort_keys, input_exec)?
347+
})
314348
};
315349

316350
let physical_input_schema = input_exec.schema();

datafusion/src/physical_plan/windows.rs

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -407,11 +407,14 @@ impl ExecutionPlan for WindowAggExec {
407407

408408
/// Get the output partitioning of this plan
409409
fn output_partitioning(&self) -> Partitioning {
410-
Partitioning::UnknownPartitioning(1)
410+
// because we can have repartitioning using the partition keys
411+
// this would be either 1 or more than 1 depending on the presense of
412+
// repartitioning
413+
self.input.output_partitioning()
411414
}
412415

413416
fn required_child_distribution(&self) -> Distribution {
414-
Distribution::SinglePartition
417+
Distribution::UnspecifiedDistribution
415418
}
416419

417420
fn with_new_children(
@@ -431,22 +434,7 @@ impl ExecutionPlan for WindowAggExec {
431434
}
432435

433436
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
434-
if 0 != partition {
435-
return Err(DataFusionError::Internal(format!(
436-
"WindowAggExec invalid partition {}",
437-
partition
438-
)));
439-
}
440-
441-
// window needs to operate on a single partition currently
442-
if 1 != self.input.output_partitioning().partition_count() {
443-
return Err(DataFusionError::Internal(
444-
"WindowAggExec requires a single input partition".to_owned(),
445-
));
446-
}
447-
448437
let input = self.input.execute(partition).await?;
449-
450438
let stream = Box::pin(WindowAggStream::new(
451439
self.schema.clone(),
452440
self.window_expr.clone(),
@@ -583,38 +571,6 @@ mod tests {
583571
Ok((input, schema))
584572
}
585573

586-
#[tokio::test]
587-
async fn window_function_input_partition() -> Result<()> {
588-
let (input, schema) = create_test_schema(4)?;
589-
590-
let window_exec = Arc::new(WindowAggExec::try_new(
591-
vec![create_window_expr(
592-
&WindowFunction::AggregateFunction(AggregateFunction::Count),
593-
"count".to_owned(),
594-
&[col("c3", &schema)?],
595-
&[],
596-
&[],
597-
Some(WindowFrame::default()),
598-
schema.as_ref(),
599-
)?],
600-
input,
601-
schema.clone(),
602-
)?);
603-
604-
let result = collect(window_exec).await;
605-
606-
assert!(result.is_err());
607-
if let Some(DataFusionError::Internal(msg)) = result.err() {
608-
assert_eq!(
609-
msg,
610-
"WindowAggExec requires a single input partition".to_owned()
611-
);
612-
} else {
613-
unreachable!("Expect an internal error to happen");
614-
}
615-
Ok(())
616-
}
617-
618574
#[tokio::test]
619575
async fn window_function() -> Result<()> {
620576
let (input, schema) = create_test_schema(1)?;

datafusion/src/sql/utils.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,30 @@ pub(crate) fn generate_sort_key(
462462
sort_key
463463
}
464464

465+
/// given a slice of window expressions sharing the same sort key, find their common partition
466+
/// keys.
467+
pub(crate) fn window_expr_common_partition_keys(
468+
window_exprs: &[Expr],
469+
) -> Result<&[Expr]> {
470+
let all_partition_keys = window_exprs
471+
.iter()
472+
.map(|expr| match expr {
473+
Expr::WindowFunction { partition_by, .. } => Ok(partition_by),
474+
expr => Err(DataFusionError::Execution(format!(
475+
"Impossibly got non-window expr {:?}",
476+
expr
477+
))),
478+
})
479+
.collect::<Result<Vec<_>>>()?;
480+
let result = all_partition_keys
481+
.iter()
482+
.min_by_key(|s| s.len())
483+
.ok_or_else(|| {
484+
DataFusionError::Execution("No window expressions found".to_owned())
485+
})?;
486+
Ok(result)
487+
}
488+
465489
/// group a slice of window expression expr by their order by expressions
466490
pub(crate) fn group_window_expr_by_sort_keys(
467491
window_expr: &[Expr],

0 commit comments

Comments
 (0)