Skip to content

Commit 58f51c7

Browse files
committed
Implement partition_statistics API for InterleaveExec
1 parent 1c86ec7 commit 58f51c7

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

datafusion/core/tests/physical_optimizer/partition_statistics.rs

Lines changed: 60 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ mod test {
5050
use datafusion_physical_plan::projection::ProjectionExec;
5151
use datafusion_physical_plan::repartition::RepartitionExec;
5252
use datafusion_physical_plan::sorts::sort::SortExec;
53-
use datafusion_physical_plan::union::UnionExec;
53+
use datafusion_physical_plan::union::{InterleaveExec, UnionExec};
5454
use datafusion_physical_plan::{
5555
execute_stream_partitioned, get_plan_string, ExecutionPlan,
5656
ExecutionPlanProperties,
@@ -68,6 +68,7 @@ mod test {
6868
/// - Second partition: [1, 2]
6969
/// - Each row is 110 bytes in size
7070
///
71+
/// @param create_table_sql Optional parameter to set the create table SQL
7172
/// @param target_partition Optional parameter to set the target partitions
7273
/// @return ExecutionPlan representing the scan of the table with statistics
7374
async fn create_scan_exec_with_statistics(
@@ -385,6 +386,64 @@ mod test {
385386
Ok(())
386387
}
387388

389+
#[tokio::test]
390+
async fn test_statistics_by_partition_of_interleave() -> Result<()> {
391+
let scan1 = create_scan_exec_with_statistics(None, Some(1)).await;
392+
let scan2 = create_scan_exec_with_statistics(None, Some(1)).await;
393+
394+
// Create same hash partitioning on the 'id' column as InterleaveExec
395+
// requires all children have a consistent hash partitioning
396+
let hash_expr1 = vec![col("id", &scan1.schema())?];
397+
let repartition1 = Arc::new(RepartitionExec::try_new(
398+
scan1,
399+
Partitioning::Hash(hash_expr1, 2),
400+
)?);
401+
let hash_expr2 = vec![col("id", &scan2.schema())?];
402+
let repartition2 = Arc::new(RepartitionExec::try_new(
403+
scan2,
404+
Partitioning::Hash(hash_expr2, 2),
405+
)?);
406+
407+
let interleave: Arc<dyn ExecutionPlan> =
408+
Arc::new(InterleaveExec::try_new(vec![repartition1, repartition2])?);
409+
410+
// Verify the result of partition statistics
411+
let stats = (0..interleave.output_partitioning().partition_count())
412+
.map(|idx| interleave.partition_statistics(Some(idx)))
413+
.collect::<Result<Vec<_>>>()?;
414+
assert_eq!(stats.len(), 2);
415+
416+
let expected_stats = Statistics {
417+
num_rows: Precision::Inexact(4),
418+
total_byte_size: Precision::Inexact(220),
419+
column_statistics: vec![
420+
ColumnStatistics::new_unknown(),
421+
ColumnStatistics::new_unknown(),
422+
],
423+
};
424+
assert_eq!(stats[0], expected_stats);
425+
assert_eq!(stats[1], expected_stats);
426+
427+
// Verify the execution results
428+
let partitions = execute_stream_partitioned(
429+
interleave.clone(),
430+
Arc::new(TaskContext::default()),
431+
)?;
432+
assert_eq!(partitions.len(), 2);
433+
434+
let mut partition_row_counts = Vec::new();
435+
for partition_stream in partitions.into_iter() {
436+
let results: Vec<RecordBatch> = partition_stream.try_collect().await?;
437+
let total_rows: usize = results.iter().map(|batch| batch.num_rows()).sum();
438+
partition_row_counts.push(total_rows);
439+
}
440+
assert_eq!(partition_row_counts.len(), 2);
441+
assert_eq!(partition_row_counts[0], 2);
442+
assert_eq!(partition_row_counts[1], 6);
443+
444+
Ok(())
445+
}
446+
388447
#[tokio::test]
389448
async fn test_statistic_by_partition_of_cross_join() -> Result<()> {
390449
let left_scan = create_scan_exec_with_statistics(None, Some(1)).await;
@@ -551,7 +610,6 @@ mod test {
551610
let _ = plan_string.swap_remove(1);
552611
let expected_plan = vec![
553612
"AggregateExec: mode=Partial, gby=[id@0 as id, 1 + id@0 as expr], aggr=[COUNT(c)]",
554-
//" DataSourceExec: file_groups={2 groups: [[.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-01/j5fUeSDQo22oPyPU.parquet, .../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-02/j5fUeSDQo22oPyPU.parquet], [.../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-03/j5fUeSDQo22oPyPU.parquet, .../datafusion/core/tests/data/test_statistics_per_partition/date=2025-03-04/j5fUeSDQo22oPyPU.parquet]]}, projection=[id, date], file_type=parquet
555613
];
556614
assert_eq!(plan_string, expected_plan);
557615

datafusion/physical-plan/src/union.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -498,13 +498,10 @@ impl ExecutionPlan for InterleaveExec {
498498
}
499499

500500
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
501-
if partition.is_some() {
502-
return Ok(Statistics::new_unknown(&self.schema()));
503-
}
504501
let stats = self
505502
.inputs
506503
.iter()
507-
.map(|stat| stat.partition_statistics(None))
504+
.map(|stat| stat.partition_statistics(partition))
508505
.collect::<Result<Vec<_>>>()?;
509506

510507
Ok(stats

0 commit comments

Comments
 (0)