Skip to content

Commit 8b6adb7

Browse files
committed
clean up duplicate information in FileOpener trait (apache#17956)
* clean up duplicate information in FileOpener trait * remove unused struct * remove more * rename parameter * more renames * minimize diff * fix renames * fix renames * more fixes
1 parent 04926ae commit 8b6adb7

File tree

29 files changed

+283
-317
lines changed

29 files changed

+283
-317
lines changed

datafusion-examples/examples/advanced_parquet_index.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use datafusion::common::{
3030
use datafusion::datasource::listing::PartitionedFile;
3131
use datafusion::datasource::physical_plan::parquet::ParquetAccessPlan;
3232
use datafusion::datasource::physical_plan::{
33-
FileMeta, FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
33+
FileScanConfigBuilder, ParquetFileReaderFactory, ParquetSource,
3434
};
3535
use datafusion::datasource::TableProvider;
3636
use datafusion::execution::object_store::ObjectStoreUrl;
@@ -555,15 +555,16 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
555555
fn create_reader(
556556
&self,
557557
_partition_index: usize,
558-
file_meta: FileMeta,
558+
partitioned_file: PartitionedFile,
559559
metadata_size_hint: Option<usize>,
560560
_metrics: &ExecutionPlanMetricsSet,
561561
) -> Result<Box<dyn AsyncFileReader + Send>> {
562562
// for this example we ignore the partition index and metrics
563563
// but in a real system you would likely use them to report details on
564564
// the performance of the reader.
565-
let filename = file_meta
566-
.location()
565+
let filename = partitioned_file
566+
.object_meta
567+
.location
567568
.parts()
568569
.last()
569570
.expect("No path in location")
@@ -572,8 +573,8 @@ impl ParquetFileReaderFactory for CachedParquetFileReaderFactory {
572573

573574
let object_store = Arc::clone(&self.object_store);
574575
let mut inner =
575-
ParquetObjectReader::new(object_store, file_meta.object_meta.location)
576-
.with_file_size(file_meta.object_meta.size);
576+
ParquetObjectReader::new(object_store, partitioned_file.object_meta.location)
577+
.with_file_size(partitioned_file.object_meta.size);
577578

578579
if let Some(hint) = metadata_size_hint {
579580
inner = inner.with_footer_size_hint(hint)

datafusion-testing

Submodule datafusion-testing updated 88 files

datafusion/core/"/var/folders/jj/m8gr41dn2g96dzjtlk522g200000gn/T/.tmpPJDwAs/target.csv"

Whitespace-only changes.

datafusion/core/"/var/folders/jj/m8gr41dn2g96dzjtlk522g200000gn/T/.tmpua8p7n/target.csv"

Whitespace-only changes.

datafusion/core/src/dataframe/parquet.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ mod tests {
247247
Ok(())
248248
}
249249

250+
#[rstest::rstest]
250251
#[cfg(feature = "parquet_encryption")]
251252
#[tokio::test]
252253
async fn roundtrip_parquet_with_encryption() -> Result<()> {

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
use std::any::Any;
1919
use std::sync::Arc;
2020

21-
use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
21+
use crate::datasource::physical_plan::{FileOpenFuture, FileOpener};
2222
use crate::error::Result;
2323
use datafusion_datasource::as_file_source;
2424
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
@@ -122,18 +122,16 @@ pub struct ArrowOpener {
122122
}
123123

124124
impl FileOpener for ArrowOpener {
125-
fn open(
126-
&self,
127-
file_meta: FileMeta,
128-
_file: PartitionedFile,
129-
) -> Result<FileOpenFuture> {
125+
fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
130126
let object_store = Arc::clone(&self.object_store);
131127
let projection = self.projection.clone();
132128
Ok(Box::pin(async move {
133-
let range = file_meta.range.clone();
129+
let range = partitioned_file.range.clone();
134130
match range {
135131
None => {
136-
let r = object_store.get(file_meta.location()).await?;
132+
let r = object_store
133+
.get(&partitioned_file.object_meta.location)
134+
.await?;
137135
match r.payload {
138136
#[cfg(not(target_arch = "wasm32"))]
139137
GetResultPayload::File(file, _) => {
@@ -164,7 +162,7 @@ impl FileOpener for ArrowOpener {
164162
..Default::default()
165163
};
166164
let get_result = object_store
167-
.get_opts(file_meta.location(), get_option)
165+
.get_opts(&partitioned_file.object_meta.location, get_option)
168166
.await?;
169167
let footer_len_buf = get_result.bytes().await?;
170168
let footer_len = arrow_ipc::reader::read_footer_length(
@@ -176,7 +174,7 @@ impl FileOpener for ArrowOpener {
176174
..Default::default()
177175
};
178176
let get_result = object_store
179-
.get_opts(file_meta.location(), get_option)
177+
.get_opts(&partitioned_file.object_meta.location, get_option)
180178
.await?;
181179
let footer_buf = get_result.bytes().await?;
182180
let footer = arrow_ipc::root_as_footer(
@@ -204,7 +202,7 @@ impl FileOpener for ArrowOpener {
204202
})
205203
.collect_vec();
206204
let dict_results = object_store
207-
.get_ranges(file_meta.location(), &dict_ranges)
205+
.get_ranges(&partitioned_file.object_meta.location, &dict_ranges)
208206
.await?;
209207
for (dict_block, dict_result) in
210208
footer.dictionaries().iter().flatten().zip(dict_results)
@@ -237,7 +235,10 @@ impl FileOpener for ArrowOpener {
237235
.collect_vec();
238236

239237
let recordbatch_results = object_store
240-
.get_ranges(file_meta.location(), &recordbatch_ranges)
238+
.get_ranges(
239+
&partitioned_file.object_meta.location,
240+
&recordbatch_ranges,
241+
)
241242
.await?;
242243

243244
Ok(futures::stream::iter(

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@ pub use csv::{CsvOpener, CsvSource};
4343
pub use datafusion_datasource::file::FileSource;
4444
pub use datafusion_datasource::file_groups::FileGroup;
4545
pub use datafusion_datasource::file_groups::FileGroupPartitioner;
46-
pub use datafusion_datasource::file_meta::FileMeta;
4746
pub use datafusion_datasource::file_scan_config::{
4847
wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
4948
FileScanConfigBuilder,

datafusion/core/src/datasource/physical_plan/parquet.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ mod tests {
5050
use datafusion_common::test_util::{batches_to_sort_string, batches_to_string};
5151
use datafusion_common::{assert_contains, Result, ScalarValue};
5252
use datafusion_datasource::file_format::FileFormat;
53-
use datafusion_datasource::file_meta::FileMeta;
5453
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
5554
use datafusion_datasource::source::DataSourceExec;
5655

@@ -2207,7 +2206,7 @@ mod tests {
22072206
fn create_reader(
22082207
&self,
22092208
partition_index: usize,
2210-
file_meta: FileMeta,
2209+
partitioned_file: PartitionedFile,
22112210
metadata_size_hint: Option<usize>,
22122211
metrics: &ExecutionPlanMetricsSet,
22132212
) -> Result<Box<dyn parquet::arrow::async_reader::AsyncFileReader + Send>>
@@ -2218,7 +2217,7 @@ mod tests {
22182217
.push(metadata_size_hint);
22192218
self.inner.create_reader(
22202219
partition_index,
2221-
file_meta,
2220+
partitioned_file,
22222221
metadata_size_hint,
22232222
metrics,
22242223
)

datafusion/core/tests/parquet/custom_reader.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use arrow::record_batch::RecordBatch;
2626
use datafusion::datasource::listing::PartitionedFile;
2727
use datafusion::datasource::object_store::ObjectStoreUrl;
2828
use datafusion::datasource::physical_plan::{
29-
FileMeta, ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource,
29+
ParquetFileMetrics, ParquetFileReaderFactory, ParquetSource,
3030
};
3131
use datafusion::physical_plan::collect;
3232
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
@@ -119,11 +119,11 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
119119
fn create_reader(
120120
&self,
121121
partition_index: usize,
122-
file_meta: FileMeta,
122+
partitioned_file: PartitionedFile,
123123
metadata_size_hint: Option<usize>,
124124
metrics: &ExecutionPlanMetricsSet,
125125
) -> Result<Box<dyn AsyncFileReader + Send>> {
126-
let metadata = file_meta
126+
let metadata = partitioned_file
127127
.extensions
128128
.as_ref()
129129
.expect("has user defined metadata");
@@ -135,13 +135,13 @@ impl ParquetFileReaderFactory for InMemoryParquetFileReaderFactory {
135135

136136
let parquet_file_metrics = ParquetFileMetrics::new(
137137
partition_index,
138-
file_meta.location().as_ref(),
138+
partitioned_file.object_meta.location.as_ref(),
139139
metrics,
140140
);
141141

142142
Ok(Box::new(ParquetFileReader {
143143
store: Arc::clone(&self.0),
144-
meta: file_meta.object_meta,
144+
meta: partitioned_file.object_meta,
145145
metrics: parquet_file_metrics,
146146
metadata_size_hint,
147147
}))

datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use arrow::{array::RecordBatch, compute::concat_batches};
2020
use datafusion::{datasource::object_store::ObjectStoreUrl, physical_plan::PhysicalExpr};
2121
use datafusion_common::{config::ConfigOptions, internal_err, Result, Statistics};
2222
use datafusion_datasource::{
23-
file::FileSource, file_meta::FileMeta, file_scan_config::FileScanConfig,
23+
file::FileSource, file_scan_config::FileScanConfig,
2424
file_scan_config::FileScanConfigBuilder, file_stream::FileOpenFuture,
2525
file_stream::FileOpener, schema_adapter::DefaultSchemaAdapterFactory,
2626
schema_adapter::SchemaAdapterFactory, source::DataSourceExec, PartitionedFile,
@@ -56,11 +56,7 @@ pub struct TestOpener {
5656
}
5757

5858
impl FileOpener for TestOpener {
59-
fn open(
60-
&self,
61-
_file_meta: FileMeta,
62-
_file: PartitionedFile,
63-
) -> Result<FileOpenFuture> {
59+
fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
6460
let mut batches = self.batches.clone();
6561
if let Some(batch_size) = self.batch_size {
6662
let batch = concat_batches(&batches[0].schema(), &batches)?;

0 commit comments

Comments
 (0)