Skip to content

Commit e2d7126

Browse files
adriangbclaude
andcommitted
Move statistics handling into FileScanConfig
This commit moves statistics handling from individual FileSource implementations into FileScanConfig, simplifying the FileSource trait. Changes: - Remove statistics() and with_statistics() methods from FileSource trait - Remove with_projection() method from FileSource trait (statistics PR only) - Add statistics field to FileScanConfig struct - Add statistics() method to FileScanConfig to retrieve statistics - Update FileScanConfigBuilder to properly handle statistics - Remove projected_statistics field from all FileSource implementations: - ParquetSource - CsvSource - JsonSource - AvroSource - ArrowFileSource and ArrowStreamFileSource - MockSource (test utility) - Update test utilities and assertions to use config.statistics() instead of file_source.statistics() - Update proto serialization to use config.statistics() Part of apache#18627 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent cfb26b6 commit e2d7126

File tree

10 files changed

+48
-191
lines changed

10 files changed

+48
-191
lines changed

datafusion/core/tests/physical_optimizer/test_utils.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,7 @@ pub(crate) fn parquet_exec_with_stats(file_size: u64) -> Arc<DataSourceExec> {
131131
.with_statistics(statistics)
132132
.build();
133133

134-
assert_eq!(
135-
config.file_source.statistics().unwrap().num_rows,
136-
Precision::Inexact(10000)
137-
);
134+
assert_eq!(config.statistics().num_rows, Precision::Inexact(10000));
138135
DataSourceExec::from_data_source(config)
139136
}
140137

datafusion/datasource-arrow/src/source.rs

Lines changed: 9 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ use object_store::{GetOptions, GetRange, GetResultPayload, ObjectStore};
5656
pub(crate) struct ArrowFileSource {
5757
table_schema: TableSchema,
5858
metrics: ExecutionPlanMetricsSet,
59-
projected_statistics: Option<Statistics>,
6059
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
6160
}
6261

@@ -66,7 +65,6 @@ impl ArrowFileSource {
6665
Self {
6766
table_schema: table_schema.into(),
6867
metrics: ExecutionPlanMetricsSet::new(),
69-
projected_statistics: None,
7068
schema_adapter_factory: None,
7169
}
7270
}
@@ -103,26 +101,16 @@ impl FileSource for ArrowFileSource {
103101
Arc::new(Self { ..self.clone() })
104102
}
105103

106-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
107-
let mut conf = self.clone();
108-
conf.projected_statistics = Some(statistics);
109-
Arc::new(conf)
110-
}
104+
111105

112-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
113-
Arc::new(Self { ..self.clone() })
106+
)
114107
}
115108

116109
fn metrics(&self) -> &ExecutionPlanMetricsSet {
117110
&self.metrics
118111
}
119112

120-
fn statistics(&self) -> Result<Statistics> {
121-
let statistics = &self.projected_statistics;
122-
Ok(statistics
123-
.clone()
124-
.expect("projected_statistics must be set"))
125-
}
113+
126114

127115
fn file_type(&self) -> &str {
128116
"arrow"
@@ -148,7 +136,6 @@ impl FileSource for ArrowFileSource {
148136
pub(crate) struct ArrowStreamFileSource {
149137
table_schema: TableSchema,
150138
metrics: ExecutionPlanMetricsSet,
151-
projected_statistics: Option<Statistics>,
152139
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
153140
}
154141

@@ -158,7 +145,6 @@ impl ArrowStreamFileSource {
158145
Self {
159146
table_schema: table_schema.into(),
160147
metrics: ExecutionPlanMetricsSet::new(),
161-
projected_statistics: None,
162148
schema_adapter_factory: None,
163149
}
164150
}
@@ -191,14 +177,9 @@ impl FileSource for ArrowStreamFileSource {
191177
Arc::new(Self { ..self.clone() })
192178
}
193179

194-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
195-
let mut conf = self.clone();
196-
conf.projected_statistics = Some(statistics);
197-
Arc::new(conf)
198-
}
180+
199181

200-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
201-
Arc::new(Self { ..self.clone() })
182+
)
202183
}
203184

204185
fn repartitioned(
@@ -224,12 +205,7 @@ impl FileSource for ArrowStreamFileSource {
224205
&self.metrics
225206
}
226207

227-
fn statistics(&self) -> Result<Statistics> {
228-
let statistics = &self.projected_statistics;
229-
Ok(statistics
230-
.clone()
231-
.expect("projected_statistics must be set"))
232-
}
208+
233209

234210
fn file_type(&self) -> &str {
235211
"arrow_stream"
@@ -484,25 +460,17 @@ impl FileSource for ArrowSource {
484460
})
485461
}
486462

487-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
488-
Arc::new(Self {
489-
inner: self.inner.with_projection(config),
490-
})
463+
)
491464
}
492465

493-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
494-
Arc::new(Self {
495-
inner: self.inner.with_statistics(statistics),
496-
})
466+
)
497467
}
498468

499469
fn metrics(&self) -> &ExecutionPlanMetricsSet {
500470
self.inner.metrics()
501471
}
502472

503-
fn statistics(&self) -> Result<Statistics> {
504-
self.inner.statistics()
505-
}
473+
506474

507475
fn file_type(&self) -> &str {
508476
self.inner.file_type()

datafusion/datasource-avro/src/source.rs

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ use std::sync::Arc;
2323
use crate::avro_to_arrow::Reader as AvroReader;
2424

2525
use datafusion_common::error::Result;
26-
use datafusion_common::Statistics;
2726
use datafusion_datasource::file::FileSource;
2827
use datafusion_datasource::file_scan_config::FileScanConfig;
2928
use datafusion_datasource::file_stream::FileOpener;
@@ -41,7 +40,6 @@ pub struct AvroSource {
4140
batch_size: Option<usize>,
4241
projection: Option<Vec<String>>,
4342
metrics: ExecutionPlanMetricsSet,
44-
projected_statistics: Option<Statistics>,
4543
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
4644
}
4745

@@ -53,7 +51,6 @@ impl AvroSource {
5351
batch_size: None,
5452
projection: None,
5553
metrics: ExecutionPlanMetricsSet::new(),
56-
projected_statistics: None,
5754
schema_adapter_factory: None,
5855
}
5956
}
@@ -95,28 +92,15 @@ impl FileSource for AvroSource {
9592
Arc::new(conf)
9693
}
9794

98-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
99-
let mut conf = self.clone();
100-
conf.projected_statistics = Some(statistics);
101-
Arc::new(conf)
102-
}
95+
10396

104-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
105-
let mut conf = self.clone();
106-
conf.projection = config.projected_file_column_names();
107-
Arc::new(conf)
108-
}
97+
10998

11099
fn metrics(&self) -> &ExecutionPlanMetricsSet {
111100
&self.metrics
112101
}
113102

114-
fn statistics(&self) -> Result<Statistics> {
115-
let statistics = &self.projected_statistics;
116-
Ok(statistics
117-
.clone()
118-
.expect("projected_statistics must be set"))
119-
}
103+
120104

121105
fn file_type(&self) -> &str {
122106
"avro"

datafusion/datasource-csv/src/source.rs

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use datafusion_datasource::{
3434

3535
use arrow::csv;
3636
use datafusion_common::config::CsvOptions;
37-
use datafusion_common::{DataFusionError, Result, Statistics};
37+
use datafusion_common::{DataFusionError, Result};
3838
use datafusion_common_runtime::JoinSet;
3939
use datafusion_datasource::file::FileSource;
4040
use datafusion_datasource::file_scan_config::FileScanConfig;
@@ -90,7 +90,6 @@ pub struct CsvSource {
9090
table_schema: TableSchema,
9191
file_projection: Option<Vec<usize>>,
9292
metrics: ExecutionPlanMetricsSet,
93-
projected_statistics: Option<Statistics>,
9493
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
9594
}
9695

@@ -103,7 +102,6 @@ impl CsvSource {
103102
batch_size: None,
104103
file_projection: None,
105104
metrics: ExecutionPlanMetricsSet::new(),
106-
projected_statistics: None,
107105
schema_adapter_factory: None,
108106
}
109107
}
@@ -266,27 +264,14 @@ impl FileSource for CsvSource {
266264
Arc::new(conf)
267265
}
268266

269-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
270-
let mut conf = self.clone();
271-
conf.projected_statistics = Some(statistics);
272-
Arc::new(conf)
273-
}
267+
274268

275-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
276-
let mut conf = self.clone();
277-
conf.file_projection = config.file_column_projection_indices();
278-
Arc::new(conf)
279-
}
269+
280270

281271
fn metrics(&self) -> &ExecutionPlanMetricsSet {
282272
&self.metrics
283273
}
284-
fn statistics(&self) -> Result<Statistics> {
285-
let statistics = &self.projected_statistics;
286-
Ok(statistics
287-
.clone()
288-
.expect("projected_statistics must be set"))
289-
}
274+
290275
fn file_type(&self) -> &str {
291276
"csv"
292277
}

datafusion/datasource-json/src/source.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ pub struct JsonSource {
7979
table_schema: datafusion_datasource::TableSchema,
8080
batch_size: Option<usize>,
8181
metrics: ExecutionPlanMetricsSet,
82-
projected_statistics: Option<Statistics>,
8382
schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
8483
}
8584

@@ -90,7 +89,6 @@ impl JsonSource {
9089
table_schema: table_schema.into(),
9190
batch_size: None,
9291
metrics: ExecutionPlanMetricsSet::new(),
93-
projected_statistics: None,
9492
schema_adapter_factory: None,
9593
}
9694
}
@@ -133,27 +131,10 @@ impl FileSource for JsonSource {
133131
Arc::new(conf)
134132
}
135133

136-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
137-
let mut conf = self.clone();
138-
conf.projected_statistics = Some(statistics);
139-
Arc::new(conf)
140-
}
141-
142-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
143-
Arc::new(Self { ..self.clone() })
144-
}
145-
146134
fn metrics(&self) -> &ExecutionPlanMetricsSet {
147135
&self.metrics
148136
}
149137

150-
fn statistics(&self) -> Result<Statistics> {
151-
let statistics = &self.projected_statistics;
152-
Ok(statistics
153-
.clone()
154-
.expect("projected_statistics must be set to call"))
155-
}
156-
157138
fn file_type(&self) -> &str {
158139
"json"
159140
}

datafusion/datasource-parquet/src/source.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion_datasource::schema_adapter::{
3737

3838
use arrow::datatypes::TimeUnit;
3939
use datafusion_common::config::TableParquetOptions;
40-
use datafusion_common::{DataFusionError, Statistics};
40+
use datafusion_common::{DataFusionError};
4141
use datafusion_datasource::file::FileSource;
4242
use datafusion_datasource::file_scan_config::FileScanConfig;
4343
use datafusion_datasource::TableSchema;
@@ -286,7 +286,6 @@ pub struct ParquetSource {
286286
pub(crate) batch_size: Option<usize>,
287287
/// Optional hint for the size of the parquet metadata
288288
pub(crate) metadata_size_hint: Option<usize>,
289-
pub(crate) projected_statistics: Option<Statistics>,
290289
#[cfg(feature = "parquet_encryption")]
291290
pub(crate) encryption_factory: Option<Arc<dyn EncryptionFactory>>,
292291
}
@@ -307,7 +306,6 @@ impl ParquetSource {
307306
schema_adapter_factory: None,
308307
batch_size: None,
309308
metadata_size_hint: None,
310-
projected_statistics: None,
311309
#[cfg(feature = "parquet_encryption")]
312310
encryption_factory: None,
313311
}
@@ -624,37 +622,10 @@ impl FileSource for ParquetSource {
624622
Arc::new(conf)
625623
}
626624

627-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
628-
let mut conf = self.clone();
629-
conf.projected_statistics = Some(statistics);
630-
Arc::new(conf)
631-
}
632-
633-
fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
634-
Arc::new(Self { ..self.clone() })
635-
}
636-
637625
fn metrics(&self) -> &ExecutionPlanMetricsSet {
638626
&self.metrics
639627
}
640628

641-
fn statistics(&self) -> datafusion_common::Result<Statistics> {
642-
let statistics = &self.projected_statistics;
643-
let statistics = statistics
644-
.clone()
645-
.expect("projected_statistics must be set");
646-
// When filters are pushed down, we have no way of knowing the exact statistics.
647-
// Note that pruning predicate is also a kind of filter pushdown.
648-
// (bloom filters use `pruning_predicate` too).
649-
// Because filter pushdown may happen dynamically as long as there is a predicate
650-
// if we have *any* predicate applied, we can't guarantee the statistics are exact.
651-
if self.filter().is_some() {
652-
Ok(statistics.to_inexact())
653-
} else {
654-
Ok(statistics)
655-
}
656-
}
657-
658629
fn file_type(&self) -> &str {
659630
"parquet"
660631
}

datafusion/datasource/src/file.rs

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ use crate::file_scan_config::FileScanConfig;
2727
use crate::file_stream::FileOpener;
2828
use crate::schema_adapter::SchemaAdapterFactory;
2929
use datafusion_common::config::ConfigOptions;
30-
use datafusion_common::{not_impl_err, Result, Statistics};
30+
use datafusion_common::{not_impl_err, Result};
3131
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
3232
use datafusion_physical_plan::filter_pushdown::{FilterPushdownPropagation, PushedDown};
3333
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -66,18 +66,12 @@ pub trait FileSource: Send + Sync {
6666
fn table_schema(&self) -> &crate::table_schema::TableSchema;
6767
/// Initialize new type with batch size configuration
6868
fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
69-
/// Initialize new instance with projection information
70-
fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
71-
/// Initialize new instance with projected statistics
72-
fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
7369
/// Returns the filter expression that will be applied during the file scan.
7470
fn filter(&self) -> Option<Arc<dyn PhysicalExpr>> {
7571
None
7672
}
7773
/// Return execution plan metrics
7874
fn metrics(&self) -> &ExecutionPlanMetricsSet;
79-
/// Return projected statistics
80-
fn statistics(&self) -> Result<Statistics>;
8175
/// String representation of file source such as "csv", "json", "parquet"
8276
fn file_type(&self) -> &str;
8377
/// Format FileType specific information

0 commit comments

Comments
 (0)