
Commit 3a5216a

remove more
1 parent 1aeeb2a commit 3a5216a

6 files changed: 31 lines added, 64 lines removed
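Every hunk in this commit applies the same refactor: the `FileOpener` implementations (Arrow, Avro, CSV, JSON) and `calculate_range` stop copying their `PartitionedFile` argument into a temporary `FileMeta` and instead read the location and byte range off the `PartitionedFile` directly, which is what lets the `file_meta::FileMeta` imports and re-export be removed. A minimal sketch of the new access pattern, using the `datafusion_datasource` types visible in this diff (the two helper functions are hypothetical, added only for illustration):

use datafusion_datasource::{FileRange, PartitionedFile};
use object_store::path::Path;

// Hypothetical helper: where the openers previously called `file_meta.location()`,
// they now borrow the path straight from the `PartitionedFile`.
fn location(file: &PartitionedFile) -> &Path {
    &file.object_meta.location
}

// Hypothetical helper: where the openers previously read `file_meta.range`,
// they now use the range field of the `PartitionedFile` itself.
fn byte_range(file: &PartitionedFile) -> Option<FileRange> {
    file.range.clone()
}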

datafusion/core/src/datasource/physical_plan/arrow_file.rs

Lines changed: 7 additions & 14 deletions
@@ -18,7 +18,7 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use crate::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
+use crate::datasource::physical_plan::{FileOpenFuture, FileOpener};
 use crate::error::Result;
 use datafusion_datasource::as_file_source;
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
@@ -123,20 +123,13 @@ pub struct ArrowOpener {
 
 impl FileOpener for ArrowOpener {
     fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
-        let file_meta = FileMeta {
-            object_meta: file.object_meta.clone(),
-            range: file.range.clone(),
-            extensions: file.extensions.clone(),
-            metadata_size_hint: file.metadata_size_hint,
-        };
-
         let object_store = Arc::clone(&self.object_store);
         let projection = self.projection.clone();
         Ok(Box::pin(async move {
-            let range = file_meta.range.clone();
+            let range = file.range.clone();
             match range {
                 None => {
-                    let r = object_store.get(file_meta.location()).await?;
+                    let r = object_store.get(&file.object_meta.location).await?;
                     match r.payload {
                         #[cfg(not(target_arch = "wasm32"))]
                         GetResultPayload::File(file, _) => {
@@ -167,7 +160,7 @@ impl FileOpener for ArrowOpener {
                         ..Default::default()
                     };
                     let get_result = object_store
-                        .get_opts(file_meta.location(), get_option)
+                        .get_opts(&file.object_meta.location, get_option)
                         .await?;
                     let footer_len_buf = get_result.bytes().await?;
                     let footer_len = arrow_ipc::reader::read_footer_length(
@@ -179,7 +172,7 @@ impl FileOpener for ArrowOpener {
                         ..Default::default()
                     };
                     let get_result = object_store
-                        .get_opts(file_meta.location(), get_option)
+                        .get_opts(&file.object_meta.location, get_option)
                         .await?;
                     let footer_buf = get_result.bytes().await?;
                     let footer = arrow_ipc::root_as_footer(
@@ -207,7 +200,7 @@ impl FileOpener for ArrowOpener {
                         })
                         .collect_vec();
                     let dict_results = object_store
-                        .get_ranges(file_meta.location(), &dict_ranges)
+                        .get_ranges(&file.object_meta.location, &dict_ranges)
                         .await?;
                     for (dict_block, dict_result) in
                         footer.dictionaries().iter().flatten().zip(dict_results)
@@ -240,7 +233,7 @@ impl FileOpener for ArrowOpener {
                         .collect_vec();
 
                     let recordbatch_results = object_store
-                        .get_ranges(file_meta.location(), &recordbatch_ranges)
+                        .get_ranges(&file.object_meta.location, &recordbatch_ranges)
                         .await?;
 
                     Ok(futures::stream::iter(

datafusion/core/src/datasource/physical_plan/mod.rs

Lines changed: 0 additions & 1 deletion
@@ -43,7 +43,6 @@ pub use csv::{CsvOpener, CsvSource};
 pub use datafusion_datasource::file::FileSource;
 pub use datafusion_datasource::file_groups::FileGroup;
 pub use datafusion_datasource::file_groups::FileGroupPartitioner;
-pub use datafusion_datasource::file_meta::FileMeta;
 pub use datafusion_datasource::file_scan_config::{
     wrap_partition_type_in_dict, wrap_partition_value_in_dict, FileScanConfig,
     FileScanConfigBuilder,

datafusion/datasource-avro/src/source.rs

Lines changed: 2 additions & 11 deletions
@@ -145,9 +145,7 @@ mod private {
     use super::*;
 
     use bytes::Buf;
-    use datafusion_datasource::{
-        file_meta::FileMeta, file_stream::FileOpenFuture, PartitionedFile,
-    };
+    use datafusion_datasource::{file_stream::FileOpenFuture, PartitionedFile};
     use futures::StreamExt;
     use object_store::{GetResultPayload, ObjectStore};
 
@@ -158,17 +156,10 @@ mod private {
 
     impl FileOpener for AvroOpener {
         fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
-            let file_meta = FileMeta {
-                object_meta: file.object_meta.clone(),
-                range: file.range.clone(),
-                extensions: file.extensions.clone(),
-                metadata_size_hint: file.metadata_size_hint,
-            };
-
             let config = Arc::clone(&self.config);
             let object_store = Arc::clone(&self.object_store);
             Ok(Box::pin(async move {
-                let r = object_store.get(file_meta.location()).await?;
+                let r = object_store.get(&file.object_meta.location).await?;
                 match r.payload {
                     GetResultPayload::File(file, _) => {
                         let reader = config.open(file)?;

datafusion/datasource-csv/src/source.rs

Lines changed: 10 additions & 18 deletions
@@ -26,7 +26,6 @@ use std::task::Poll;
 
 use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::{
     as_file_source, calculate_range, FileRange, ListingTableUrl, PartitionedFile,
@@ -338,18 +337,11 @@ impl FileOpener for CsvOpener {
     /// A},1,2,3,4,5,6,7,8,9\n
     /// The lines read would be: [1, 2]
     fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
-        let file_meta = FileMeta {
-            object_meta: file.object_meta.clone(),
-            range: file.range.clone(),
-            extensions: file.extensions.clone(),
-            metadata_size_hint: file.metadata_size_hint,
-        };
-
         // `self.config.has_header` controls whether to skip reading the 1st line header
         // If the .csv file is read in parallel and this `CsvOpener` is only reading some middle
         // partition, then don't skip first line
         let mut csv_has_header = self.config.has_header;
-        if let Some(FileRange { start, .. }) = file_meta.range {
+        if let Some(FileRange { start, .. }) = file.range {
             if start != 0 {
                 csv_has_header = false;
             }
@@ -363,7 +355,7 @@ impl FileOpener for CsvOpener {
 
         let file_compression_type = self.file_compression_type.to_owned();
 
-        if file_meta.range.is_some() {
+        if file.range.is_some() {
             assert!(
                 !file_compression_type.is_compressed(),
                 "Reading compressed .csv in parallel is not supported"
@@ -376,8 +368,7 @@
         Ok(Box::pin(async move {
             // Current partition contains bytes [start_byte, end_byte) (might contain incomplete lines at boundaries)
 
-            let calculated_range =
-                calculate_range(&file_meta, &store, terminator).await?;
+            let calculated_range = calculate_range(&file, &store, terminator).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
@@ -394,19 +385,20 @@
                 ..Default::default()
             };
 
-            let result = store.get_opts(file_meta.location(), options).await?;
+            let result = store.get_opts(&file.object_meta.location, options).await?;
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(mut file, _) => {
-                    let is_whole_file_scanned = file_meta.range.is_none();
+                GetResultPayload::File(mut result_file, _) => {
+                    let is_whole_file_scanned = file.range.is_none();
                     let decoder = if is_whole_file_scanned {
                         // Don't seek if no range as breaks FIFO files
-                        file_compression_type.convert_read(file)?
+                        file_compression_type.convert_read(result_file)?
                     } else {
-                        file.seek(SeekFrom::Start(result.range.start as _))?;
+                        result_file.seek(SeekFrom::Start(result.range.start as _))?;
                         file_compression_type.convert_read(
-                            file.take((result.range.end - result.range.start) as u64),
+                            result_file
+                                .take((result.range.end - result.range.start) as u64),
                         )?
                     };
 
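The header handling above is the one behavioral subtlety in this file: only the partition that starts at byte 0 still sees the CSV header line, so a `PartitionedFile` whose range starts at a nonzero offset must not skip its first line. A hedged restatement of that check as a standalone function (the function itself is hypothetical; the type and field names come from the diff):

use datafusion_datasource::{FileRange, PartitionedFile};

// Hypothetical helper mirroring the logic in `CsvOpener::open` above: treat the
// first line as a header only for the partition that begins at byte 0.
fn effective_has_header(config_has_header: bool, file: &PartitionedFile) -> bool {
    let mut has_header = config_has_header;
    if let Some(FileRange { start, .. }) = file.range {
        if start != 0 {
            // A middle partition starts mid-file, so its first line is data, not the header.
            has_header = false;
        }
    }
    has_header
}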

datafusion/datasource-json/src/source.rs

Lines changed: 8 additions & 15 deletions
@@ -28,7 +28,6 @@ use datafusion_common::error::{DataFusionError, Result};
 use datafusion_common_runtime::JoinSet;
 use datafusion_datasource::decoder::{deserialize_stream, DecoderDeserializer};
 use datafusion_datasource::file_compression_type::FileCompressionType;
-use datafusion_datasource::file_meta::FileMeta;
 use datafusion_datasource::file_stream::{FileOpenFuture, FileOpener};
 use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
 use datafusion_datasource::{
@@ -177,20 +176,13 @@ impl FileOpener for JsonOpener {
     /// 1. The first line of the partition is the line in which the index of the first character >= `start`.
     /// 2. The last line of the partition is the line in which the byte at position `end - 1` resides.
     fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
-        let file_meta = FileMeta {
-            object_meta: file.object_meta.clone(),
-            range: file.range.clone(),
-            extensions: file.extensions.clone(),
-            metadata_size_hint: file.metadata_size_hint,
-        };
-
         let store = Arc::clone(&self.object_store);
         let schema = Arc::clone(&self.projected_schema);
         let batch_size = self.batch_size;
         let file_compression_type = self.file_compression_type.to_owned();
 
         Ok(Box::pin(async move {
-            let calculated_range = calculate_range(&file_meta, &store, None).await?;
+            let calculated_range = calculate_range(&file, &store, None).await?;
 
             let range = match calculated_range {
                 RangeCalculation::Range(None) => None,
@@ -207,17 +199,18 @@ impl FileOpener for JsonOpener {
                 ..Default::default()
             };
 
-            let result = store.get_opts(file_meta.location(), options).await?;
+            let result = store.get_opts(&file.object_meta.location, options).await?;
 
             match result.payload {
                 #[cfg(not(target_arch = "wasm32"))]
-                GetResultPayload::File(mut file, _) => {
-                    let bytes = match file_meta.range {
-                        None => file_compression_type.convert_read(file)?,
+                GetResultPayload::File(mut result_file, _) => {
+                    let bytes = match file.range {
+                        None => file_compression_type.convert_read(result_file)?,
                         Some(_) => {
-                            file.seek(SeekFrom::Start(result.range.start as _))?;
+                            result_file.seek(SeekFrom::Start(result.range.start as _))?;
                             let limit = result.range.end - result.range.start;
-                            file_compression_type.convert_read(file.take(limit as u64))?
+                            file_compression_type
+                                .convert_read(result_file.take(limit as u64))?
                         }
                     };
 

datafusion/datasource/src/mod.rs

Lines changed: 4 additions & 5 deletions
@@ -54,7 +54,6 @@ use chrono::TimeZone;
 use datafusion_common::stats::Precision;
 use datafusion_common::{exec_datafusion_err, ColumnStatistics, Result};
 use datafusion_common::{ScalarValue, Statistics};
-use file_meta::FileMeta;
 use futures::{Stream, StreamExt};
 use object_store::{path::Path, ObjectMeta};
 use object_store::{GetOptions, GetRange, ObjectStore};
@@ -258,15 +257,15 @@ pub enum RangeCalculation
 ///
 /// Returns an `Error` if any part of the range calculation fails, such as issues in reading from the object store or invalid range boundaries.
 pub async fn calculate_range(
-    file_meta: &FileMeta,
+    file: &PartitionedFile,
     store: &Arc<dyn ObjectStore>,
     terminator: Option<u8>,
 ) -> Result<RangeCalculation> {
-    let location = file_meta.location();
-    let file_size = file_meta.object_meta.size;
+    let location = &file.object_meta.location;
+    let file_size = file.object_meta.size;
     let newline = terminator.unwrap_or(b'\n');
 
-    match file_meta.range {
+    match file.range {
         None => Ok(RangeCalculation::Range(None)),
         Some(FileRange { start, end }) => {
             let start: u64 = start.try_into().map_err(|_| {
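With the new signature, callers hand `calculate_range` the `PartitionedFile` itself instead of a `FileMeta` built from it, exactly as the CSV and JSON openers above now do. A small sketch of a caller, assuming the items shown in this diff (`calculate_range`, `RangeCalculation`, `PartitionedFile`); the `describe_partition` function is hypothetical, and the newline terminator mirrors the CSV path:

use std::sync::Arc;

use datafusion_common::Result;
use datafusion_datasource::{calculate_range, PartitionedFile, RangeCalculation};
use object_store::ObjectStore;

// Hypothetical caller of the updated calculate_range signature.
async fn describe_partition(
    file: &PartitionedFile,
    store: &Arc<dyn ObjectStore>,
) -> Result<()> {
    // Pass the PartitionedFile directly; before this commit a FileMeta was built first.
    match calculate_range(file, store, Some(b'\n')).await? {
        RangeCalculation::Range(None) => println!("scan the whole file"),
        RangeCalculation::Range(Some(r)) => println!("scan bytes {}..{}", r.start, r.end),
        // Any other outcome, e.g. nothing to read in this partition.
        _ => println!("skip this partition"),
    }
    Ok(())
}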
