Skip to content

Commit 34993f2

Browse files
authored
Always apply per-file schema during parquet read (#18)
1 parent ad627d8 commit 34993f2

File tree

1 file changed

+41
-20
lines changed

1 file changed

+41
-20
lines changed

datafusion/datasource-parquet/src/opener.rs

Lines changed: 41 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ impl FileOpener for ParquetOpener {
9191

9292
let metadata_size_hint = file_meta.metadata_size_hint.or(self.metadata_size_hint);
9393

94-
let mut reader: Box<dyn AsyncFileReader> =
94+
let mut async_file_reader: Box<dyn AsyncFileReader> =
9595
self.parquet_file_reader_factory.create_reader(
9696
self.partition_index,
9797
file_meta,
@@ -121,23 +121,40 @@ impl FileOpener for ParquetOpener {
121121
let enable_page_index = self.enable_page_index;
122122

123123
Ok(Box::pin(async move {
124-
// Don't load the page index yet - we will decide later if we need it
125-
let options = ArrowReaderOptions::new().with_page_index(false);
126-
124+
// Don't load the page index yet. Since it is not stored inline in
125+
// the footer, loading the page index if it is not needed will do
126+
// unnecessary I/O. We decide later if it is needed to evaluate the
127+
// pruning predicates. Thus default to not requesting it from the
128+
// underlying reader.
129+
let mut options = ArrowReaderOptions::new().with_page_index(false);
127130
let mut metadata_timer = file_metrics.metadata_load_time.timer();
128-
let mut metadata =
129-
ArrowReaderMetadata::load_async(&mut reader, options.clone()).await?;
131+
132+
// Begin by loading the metadata from the underlying reader (note
133+
// the returned metadata may actually include page indexes as some
134+
// readers may return page indexes even when not requested -- for
135+
// example when they are cached)
136+
let mut reader_metadata =
137+
ArrowReaderMetadata::load_async(&mut async_file_reader, options.clone())
138+
.await?;
139+
130140
// Note about schemas: we are actually dealing with **3 different schemas** here:
131141
// - The table schema as defined by the TableProvider. This is what the user sees, what they get when they `SELECT * FROM table`, etc.
132142
// - The "virtual" file schema: this is the table schema minus any hive partition columns and projections. This is what the file schema is coerced to.
133143
// - The physical file schema: this is the schema as defined by the parquet file. This is what the parquet file actually contains.
134-
let mut physical_file_schema = Arc::clone(metadata.schema());
144+
let mut physical_file_schema = Arc::clone(reader_metadata.schema());
135145

136-
// read with view types
146+
// The schema loaded from the file may not be the same as the
147+
// desired schema (for example if we want to instruct the parquet
148+
// reader to read strings using Utf8View instead). Update if necessary
137149
if let Some(merged) =
138150
apply_file_schema_type_coercions(&table_schema, &physical_file_schema)
139151
{
140152
physical_file_schema = Arc::new(merged);
153+
options = options.with_schema(Arc::clone(&physical_file_schema));
154+
reader_metadata = ArrowReaderMetadata::try_new(
155+
Arc::clone(reader_metadata.metadata()),
156+
options.clone(),
157+
)?;
141158
}
142159

143160
// Build predicates for this specific file
@@ -147,23 +164,25 @@ impl FileOpener for ParquetOpener {
147164
&predicate_creation_errors,
148165
);
149166

150-
// Now check if we should load the page index
167+
// The page index is not stored inline in the parquet footer so the
168+
// code above may not have raed the page index structures yet. If we
169+
// need them for reading and they aren't yet loaded, we need to load them now.
151170
if should_enable_page_index(enable_page_index, &page_pruning_predicate) {
152-
metadata = load_page_index(
153-
metadata,
154-
&mut reader,
171+
reader_metadata = load_page_index(
172+
reader_metadata,
173+
&mut async_file_reader,
155174
// Since we're manually loading the page index the option here should not matter but we pass it in for consistency
156-
ArrowReaderOptions::new()
157-
.with_page_index(true)
158-
.with_schema(Arc::clone(&physical_file_schema)),
175+
options.with_page_index(true),
159176
)
160177
.await?;
161178
}
162179

163180
metadata_timer.stop();
164181

165-
let mut builder =
166-
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata);
182+
let mut builder = ParquetRecordBatchStreamBuilder::new_with_metadata(
183+
async_file_reader,
184+
reader_metadata,
185+
);
167186

168187
let (schema_mapping, adapted_projections) =
169188
schema_adapter.map_schema(&physical_file_schema)?;
@@ -372,12 +391,14 @@ fn build_pruning_predicates(
372391
(pruning_predicate, Some(page_pruning_predicate))
373392
}
374393

394+
/// Returns an `ArrowReaderMetadata` with the page index loaded, loading
395+
/// it from the underlying `AsyncFileReader` if necessary.
375396
async fn load_page_index<T: AsyncFileReader>(
376-
arrow_reader: ArrowReaderMetadata,
397+
reader_metadata: ArrowReaderMetadata,
377398
input: &mut T,
378399
options: ArrowReaderOptions,
379400
) -> Result<ArrowReaderMetadata> {
380-
let parquet_metadata = arrow_reader.metadata();
401+
let parquet_metadata = reader_metadata.metadata();
381402
let missing_column_index = parquet_metadata.column_index().is_none();
382403
let missing_offset_index = parquet_metadata.offset_index().is_none();
383404
// You may ask yourself: why are we even checking if the page index is already loaded here?
@@ -397,6 +418,6 @@ async fn load_page_index<T: AsyncFileReader>(
397418
Ok(new_arrow_reader)
398419
} else {
399420
// No need to load the page index again, just return the existing metadata
400-
Ok(arrow_reader)
421+
Ok(reader_metadata)
401422
}
402423
}

0 commit comments

Comments
 (0)