Skip to content

Commit 073b640

Browse files
adriangbtobixdev
authored andcommitted
Push partition_statistics into DataSource (apache#18233)
Removes a downcast match in favor of use of the trait. This mirrors the changes to DataSourceExec to use partition_statistics instead of statistics from apache#15852
1 parent cadf157 commit 073b640

File tree

3 files changed

+159
-25
lines changed

3 files changed

+159
-25
lines changed

datafusion/datasource/src/file_scan_config.rs

Lines changed: 121 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -598,8 +598,39 @@ impl DataSource for FileScanConfig {
598598
SchedulingType::Cooperative
599599
}
600600

601-
fn statistics(&self) -> Result<Statistics> {
602-
Ok(self.projected_stats())
601+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
602+
if let Some(partition) = partition {
603+
// Get statistics for a specific partition
604+
if let Some(file_group) = self.file_groups.get(partition) {
605+
if let Some(stat) = file_group.file_statistics(None) {
606+
// Project the statistics based on the projection
607+
let table_cols_stats = self
608+
.projection_indices()
609+
.into_iter()
610+
.map(|idx| {
611+
if idx < self.file_schema().fields().len() {
612+
stat.column_statistics[idx].clone()
613+
} else {
614+
// TODO provide accurate stat for partition column
615+
// See https://github.com/apache/datafusion/issues/1186
616+
ColumnStatistics::new_unknown()
617+
}
618+
})
619+
.collect();
620+
621+
return Ok(Statistics {
622+
num_rows: stat.num_rows,
623+
total_byte_size: stat.total_byte_size,
624+
column_statistics: table_cols_stats,
625+
});
626+
}
627+
}
628+
// If no statistics available for this partition, return unknown
629+
Ok(Statistics::new_unknown(&self.projected_schema()))
630+
} else {
631+
// Return aggregate statistics across all partitions
632+
Ok(self.projected_stats())
633+
}
603634
}
604635

605636
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {
@@ -1603,7 +1634,7 @@ mod tests {
16031634
);
16041635

16051636
let source_statistics = conf.file_source.statistics().unwrap();
1606-
let conf_stats = conf.statistics().unwrap();
1637+
let conf_stats = conf.partition_statistics(None).unwrap();
16071638

16081639
// projection should be reflected in the file source statistics
16091640
assert_eq!(conf_stats.num_rows, Precision::Inexact(3));
@@ -2510,4 +2541,91 @@ mod tests {
25102541

25112542
Ok(())
25122543
}
2544+
2545+
#[test]
2546+
fn test_partition_statistics_projection() {
2547+
// This test verifies that partition_statistics applies projection correctly.
2548+
// The old implementation had a bug where it returned file group statistics
2549+
// without applying the projection, returning all column statistics instead
2550+
// of just the projected ones.
2551+
2552+
use crate::source::DataSourceExec;
2553+
use datafusion_physical_plan::ExecutionPlan;
2554+
2555+
// Create a schema with 4 columns
2556+
let schema = Arc::new(Schema::new(vec![
2557+
Field::new("col0", DataType::Int32, false),
2558+
Field::new("col1", DataType::Int32, false),
2559+
Field::new("col2", DataType::Int32, false),
2560+
Field::new("col3", DataType::Int32, false),
2561+
]));
2562+
2563+
// Create statistics for all 4 columns
2564+
let file_group_stats = Statistics {
2565+
num_rows: Precision::Exact(100),
2566+
total_byte_size: Precision::Exact(1024),
2567+
column_statistics: vec![
2568+
ColumnStatistics {
2569+
null_count: Precision::Exact(0),
2570+
..ColumnStatistics::new_unknown()
2571+
},
2572+
ColumnStatistics {
2573+
null_count: Precision::Exact(5),
2574+
..ColumnStatistics::new_unknown()
2575+
},
2576+
ColumnStatistics {
2577+
null_count: Precision::Exact(10),
2578+
..ColumnStatistics::new_unknown()
2579+
},
2580+
ColumnStatistics {
2581+
null_count: Precision::Exact(15),
2582+
..ColumnStatistics::new_unknown()
2583+
},
2584+
],
2585+
};
2586+
2587+
// Create a file group with statistics
2588+
let file_group = FileGroup::new(vec![PartitionedFile::new("test.parquet", 1024)])
2589+
.with_statistics(Arc::new(file_group_stats));
2590+
2591+
// Create a FileScanConfig with projection: only keep columns 0 and 2
2592+
let config = FileScanConfigBuilder::new(
2593+
ObjectStoreUrl::parse("test:///").unwrap(),
2594+
Arc::clone(&schema),
2595+
Arc::new(MockSource::default()),
2596+
)
2597+
.with_projection(Some(vec![0, 2])) // Only project columns 0 and 2
2598+
.with_file_groups(vec![file_group])
2599+
.build();
2600+
2601+
// Create a DataSourceExec from the config
2602+
let exec = DataSourceExec::from_data_source(config);
2603+
2604+
// Get statistics for partition 0
2605+
let partition_stats = exec.partition_statistics(Some(0)).unwrap();
2606+
2607+
// Verify that only 2 columns are in the statistics (the projected ones)
2608+
assert_eq!(
2609+
partition_stats.column_statistics.len(),
2610+
2,
2611+
"Expected 2 column statistics (projected), but got {}",
2612+
partition_stats.column_statistics.len()
2613+
);
2614+
2615+
// Verify the column statistics are for columns 0 and 2
2616+
assert_eq!(
2617+
partition_stats.column_statistics[0].null_count,
2618+
Precision::Exact(0),
2619+
"First projected column should be col0 with 0 nulls"
2620+
);
2621+
assert_eq!(
2622+
partition_stats.column_statistics[1].null_count,
2623+
Precision::Exact(10),
2624+
"Second projected column should be col2 with 10 nulls"
2625+
);
2626+
2627+
// Verify row count and byte size are preserved
2628+
assert_eq!(partition_stats.num_rows, Precision::Exact(100));
2629+
assert_eq!(partition_stats.total_byte_size, Precision::Exact(1024));
2630+
}
25132631
}

datafusion/datasource/src/memory.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use std::collections::BinaryHeap;
2121
use std::fmt;
2222
use std::fmt::Debug;
2323
use std::ops::Deref;
24+
use std::slice::from_ref;
2425
use std::sync::Arc;
2526

2627
use crate::sink::DataSink;
@@ -192,12 +193,27 @@ impl DataSource for MemorySourceConfig {
192193
SchedulingType::Cooperative
193194
}
194195

195-
fn statistics(&self) -> Result<Statistics> {
196-
Ok(common::compute_record_batch_statistics(
197-
&self.partitions,
198-
&self.schema,
199-
self.projection.clone(),
200-
))
196+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
197+
if let Some(partition) = partition {
198+
// Compute statistics for a specific partition
199+
if let Some(batches) = self.partitions.get(partition) {
200+
Ok(common::compute_record_batch_statistics(
201+
from_ref(batches),
202+
&self.schema,
203+
self.projection.clone(),
204+
))
205+
} else {
206+
// Invalid partition index
207+
Ok(Statistics::new_unknown(&self.projected_schema))
208+
}
209+
} else {
210+
// Compute statistics across all partitions
211+
Ok(common::compute_record_batch_statistics(
212+
&self.partitions,
213+
&self.schema,
214+
self.projection.clone(),
215+
))
216+
}
201217
}
202218

203219
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn DataSource>> {

datafusion/datasource/src/source.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,21 @@ pub trait DataSource: Send + Sync + Debug {
151151
fn scheduling_type(&self) -> SchedulingType {
152152
SchedulingType::NonCooperative
153153
}
154-
fn statistics(&self) -> Result<Statistics>;
154+
155+
/// Returns statistics for a specific partition, or aggregate statistics
156+
/// across all partitions if `partition` is `None`.
157+
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>;
158+
159+
/// Returns aggregate statistics across all partitions.
160+
///
161+
/// # Deprecated
162+
/// Use [`Self::partition_statistics`] instead, which provides more fine-grained
163+
/// control over statistics retrieval (per-partition or aggregate).
164+
#[deprecated(since = "51.0.0", note = "Use partition_statistics instead")]
165+
fn statistics(&self) -> Result<Statistics> {
166+
self.partition_statistics(None)
167+
}
168+
155169
/// Return a copy of this DataSource with a new fetch limit
156170
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn DataSource>>;
157171
fn fetch(&self) -> Option<usize>;
@@ -285,21 +299,7 @@ impl ExecutionPlan for DataSourceExec {
285299
}
286300

287301
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
288-
if let Some(partition) = partition {
289-
let mut statistics = Statistics::new_unknown(&self.schema());
290-
if let Some(file_config) =
291-
self.data_source.as_any().downcast_ref::<FileScanConfig>()
292-
{
293-
if let Some(file_group) = file_config.file_groups.get(partition) {
294-
if let Some(stat) = file_group.file_statistics(None) {
295-
statistics = stat.clone();
296-
}
297-
}
298-
}
299-
Ok(statistics)
300-
} else {
301-
Ok(self.data_source.statistics()?)
302-
}
302+
self.data_source.partition_statistics(partition)
303303
}
304304

305305
fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {

0 commit comments

Comments
 (0)