Skip to content

Commit 7d3c7d8

Browse files
authored
implement AggregateExec.partition_statistics (#15954)
* implement `AggregateExec.partition_statistics`
* address comment
* AggregateMode::Final -> AggregateMode::FinalPartitioned
* add testcase for group by key is empty
* refine testcase
* address comment
1 parent a92c5e9 commit 7d3c7d8

File tree

3 files changed

+307
-41
lines changed

3 files changed

+307
-41
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 268 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,26 @@ mod test {
2828
use datafusion_execution::config::SessionConfig;
2929
use datafusion_execution::TaskContext;
3030
use datafusion_expr_common::operator::Operator;
31-
use datafusion_physical_expr::expressions::{binary, lit, Column};
31+
use datafusion_functions_aggregate::count::count_udaf;
32+
use datafusion_physical_expr::aggregate::AggregateExprBuilder;
33+
use datafusion_physical_expr::expressions::{binary, col, lit, Column};
3234
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
3335
use datafusion_physical_expr_common::sort_expr::{LexOrdering, PhysicalSortExpr};
36+
use datafusion_physical_plan::aggregates::{
37+
AggregateExec, AggregateMode, PhysicalGroupBy,
38+
};
3439
use datafusion_physical_plan::coalesce_batches::CoalesceBatchesExec;
3540
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
41+
use datafusion_physical_plan::empty::EmptyExec;
3642
use datafusion_physical_plan::filter::FilterExec;
3743
use datafusion_physical_plan::joins::CrossJoinExec;
3844
use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
3945
use datafusion_physical_plan::projection::ProjectionExec;
4046
use datafusion_physical_plan::sorts::sort::SortExec;
4147
use datafusion_physical_plan::union::UnionExec;
4248
use datafusion_physical_plan::{
43-
execute_stream_partitioned, ExecutionPlan, ExecutionPlanProperties,
49+
execute_stream_partitioned, get_plan_string, ExecutionPlan,
50+
ExecutionPlanProperties,
4451
};
4552
use futures::TryStreamExt;
4653
use std::sync::Arc;
@@ -130,10 +137,16 @@ mod test {
130137
}
131138
}
132139

140+
/// Per-partition outcome that `validate_statistics_with_data` compares against the
/// rows actually produced by executing a plan.
#[derive(PartialEq, Eq, Debug)]
141+
enum ExpectedStatistics {
142+
/// The partition produced no rows.
Empty, // row_count == 0
143+
/// The partition produced at least one row; the fields are the smallest id, the
/// largest id and the number of rows observed in that partition.
NonEmpty(i32, i32, usize), // (min_id, max_id, row_count)
144+
}
145+
133146
/// Helper function to validate that statistics from statistics_by_partition match the actual data
134147
async fn validate_statistics_with_data(
135148
plan: Arc<dyn ExecutionPlan>,
136-
expected_stats: Vec<(i32, i32, usize)>, // (min_id, max_id, row_count)
149+
expected_stats: Vec<ExpectedStatistics>,
137150
id_column_index: usize,
138151
) -> Result<()> {
139152
let ctx = TaskContext::default();
@@ -163,8 +176,11 @@ mod test {
163176
}
164177
}
165178

166-
if row_count > 0 {
167-
actual_stats.push((min_id, max_id, row_count));
179+
if row_count == 0 {
180+
actual_stats.push(ExpectedStatistics::Empty);
181+
} else {
182+
actual_stats
183+
.push(ExpectedStatistics::NonEmpty(min_id, max_id, row_count));
168184
}
169185
}
170186

@@ -201,8 +217,8 @@ mod test {
201217

202218
// Check the statistics_by_partition with real results
203219
let expected_stats = vec![
204-
(3, 4, 2), // (min_id, max_id, row_count) for first partition
205-
(1, 2, 2), // (min_id, max_id, row_count) for second partition
220+
ExpectedStatistics::NonEmpty(3, 4, 2), // (min_id, max_id, row_count) for first partition
221+
ExpectedStatistics::NonEmpty(1, 2, 2), // (min_id, max_id, row_count) for second partition
206222
];
207223
validate_statistics_with_data(scan, expected_stats, 0).await?;
208224

@@ -230,7 +246,10 @@ mod test {
230246
assert_eq!(statistics[1], expected_statistic_partition_2);
231247

232248
// Check the statistics_by_partition with real results
233-
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
249+
let expected_stats = vec![
250+
ExpectedStatistics::NonEmpty(3, 4, 2),
251+
ExpectedStatistics::NonEmpty(1, 2, 2),
252+
];
234253
validate_statistics_with_data(projection, expected_stats, 0).await?;
235254
Ok(())
236255
}
@@ -258,7 +277,7 @@ mod test {
258277
assert_eq!(statistics.len(), 1);
259278
assert_eq!(statistics[0], expected_statistic_partition);
260279
// Check the statistics_by_partition with real results
261-
let expected_stats = vec![(1, 4, 4)];
280+
let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)];
262281
validate_statistics_with_data(sort_exec.clone(), expected_stats, 0).await?;
263282

264283
// Sort with preserve_partitioning
@@ -289,7 +308,10 @@ mod test {
289308
assert_eq!(statistics[1], expected_statistic_partition_2);
290309

291310
// Check the statistics_by_partition with real results
292-
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
311+
let expected_stats = vec![
312+
ExpectedStatistics::NonEmpty(3, 4, 2),
313+
ExpectedStatistics::NonEmpty(1, 2, 2),
314+
];
293315
validate_statistics_with_data(sort_exec, expected_stats, 0).await?;
294316
Ok(())
295317
}
@@ -362,7 +384,12 @@ mod test {
362384
assert_eq!(statistics[3], expected_statistic_partition_2);
363385

364386
// Check the statistics_by_partition with real results
365-
let expected_stats = vec![(3, 4, 2), (1, 2, 2), (3, 4, 2), (1, 2, 2)];
387+
let expected_stats = vec![
388+
ExpectedStatistics::NonEmpty(3, 4, 2),
389+
ExpectedStatistics::NonEmpty(1, 2, 2),
390+
ExpectedStatistics::NonEmpty(3, 4, 2),
391+
ExpectedStatistics::NonEmpty(1, 2, 2),
392+
];
366393
validate_statistics_with_data(union_exec, expected_stats, 0).await?;
367394
Ok(())
368395
}
@@ -408,7 +435,10 @@ mod test {
408435
assert_eq!(statistics[1], expected_statistic_partition_2);
409436

410437
// Check the statistics_by_partition with real results
411-
let expected_stats = vec![(1, 4, 8), (1, 4, 8)];
438+
let expected_stats = vec![
439+
ExpectedStatistics::NonEmpty(1, 4, 8),
440+
ExpectedStatistics::NonEmpty(1, 4, 8),
441+
];
412442
validate_statistics_with_data(cross_join, expected_stats, 0).await?;
413443
Ok(())
414444
}
@@ -431,7 +461,10 @@ mod test {
431461
assert_eq!(statistics[1], expected_statistic_partition_2);
432462

433463
// Check the statistics_by_partition with real results
434-
let expected_stats = vec![(3, 4, 2), (1, 2, 2)];
464+
let expected_stats = vec![
465+
ExpectedStatistics::NonEmpty(3, 4, 2),
466+
ExpectedStatistics::NonEmpty(1, 2, 2),
467+
];
435468
validate_statistics_with_data(coalesce_batches, expected_stats, 0).await?;
436469
Ok(())
437470
}
@@ -450,7 +483,7 @@ mod test {
450483
assert_eq!(statistics[0], expected_statistic_partition);
451484

452485
// Check the statistics_by_partition with real results
453-
let expected_stats = vec![(1, 4, 4)];
486+
let expected_stats = vec![ExpectedStatistics::NonEmpty(1, 4, 4)];
454487
validate_statistics_with_data(coalesce_partitions, expected_stats, 0).await?;
455488
Ok(())
456489
}
@@ -487,4 +520,225 @@ mod test {
487520
assert_eq!(statistics[0], expected_statistic_partition);
488521
Ok(())
489522
}
523+
524+
/// Exercises `AggregateExec::partition_statistics` in `Partial`, `FinalPartitioned`
/// and `Final` modes: first over the statistics-bearing scan, then over an
/// `EmptyExec` with two partitions, and finally for an aggregate without group-by
/// keys. The grouped cases are cross-checked against executed results through
/// `validate_statistics_with_data`.
#[tokio::test]
525+
async fn test_statistic_by_partition_of_agg() -> Result<()> {
526+
let scan = create_scan_exec_with_statistics(None, Some(2)).await;
527+
528+
let scan_schema = scan.schema();
529+
530+
// select id, 1+id, count(*) from t group by id, 1+id
531+
let group_by = PhysicalGroupBy::new_single(vec![
532+
(col("id", &scan_schema)?, "id".to_string()),
533+
(
534+
binary(
535+
lit(1),
536+
Operator::Plus,
537+
col("id", &scan_schema)?,
538+
&scan_schema,
539+
)?,
540+
"expr".to_string(),
541+
),
542+
]);
543+
544+
// A single count aggregate over the literal 1, displayed as "COUNT(c)" in the plan.
let aggr_expr = vec![AggregateExprBuilder::new(count_udaf(), vec![lit(1)])
545+
.schema(Arc::clone(&scan_schema))
546+
.alias(String::from("COUNT(c)"))
547+
.build()
548+
.map(Arc::new)?];
549+
550+
let aggregate_exec_partial = Arc::new(AggregateExec::try_new(
551+
AggregateMode::Partial,
552+
group_by.clone(),
553+
aggr_expr.clone(),
554+
vec![None],
555+
Arc::clone(&scan),
556+
scan_schema.clone(),
557+
)?) as _;
558+
559+
let mut plan_string = get_plan_string(&aggregate_exec_partial);
560+
// Drop the plan line at index 1 so only the AggregateExec line is compared; the
// DataSourceExec line is kept below as a comment only (presumably because it
// embeds file paths -- confirm).
let _ = plan_string.swap_remove(1);
561+
let expected_plan = vec![
562+
"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]",
563+
//" DataSourceExec: file_groups={2 groups: [[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, .../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, .../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id, date], file_type=parquet
564+
];
565+
assert_eq!(plan_string, expected_plan);
566+
567+
let p0_statistics = aggregate_exec_partial.partition_statistics(Some(0))?;
568+
569+
// Expected statistics for partition 0: an inexact row count, min/max on the first
// output column only, and unknown statistics for the remaining two columns.
let expected_p0_statistics = Statistics {
570+
num_rows: Precision::Inexact(2),
571+
total_byte_size: Precision::Absent,
572+
column_statistics: vec![
573+
ColumnStatistics {
574+
null_count: Precision::Absent,
575+
max_value: Precision::Exact(ScalarValue::Int32(Some(4))),
576+
min_value: Precision::Exact(ScalarValue::Int32(Some(3))),
577+
sum_value: Precision::Absent,
578+
distinct_count: Precision::Absent,
579+
},
580+
ColumnStatistics::new_unknown(),
581+
ColumnStatistics::new_unknown(),
582+
],
583+
};
584+
585+
assert_eq!(&p0_statistics, &expected_p0_statistics);
586+
587+
// Same shape for partition 1, with the id range 1..=2 instead of 3..=4.
let expected_p1_statistics = Statistics {
588+
num_rows: Precision::Inexact(2),
589+
total_byte_size: Precision::Absent,
590+
column_statistics: vec![
591+
ColumnStatistics {
592+
null_count: Precision::Absent,
593+
max_value: Precision::Exact(ScalarValue::Int32(Some(2))),
594+
min_value: Precision::Exact(ScalarValue::Int32(Some(1))),
595+
sum_value: Precision::Absent,
596+
distinct_count: Precision::Absent,
597+
},
598+
ColumnStatistics::new_unknown(),
599+
ColumnStatistics::new_unknown(),
600+
],
601+
};
602+
603+
let p1_statistics = aggregate_exec_partial.partition_statistics(Some(1))?;
604+
assert_eq!(&p1_statistics, &expected_p1_statistics);
605+
606+
// Cross-check the reported statistics against the rows the plan actually produces.
validate_statistics_with_data(
607+
aggregate_exec_partial.clone(),
608+
vec![
609+
ExpectedStatistics::NonEmpty(3, 4, 2),
610+
ExpectedStatistics::NonEmpty(1, 2, 2),
611+
],
612+
0,
613+
)
614+
.await?;
615+
616+
// FinalPartitioned aggregate stacked on the partial one; it is asserted to report
// the same per-partition statistics as its input.
let agg_final = Arc::new(AggregateExec::try_new(
617+
AggregateMode::FinalPartitioned,
618+
group_by.clone(),
619+
aggr_expr.clone(),
620+
vec![None],
621+
aggregate_exec_partial.clone(),
622+
aggregate_exec_partial.schema(),
623+
)?);
624+
625+
let p0_statistics = agg_final.partition_statistics(Some(0))?;
626+
assert_eq!(&p0_statistics, &expected_p0_statistics);
627+
628+
let p1_statistics = agg_final.partition_statistics(Some(1))?;
629+
assert_eq!(&p1_statistics, &expected_p1_statistics);
630+
631+
validate_statistics_with_data(
632+
agg_final.clone(),
633+
vec![
634+
ExpectedStatistics::NonEmpty(3, 4, 2),
635+
ExpectedStatistics::NonEmpty(1, 2, 2),
636+
],
637+
0,
638+
)
639+
.await?;
640+
641+
// select id, 1+id, count(*) from empty_table group by id, 1+id
642+
let empty_table =
643+
Arc::new(EmptyExec::new(scan_schema.clone()).with_partitions(2));
644+
645+
let agg_partial = Arc::new(AggregateExec::try_new(
646+
AggregateMode::Partial,
647+
group_by.clone(),
648+
aggr_expr.clone(),
649+
vec![None],
650+
empty_table.clone(),
651+
scan_schema.clone(),
652+
)?) as _;
653+
654+
// Only the first plan line (the AggregateExec itself) is compared here.
let agg_plan = get_plan_string(&agg_partial).remove(0);
655+
assert_eq!("AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]",agg_plan);
656+
657+
// With an empty input, every partition is expected to report exactly zero rows and
// unknown statistics for all three output columns.
let empty_stat = Statistics {
658+
num_rows: Precision::Exact(0),
659+
total_byte_size: Precision::Absent,
660+
column_statistics: vec![
661+
ColumnStatistics::new_unknown(),
662+
ColumnStatistics::new_unknown(),
663+
ColumnStatistics::new_unknown(),
664+
],
665+
};
666+
667+
assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(0))?);
668+
assert_eq!(&empty_stat, &agg_partial.partition_statistics(Some(1))?);
669+
validate_statistics_with_data(
670+
agg_partial.clone(),
671+
vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty],
672+
0,
673+
)
674+
.await?;
675+
676+
// A fresh partial aggregate over the same empty input, used as the child of the
// FinalPartitioned aggregate below.
let agg_partial = Arc::new(AggregateExec::try_new(
677+
AggregateMode::Partial,
678+
group_by.clone(),
679+
aggr_expr.clone(),
680+
vec![None],
681+
empty_table.clone(),
682+
scan_schema.clone(),
683+
)?);
684+
685+
let agg_final = Arc::new(AggregateExec::try_new(
686+
AggregateMode::FinalPartitioned,
687+
group_by.clone(),
688+
aggr_expr.clone(),
689+
vec![None],
690+
agg_partial.clone(),
691+
agg_partial.schema(),
692+
)?);
693+
694+
assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(0))?);
695+
assert_eq!(&empty_stat, &agg_final.partition_statistics(Some(1))?);
696+
697+
validate_statistics_with_data(
698+
agg_final,
699+
vec![ExpectedStatistics::Empty, ExpectedStatistics::Empty],
700+
0,
701+
)
702+
.await?;
703+
704+
// select count(*) from empty_table
// Uses `PhysicalGroupBy::default()` in place of the grouped expressions above.
705+
let agg_partial = Arc::new(AggregateExec::try_new(
706+
AggregateMode::Partial,
707+
PhysicalGroupBy::default(),
708+
aggr_expr.clone(),
709+
vec![None],
710+
empty_table.clone(),
711+
scan_schema.clone(),
712+
)?);
713+
714+
// CoalescePartitionsExec sits between the partial and the Final aggregate; the
// resulting plan is asserted below to expose a single partition.
let coalesce = Arc::new(CoalescePartitionsExec::new(agg_partial.clone()));
715+
716+
let agg_final = Arc::new(AggregateExec::try_new(
717+
AggregateMode::Final,
718+
PhysicalGroupBy::default(),
719+
aggr_expr.clone(),
720+
vec![None],
721+
coalesce.clone(),
722+
coalesce.schema(),
723+
)?);
724+
725+
// Even though the input is empty, exactly one output row is expected here, with
// unknown statistics for the single output column.
let expect_stat = Statistics {
726+
num_rows: Precision::Exact(1),
727+
total_byte_size: Precision::Absent,
728+
column_statistics: vec![ColumnStatistics::new_unknown()],
729+
};
730+
731+
assert_eq!(&expect_stat, &agg_final.partition_statistics(Some(0))?);
732+
733+
// Verify that the aggregate final result has exactly one partition with one row
734+
let mut partitions = execute_stream_partitioned(
735+
agg_final.clone(),
736+
Arc::new(TaskContext::default()),
737+
)?;
738+
assert_eq!(1, partitions.len());
739+
let result: Vec<RecordBatch> = partitions.remove(0).try_collect().await?;
740+
assert_eq!(1, result[0].num_rows());
741+
742+
Ok(())
743+
}
490744
}

0 commit comments

Comments
 (0)