Skip to content

Commit 2e94064

Browse files
committed
Push partition_statistics into DataSource (apache#18233)
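Removes the `FileScanConfig` downcast in `DataSourceExec::partition_statistics` in favor of a new `DataSource::partition_statistics` trait method that each data source implements. This mirrors apache#15852, which changed `DataSourceExec` to use `partition_statistics` instead of `statistics`.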
1 parent 3c44c15 commit 2e94064

File tree

3 files changed

+159
-25
lines changed

3 files changed

+159
-25
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -611,8 +611,39 @@ impl DataSource for FileScanConfig {
611611
SchedulingType::Cooperative
612612
}
613613

614-
fn statistics(&self) -> Result<Statistics> {
615-
Ok(self.projected_stats())
614+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
615+
if let Some(partition) = partition {
616+
// Get statistics for a specific partition
617+
if let Some(file_group) = self.file_groups.get(partition) {
618+
if let Some(stat) = file_group.file_statistics(None) {
619+
// Project the statistics based on the projection
620+
let table_cols_stats = self
621+
.projection_indices()
622+
.into_iter()
623+
.map(|idx| {
624+
if idx < self.file_schema().fields().len() {
625+
stat.column_statistics[idx].clone()
626+
} else {
627+
// TODO: provide accurate statistics for partition columns
628+
// See https://github.com/apache/datafusion/issues/1186
629+
ColumnStatistics::new_unknown()
630+
}
631+
})
632+
.collect();
633+
634+
return Ok(Statistics {
635+
num_rows: stat.num_rows,
636+
total_byte_size: stat.total_byte_size,
637+
column_statistics: table_cols_stats,
638+
});
639+
}
640+
}
641+
// No statistics recorded for this partition, or the partition index is out of range: return unknown
642+
Ok(Statistics::new_unknown(&self.projected_schema()))
643+
} else {
644+
// Return aggregate statistics across all partitions
645+
Ok(self.projected_stats())
646+
}
616647
}
617648

618649
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -1637,7 +1668,7 @@ mod tests {
16371668
);
16381669

16391670
let source_statistics = conf.file_source.statistics().unwrap();
1640-
let conf_stats = conf.statistics().unwrap();
1671+
let conf_stats = conf.partition_statistics(None).unwrap();
16411672

16421673
// projection should be reflected in the file source statistics
16431674
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
@@ -2544,4 +2575,91 @@ mod tests {
25442575

25452576
Ok(())
25462577
}
2578+
2579+
#[test]
2580+
fn test_partition_statistics_projection() {
2581+
// This test verifies that partition_statistics applies projection correctly.
2582+
// The old implementation had a bug where it returned file group statistics
2583+
// without applying the projection, returning all column statistics instead
2584+
// of just the projected ones.
2585+
2586+
use crate::source::DataSourceExec;
2587+
use datafusion_physical_plan::ExecutionPlan;
2588+
2589+
// Create a schema with 4 columns
2590+
let schema = Arc::new(Schema::new(vec![
2591+
Field::new("col0", DataType::Int32, false),
2592+
Field::new("col1", DataType::Int32, false),
2593+
Field::new("col2", DataType::Int32, false),
2594+
Field::new("col3", DataType::Int32, false),
2595+
]));
2596+
2597+
// Create statistics for all 4 columns
2598+
let file_group_stats = Statistics {
2599+
num_rows: Precision::Exact(100),
2600+
total_byte_size: Precision::Exact(1024),
2601+
column_statistics: vec![
2602+
ColumnStatistics {
2603+
null_count: Precision::Exact(0),
2604+
..ColumnStatistics::new_unknown()
2605+
},
2606+
ColumnStatistics {
2607+
null_count: Precision::Exact(5),
2608+
..ColumnStatistics::new_unknown()
2609+
},
2610+
ColumnStatistics {
2611+
null_count: Precision::Exact(10),
2612+
..ColumnStatistics::new_unknown()
2613+
},
2614+
ColumnStatistics {
2615+
null_count: Precision::Exact(15),
2616+
..ColumnStatistics::new_unknown()
2617+
},
2618+
],
2619+
};
2620+
2621+
// Create a file group with statistics
2622+
let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2623+
.with_statistics(Arc::new(file_group_stats));
2624+
2625+
// Create a FileScanConfig with projection: only keep columns 0 and 2
2626+
let config = FileScanConfigBuilder::new(
2627+
ObjectStoreUrl::parse("test:///").unwrap(),
2628+
Arc::clone(&schema),
2629+
Arc::new(MockSource::default()),
2630+
)
2631+
.with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
2632+
.with_file_groups(vec![file_group])
2633+
.build();
2634+
2635+
// Create a DataSourceExec from the config
2636+
let exec = DataSourceExec::from_data_source(config);
2637+
2638+
// Get statistics for partition 0
2639+
let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2640+
2641+
// Verify that only 2 columns are in the statistics (the projected ones)
2642+
assert_eq!(
2643+
partition_stats.column_statistics.len(),
2644+
2,
2645+
"Expected 2 column statistics (projected), but got {}",
2646+
partition_stats.column_statistics.len()
2647+
);
2648+
2649+
// Verify the column statistics are for columns 0 and 2
2650+
assert_eq!(
2651+
partition_stats.column_statistics[0].null_count,
2652+
Precision::Exact(0),
2653+
"First projected column should be col0 with 0 nulls"
2654+
);
2655+
assert_eq!(
2656+
partition_stats.column_statistics[1].null_count,
2657+
Precision::Exact(10),
2658+
"Second projected column should be col2 with 10 nulls"
2659+
);
2660+
2661+
// Verify row count and byte size are preserved
2662+
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2663+
assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
2664+
}
25472665
}

datafusion/datasource/src/memory.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::BinaryHeap;
2121
use std::fmt;
2222
use std::fmt::Debug;
2323
use std::ops::Deref;
24+
use std::slice::from_ref;
2425
use std::sync::Arc;
2526

2627
use crate::sink::DataSink;
@@ -195,12 +196,27 @@ impl DataSource for MemorySourceConfig {
195196
SchedulingType::Cooperative
196197
}
197198

198-
fn statistics(&self) -> Result<Statistics> {
199-
Ok(common::compute_record_batch_statistics(
200-
&self.partitions,
201-
&self.schema,
202-
self.projection.clone(),
203-
))
199+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
200+
if let Some(partition) = partition {
201+
// Compute statistics for a specific partition
202+
if let Some(batches) = self.partitions.get(partition) {
203+
Ok(common::compute_record_batch_statistics(
204+
from_ref(batches),
205+
&self.schema,
206+
self.projection.clone(),
207+
))
208+
} else {
209+
// Partition index is out of range: return unknown statistics rather than an error
210+
Ok(Statistics::new_unknown(&self.projected_schema))
211+
}
212+
} else {
213+
// Compute statistics across all partitions
214+
Ok(common::compute_record_batch_statistics(
215+
&self.partitions,
216+
&self.schema,
217+
self.projection.clone(),
218+
))
219+
}
204220
}
205221

206222
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {

datafusion/datasource/src/source.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug {
151151
fn scheduling_type(&self) -> SchedulingType {
152152
SchedulingType::NonCooperative
153153
}
154-
fn statistics(&self) -> Result<Statistics>;
154+
155+
/// Returns statistics for a specific partition, or aggregate statistics
156+
/// across all partitions if `partition` is `None`.
157+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
158+
159+
/// Returns aggregate statistics across all partitions.
160+
///
161+
/// # Deprecated
162+
/// Use [`Self::partition_statistics`] instead, which provides more fine-grained
163+
/// control over statistics retrieval (per-partition or aggregate).
164+
#[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
165+
fn statistics(&self) -> Result<Statistics> {
166+
self.partition_statistics(None)
167+
}
168+
155169
/// Return a copy of this DataSource with a new fetch limit
156170
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
157171
fn fetch(&self) -> Option<usize>;
@@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec {
285299
}
286300

287301
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
288-
if let Some(partition) = partition {
289-
let mut statistics = Statistics::new_unknown(&self.schema());
290-
if let Some(file_config) =
291-
self.data_source.as_any().downcast_ref::<FileScanConfig>()
292-
{
293-
if let Some(file_group) = file_config.file_groups.get(partition) {
294-
if let Some(stat) = file_group.file_statistics(None) {
295-
statistics = stat.clone();
296-
}
297-
}
298-
}
299-
Ok(statistics)
300-
} else {
301-
Ok(self.data_source.statistics()?)
302-
}
302+
self.data_source.partition_statistics(partition)
303303
}
304304

305305
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

0 commit comments

Comments
 (0)