From 5804f21d7043f55e6e8775c17a3d14f6f413fc27 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Thu, 26 Sep 2024 12:38:04 -0700 Subject: [PATCH 1/6] Minor: Silence compiler warnings for `parquet::file::metadata::reader` (#6457) * silence compiler warnings * add async_tests module --- parquet/src/file/metadata/reader.rs | 32 ++++++++++++++++------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 4a5a1bc93c4f..9e00c6860434 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -638,15 +638,6 @@ impl ParquetMetaDataReader { mod tests { use super::*; use bytes::Bytes; - #[cfg(feature = "async")] - use futures::future::BoxFuture; - #[cfg(feature = "async")] - use futures::FutureExt; - use std::fs::File; - #[cfg(feature = "async")] - use std::future::Future; - use std::io::{Read, Seek, SeekFrom}; - use std::sync::atomic::{AtomicUsize, Ordering}; use crate::basic::SortOrder; use crate::basic::Type; @@ -824,11 +815,27 @@ mod tests { "EOF: Parquet file too small. Size is 1728 but need 1729" ); } +} + +#[cfg(feature = "async")] +#[cfg(test)] +mod async_tests { + use super::*; + use bytes::Bytes; + use futures::future::BoxFuture; + use futures::FutureExt; + use std::fs::File; + use std::future::Future; + use std::io::{Read, Seek, SeekFrom}; + use std::ops::Range; + use std::sync::atomic::{AtomicUsize, Ordering}; + + use crate::arrow::async_reader::MetadataFetch; + use crate::file::reader::Length; + use crate::util::test_common::file_util::get_test_file; - #[cfg(feature = "async")] struct MetadataFetchFn(F); - #[cfg(feature = "async")] impl MetadataFetch for MetadataFetchFn where F: FnMut(Range) -> Fut + Send, @@ -839,7 +846,6 @@ mod tests { } } - #[cfg(feature = "async")] fn read_range(file: &mut File, range: Range) -> Result { file.seek(SeekFrom::Start(range.start as _))?; let len = range.end - range.start; @@ -848,7 +854,6 @@ mod tests { Ok(buf.into()) } - #[cfg(feature = "async")] #[tokio::test] async fn test_simple() { let mut file = get_test_file("nulls.snappy.parquet"); @@ -934,7 +939,6 @@ mod tests { assert_eq!(err, "Parquet error: Invalid Parquet file. Corrupt footer"); } - #[cfg(feature = "async")] #[tokio::test] async fn test_page_index() { let mut file = get_test_file("alltypes_tiny_pages.parquet"); From ebcc4a585136cd1d9696c38c41f71c9ced181f57 Mon Sep 17 00:00:00 2001 From: Enrico Minack Date: Fri, 27 Sep 2024 10:47:30 +0200 Subject: [PATCH 2/6] Align buffers in ArrayData.child_data as well (#6462) --- arrow-data/src/data.rs | 49 ++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index 8c9e002e219b..8af2a91cf159 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -693,15 +693,21 @@ impl ArrayData { /// /// This can be useful for when interacting with data sent over IPC or FFI, that may /// not meet the minimum alignment requirements + /// + /// This also aligns buffers of children data pub fn align_buffers(&mut self) { let layout = layout(&self.data_type); for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) { if let BufferSpec::FixedWidth { alignment, .. } = spec { if buffer.as_ptr().align_offset(*alignment) != 0 { - *buffer = Buffer::from_slice_ref(buffer.as_ref()) + *buffer = Buffer::from_slice_ref(buffer.as_ref()); } } } + // align children data recursively + for data in self.child_data.iter_mut() { + data.align_buffers() + } } /// "cheap" validation of an `ArrayData`. Ensures buffers are @@ -1961,7 +1967,7 @@ impl From for ArrayDataBuilder { #[cfg(test)] mod tests { use super::*; - use arrow_schema::Field; + use arrow_schema::{Field, Fields}; // See arrow/tests/array_data_validation.rs for test of array validation @@ -2224,6 +2230,7 @@ mod tests { }; data.validate_full().unwrap(); + // break alignment in data data.buffers[0] = sliced; let err = data.validate().unwrap_err(); @@ -2236,6 +2243,44 @@ mod tests { data.validate_full().unwrap(); } + #[test] + fn test_alignment_struct() { + let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]); + let sliced = buffer.slice(1); + + let child_data = ArrayData { + data_type: DataType::Int32, + len: 0, + offset: 0, + buffers: vec![buffer], + child_data: vec![], + nulls: None, + }; + + let schema = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, false)])); + let mut data = ArrayData { + data_type: schema, + len: 0, + offset: 0, + buffers: vec![], + child_data: vec![child_data], + nulls: None, + }; + data.validate_full().unwrap(); + + // break alignment in child data + data.child_data[0].buffers[0] = sliced; + let err = data.validate().unwrap_err(); + + assert_eq!( + err.to_string(), + "Invalid argument error: Misaligned buffers[0] in array of type Int32, offset from expected alignment of 4 by 1" + ); + + data.align_buffers(); + data.validate_full().unwrap(); + } + #[test] fn test_null_view_types() { let array_len = 32; From 8e0aaadb98a55152259b462ce90ac5f9307a0a41 Mon Sep 17 00:00:00 2001 From: Makro <4398091+xmakro@users.noreply.github.com> Date: Sat, 28 Sep 2024 01:41:02 -0700 Subject: [PATCH 3/6] Parquet: Verify 32-bit CRC checksum when decoding pages (#6290) * Parquet: Verify 32-bit CRC checksum when decoding pages * Undo cargo toml * a * enable crc by default * a * Address comments * Add tests that verify crc checks * Document feature flag * Move documentation around * Update parquet/Cargo.toml Co-authored-by: Ed Seidl * Update parquet/src/file/serialized_reader.rs Co-authored-by: Ed Seidl * Add license * Run cargo +stable fmt --all * Revert MD034 * Applye readme suggestion --------- Co-authored-by: xmakro Co-authored-by: Ed Seidl --- parquet/Cargo.toml | 3 ++ parquet/README.md | 3 +- parquet/src/file/serialized_reader.rs | 9 ++++ parquet/tests/arrow_reader/checksum.rs | 73 ++++++++++++++++++++++++++ parquet/tests/arrow_reader/mod.rs | 2 + 5 files changed, 89 insertions(+), 1 deletion(-) create mode 100644 parquet/tests/arrow_reader/checksum.rs diff --git a/parquet/Cargo.toml b/parquet/Cargo.toml index b97b2a571646..1d38e67a0f02 100644 --- a/parquet/Cargo.toml +++ b/parquet/Cargo.toml @@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false } paste = { version = "1.0" } half = { version = "2.1", default-features = false, features = ["num-traits"] } sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] } +crc32fast = { version = "1.4.2", optional = true, default-features = false } [dev-dependencies] base64 = { version = "0.22", default-features = false, features = ["std"] } @@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"] zstd = ["dep:zstd", "zstd-sys"] # Display memory in example/write_parquet.rs sysinfo = ["dep:sysinfo"] +# Verify 32-bit CRC checksum when decoding parquet pages +crc = ["dep:crc32fast"] [[example]] name = "read_parquet" diff --git a/parquet/README.md b/parquet/README.md index 0360d15db14f..a0441ee6026d 100644 --- a/parquet/README.md +++ b/parquet/README.md @@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your - `zstd` (default) - support for parquet using `zstd` compression - `snap` (default) - support for parquet using `snappy` compression - `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin) +- `crc` - enables functionality to automatically verify checksums of each page (if present) when decoding - `experimental` - Experimental APIs which may change, even between minor releases ## Parquet Feature Status @@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your ## License -Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0. +Licensed under the Apache License, Version 2.0: . diff --git a/parquet/src/file/serialized_reader.rs b/parquet/src/file/serialized_reader.rs index 6fb0f78c1613..b253b73a4fa0 100644 --- a/parquet/src/file/serialized_reader.rs +++ b/parquet/src/file/serialized_reader.rs @@ -390,6 +390,15 @@ pub(crate) fn decode_page( physical_type: Type, decompressor: Option<&mut Box>, ) -> Result { + // Verify the 32-bit CRC checksum of the page + #[cfg(feature = "crc")] + if let Some(expected_crc) = page_header.crc { + let crc = crc32fast::hash(&buffer); + if crc != expected_crc as u32 { + return Err(general_err!("Page CRC checksum mismatch")); + } + } + // When processing data page v2, depending on enabled compression for the // page, we should account for uncompressed data ('offset') of // repetition and definition levels. diff --git a/parquet/tests/arrow_reader/checksum.rs b/parquet/tests/arrow_reader/checksum.rs new file mode 100644 index 000000000000..c60908d8b95d --- /dev/null +++ b/parquet/tests/arrow_reader/checksum.rs @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! This file contains an end to end test for verifying checksums when reading parquet files. + +use std::path::PathBuf; + +use arrow::util::test_util::parquet_test_data; +use parquet::arrow::arrow_reader::ArrowReaderBuilder; + +#[test] +fn test_datapage_v1_corrupt_checksum() { + let errors = read_file_batch_errors("datapage_v1-corrupt-checksum.parquet"); + assert_eq!(errors, [ + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Ok(()), + Ok(()), + Err("Parquet argument error: Parquet error: Page CRC checksum mismatch".to_string()), + Err("Parquet argument error: Parquet error: Not all children array length are the same!".to_string()) + ]); +} + +#[test] +fn test_datapage_v1_uncompressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + +#[test] +fn test_datapage_v1_snappy_compressed_checksum() { + let errors = read_file_batch_errors("datapage_v1-snappy-compressed-checksum.parquet"); + assert_eq!(errors, [Ok(()), Ok(()), Ok(()), Ok(()), Ok(())]); +} + +#[test] +fn test_plain_dict_uncompressed_checksum() { + let errors = read_file_batch_errors("plain-dict-uncompressed-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} +#[test] +fn test_rle_dict_snappy_checksum() { + let errors = read_file_batch_errors("rle-dict-snappy-checksum.parquet"); + assert_eq!(errors, [Ok(())]); +} + +/// Reads a file and returns a vector with one element per record batch. +/// The record batch data is replaced with () and errors are stringified. +fn read_file_batch_errors(name: &str) -> Vec> { + let path = PathBuf::from(parquet_test_data()).join(name); + println!("Reading file: {:?}", path); + let file = std::fs::File::open(&path).unwrap(); + let reader = ArrowReaderBuilder::try_new(file).unwrap().build().unwrap(); + reader + .map(|x| match x { + Ok(_) => Ok(()), + Err(e) => Err(e.to_string()), + }) + .collect() +} diff --git a/parquet/tests/arrow_reader/mod.rs b/parquet/tests/arrow_reader/mod.rs index cc4c8f3c977b..0e6783583cd5 100644 --- a/parquet/tests/arrow_reader/mod.rs +++ b/parquet/tests/arrow_reader/mod.rs @@ -36,6 +36,8 @@ use std::sync::Arc; use tempfile::NamedTempFile; mod bad_data; +#[cfg(feature = "crc")] +mod checksum; mod statistics; // returns a struct array with columns "int32_col", "float32_col" and "float64_col" with the specified values From f41c258246cd4bd9d89228cded9ed54dbd00faff Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Sat, 28 Sep 2024 23:49:18 +0100 Subject: [PATCH 4/6] Update tonic-build requirement from =0.12.2 to =0.12.3 (#6473) * Update tonic-build requirement from =0.12.2 to =0.12.3 Updates the requirements on [tonic-build](https://github.com/hyperium/tonic) to permit the latest version. - [Release notes](https://github.com/hyperium/tonic/releases) - [Changelog](https://github.com/hyperium/tonic/blob/master/CHANGELOG.md) - [Commits](https://github.com/hyperium/tonic/compare/v0.12.2...v0.12.3) --- updated-dependencies: - dependency-name: tonic-build dependency-type: direct:production ... Signed-off-by: dependabot[bot] * regenerate vendored code --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Andrew Lamb --- arrow-flight/gen/Cargo.toml | 2 +- arrow-flight/src/arrow.flight.protocol.rs | 70 ++++++++++++----------- 2 files changed, 38 insertions(+), 34 deletions(-) diff --git a/arrow-flight/gen/Cargo.toml b/arrow-flight/gen/Cargo.toml index 08b53b729738..aa33f1f49096 100644 --- a/arrow-flight/gen/Cargo.toml +++ b/arrow-flight/gen/Cargo.toml @@ -34,4 +34,4 @@ publish = false # (and checked in) arrow.flight.protocol.rs from changing proc-macro2 = { version = "=1.0.86", default-features = false } prost-build = { version = "=0.13.3", default-features = false } -tonic-build = { version = "=0.12.2", default-features = false, features = ["transport", "prost"] } +tonic-build = { version = "=0.12.3", default-features = false, features = ["transport", "prost"] } diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs index f1eb549d54aa..0cd4f6948b77 100644 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ b/arrow-flight/src/arrow.flight.protocol.rs @@ -417,7 +417,13 @@ impl CancelStatus { } /// Generated client implementations. pub mod flight_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; use tonic::codegen::http::Uri; /// @@ -521,8 +527,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -555,8 +560,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -590,8 +594,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -640,8 +643,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -672,8 +674,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -704,8 +705,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -736,8 +736,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -767,8 +766,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -801,8 +799,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -832,8 +829,7 @@ pub mod flight_service_client { .ready() .await .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, + tonic::Status::unknown( format!("Service was not ready: {}", e.into()), ) })?; @@ -852,7 +848,13 @@ pub mod flight_service_client { } /// Generated server implementations. pub mod flight_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] use tonic::codegen::*; /// Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer. #[async_trait] @@ -1569,17 +1571,19 @@ pub mod flight_service_server { } _ => { Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", tonic::Code::Unimplemented as i32) - .header( - http::header::CONTENT_TYPE, - tonic::metadata::GRPC_CONTENT_TYPE, - ) - .body(empty_body()) - .unwrap(), - ) + let mut response = http::Response::new(empty_body()); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) }) } } From f0e39cc3cb2f61bbd8294119f4ad9f39545b99f4 Mon Sep 17 00:00:00 2001 From: Mustafa Akur <33904309+akurmustafa@users.noreply.github.com> Date: Sun, 29 Sep 2024 02:54:17 -0700 Subject: [PATCH 5/6] Add take_arrays util for getting entries from 2d arrays (#6475) * Add take_arrays util function * Update comments * Minor changes --- arrow-select/src/take.rs | 59 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/arrow-select/src/take.rs b/arrow-select/src/take.rs index ed7179fd36ce..6d037fc41984 100644 --- a/arrow-select/src/take.rs +++ b/arrow-select/src/take.rs @@ -94,6 +94,65 @@ pub fn take( } } +/// For each [ArrayRef] in the [`Vec`], take elements by index and create a new +/// [`Vec`] from those indices. +/// +/// ```text +/// ┌────────┬────────┐ +/// │ │ │ ┌────────┐ ┌────────┬────────┐ +/// │ A │ 1 │ │ │ │ │ │ +/// ├────────┼────────┤ │ 0 │ │ A │ 1 │ +/// │ │ │ ├────────┤ ├────────┼────────┤ +/// │ D │ 4 │ │ │ │ │ │ +/// ├────────┼────────┤ │ 2 │ take_arrays(values,indices) │ B │ 2 │ +/// │ │ │ ├────────┤ ├────────┼────────┤ +/// │ B │ 2 │ │ │ ───────────────────────────► │ │ │ +/// ├────────┼────────┤ │ 3 │ │ C │ 3 │ +/// │ │ │ ├────────┤ ├────────┼────────┤ +/// │ C │ 3 │ │ │ │ │ │ +/// ├────────┼────────┤ │ 1 │ │ D │ 4 │ +/// │ │ │ └────────┘ └────────┼────────┘ +/// │ E │ 5 │ +/// └────────┴────────┘ +/// values arrays indices array result +/// ``` +/// +/// # Errors +/// This function errors whenever: +/// * An index cannot be casted to `usize` (typically 32 bit architectures) +/// * An index is out of bounds and `options` is set to check bounds. +/// +/// # Safety +/// +/// When `options` is not set to check bounds, taking indexes after `len` will panic. +/// +/// # Examples +/// ``` +/// # use std::sync::Arc; +/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray}; +/// # use arrow_select::take::{take, take_arrays}; +/// let string_values = Arc::new(StringArray::from(vec!["zero", "one", "two"])); +/// let values = Arc::new(UInt32Array::from(vec![0, 1, 2])); +/// +/// // Take items at index 2, and 1: +/// let indices = UInt32Array::from(vec![2, 1]); +/// let taken_arrays = take_arrays(&[string_values, values], &indices, None).unwrap(); +/// let taken_string = taken_arrays[0].as_string::(); +/// assert_eq!(*taken_string, StringArray::from(vec!["two", "one"])); +/// let taken_values = taken_arrays[1].as_primitive(); +/// assert_eq!(*taken_values, UInt32Array::from(vec![2, 1])); +/// ``` +pub fn take_arrays( + arrays: &[ArrayRef], + indices: &dyn Array, + options: Option, +) -> Result, ArrowError> { + arrays + .iter() + .map(|array| take(array.as_ref(), indices, options.clone())) + .collect() +} + /// Verifies that the non-null values of `indices` are all `< len` fn check_bounds( len: usize, From 3293a8c2f9062fca93bee2210d540a1d25155bf5 Mon Sep 17 00:00:00 2001 From: Ed Seidl Date: Mon, 30 Sep 2024 11:11:22 -0700 Subject: [PATCH 6/6] Deprecate `MetadataLoader` (#6474) * deprecate MetadataLoader * change signature of the load functions * fix up fetch_parquet_metadata * can now use self.meta.size directly * revert changes to load API * revert change to test code --- parquet/src/arrow/async_reader/metadata.rs | 12 +++++- parquet/src/arrow/async_reader/mod.rs | 8 ++-- parquet/src/arrow/async_reader/store.rs | 17 ++++---- parquet/src/file/metadata/reader.rs | 48 ++++++++++------------ parquet/src/file/metadata/writer.rs | 12 +++--- parquet/tests/arrow_reader/bad_data.rs | 17 ++++---- 6 files changed, 60 insertions(+), 54 deletions(-) diff --git a/parquet/src/arrow/async_reader/metadata.rs b/parquet/src/arrow/async_reader/metadata.rs index cd45d2abdbcd..b7fac6fe7c05 100644 --- a/parquet/src/arrow/async_reader/metadata.rs +++ b/parquet/src/arrow/async_reader/metadata.rs @@ -52,6 +52,7 @@ impl MetadataLoader { /// Create a new [`MetadataLoader`] by reading the footer information /// /// See [`fetch_parquet_metadata`] for the meaning of the individual parameters + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load(mut fetch: F, file_size: usize, prefetch: Option) -> Result { if file_size < FOOTER_SIZE { return Err(ParquetError::EOF(format!( @@ -108,6 +109,7 @@ impl MetadataLoader { } /// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`] + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub fn new(fetch: F, metadata: ParquetMetaData) -> Self { Self { fetch, @@ -120,6 +122,7 @@ impl MetadataLoader { /// /// * `column_index`: if true will load column index /// * `offset_index`: if true will load offset index + #[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> { if !column_index && !offset_index { return Ok(()); @@ -226,6 +229,7 @@ where /// in the first request, instead of 8, and only issue further requests /// if additional bytes are needed. Providing a `prefetch` hint can therefore /// significantly reduce the number of `fetch` requests, and consequently latency +#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")] pub async fn fetch_parquet_metadata( fetch: F, file_size: usize, @@ -236,10 +240,14 @@ where Fut: Future> + Send, { let fetch = MetadataFetchFn(fetch); - let loader = MetadataLoader::load(fetch, file_size, prefetch).await?; - Ok(loader.finish()) + ParquetMetaDataReader::new() + .with_prefetch_hint(prefetch) + .load_and_finish(fetch, file_size) + .await } +// these tests are all replicated in parquet::file::metadata::reader +#[allow(deprecated)] #[cfg(test)] mod tests { use super::*; diff --git a/parquet/src/arrow/async_reader/mod.rs b/parquet/src/arrow/async_reader/mod.rs index 89e4d6adb552..5e8bdbc02eb1 100644 --- a/parquet/src/arrow/async_reader/mod.rs +++ b/parquet/src/arrow/async_reader/mod.rs @@ -212,6 +212,8 @@ impl ArrowReaderMetadata { input: &mut T, options: ArrowReaderOptions, ) -> Result { + // TODO: this is all rather awkward. It would be nice if AsyncFileReader::get_metadata + // took an argument to fetch the page indexes. let mut metadata = input.get_metadata().await?; if options.page_index @@ -219,9 +221,9 @@ impl ArrowReaderMetadata { && metadata.offset_index().is_none() { let m = Arc::try_unwrap(metadata).unwrap_or_else(|e| e.as_ref().clone()); - let mut loader = MetadataLoader::new(input, m); - loader.load_page_index(true, true).await?; - metadata = Arc::new(loader.finish()) + let mut reader = ParquetMetaDataReader::new_with_metadata(m).with_page_indexes(true); + reader.load_page_index(input).await?; + metadata = Arc::new(reader.finish()?) } Self::try_new(metadata, options) } diff --git a/parquet/src/arrow/async_reader/store.rs b/parquet/src/arrow/async_reader/store.rs index 77c00e91a3aa..e6b47856ebe8 100644 --- a/parquet/src/arrow/async_reader/store.rs +++ b/parquet/src/arrow/async_reader/store.rs @@ -24,9 +24,9 @@ use futures::{FutureExt, TryFutureExt}; use object_store::{ObjectMeta, ObjectStore}; -use crate::arrow::async_reader::{AsyncFileReader, MetadataLoader}; +use crate::arrow::async_reader::AsyncFileReader; use crate::errors::Result; -use crate::file::metadata::ParquetMetaData; +use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader}; /// Reads Parquet files in object storage using [`ObjectStore`]. /// @@ -124,15 +124,14 @@ impl AsyncFileReader for ParquetObjectReader { fn get_metadata(&mut self) -> BoxFuture<'_, Result>> { Box::pin(async move { - let preload_column_index = self.preload_column_index; - let preload_offset_index = self.preload_offset_index; let file_size = self.meta.size; - let prefetch = self.metadata_size_hint; - let mut loader = MetadataLoader::load(self, file_size, prefetch).await?; - loader - .load_page_index(preload_column_index, preload_offset_index) + let metadata = ParquetMetaDataReader::new() + .with_column_indexes(self.preload_column_index) + .with_offset_indexes(self.preload_offset_index) + .with_prefetch_hint(self.metadata_size_hint) + .load_and_finish(self, file_size) .await?; - Ok(Arc::new(loader.finish())) + Ok(Arc::new(metadata)) }) } } diff --git a/parquet/src/file/metadata/reader.rs b/parquet/src/file/metadata/reader.rs index 9e00c6860434..3fd2bd76f6b8 100644 --- a/parquet/src/file/metadata/reader.rs +++ b/parquet/src/file/metadata/reader.rs @@ -329,13 +329,18 @@ impl ParquetMetaDataReader { return Ok(()); } - self.load_page_index(fetch, remainder).await + self.load_page_index_with_remainder(fetch, remainder).await } /// Asynchronously fetch the page index structures when a [`ParquetMetaData`] has already /// been obtained. See [`Self::new_with_metadata()`]. #[cfg(feature = "async")] - pub async fn load_page_index( + pub async fn load_page_index(&mut self, fetch: F) -> Result<()> { + self.load_page_index_with_remainder(fetch, None).await + } + + #[cfg(feature = "async")] + async fn load_page_index_with_remainder( &mut self, mut fetch: F, remainder: Option<(usize, Bytes)>, @@ -836,7 +841,7 @@ mod async_tests { struct MetadataFetchFn(F); - impl MetadataFetch for MetadataFetchFn + impl<'a, F, Fut> MetadataFetch for &'a mut MetadataFetchFn where F: FnMut(Range) -> Fut + Send, Fut: Future> + Send, @@ -865,14 +870,14 @@ mod async_tests { let expected = expected.file_metadata().schema(); let fetch_count = AtomicUsize::new(0); - let mut fetch = |range| { + let fetch = |range| { fetch_count.fetch_add(1, Ordering::SeqCst); futures::future::ready(read_range(&mut file, range)) }; - let input = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(fetch); let actual = ParquetMetaDataReader::new() - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -880,10 +885,9 @@ mod async_tests { // Metadata hint too small - below footer size fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(7)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -891,10 +895,9 @@ mod async_tests { // Metadata hint too small fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(10)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -902,10 +905,9 @@ mod async_tests { // Metadata hint too large fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(500)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); @@ -913,26 +915,23 @@ mod async_tests { // Metadata hint exactly correct fetch_count.store(0, Ordering::SeqCst); - let input = MetadataFetchFn(&mut fetch); let actual = ParquetMetaDataReader::new() .with_prefetch_hint(Some(428)) - .load_and_finish(input, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(actual.file_metadata().schema(), expected); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 4) + .load_and_finish(&mut f, 4) .await .unwrap_err() .to_string(); assert_eq!(err, "EOF: file size of 4 is less than footer"); - let input = MetadataFetchFn(&mut fetch); let err = ParquetMetaDataReader::new() - .load_and_finish(input, 20) + .load_and_finish(&mut f, 20) .await .unwrap_err() .to_string(); @@ -949,42 +948,39 @@ mod async_tests { futures::future::ready(read_range(&mut file, range)) }; - let f = MetadataFetchFn(&mut fetch); + let mut f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new().with_page_indexes(true); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 3); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch just footer exactly fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(1729)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch more than footer but not enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let mut loader = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130649)); - loader.try_load(f, len).await.unwrap(); + loader.try_load(&mut f, len).await.unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 2); let metadata = loader.finish().unwrap(); assert!(metadata.offset_index().is_some() && metadata.column_index().is_some()); // Prefetch exactly enough fetch_count.store(0, Ordering::SeqCst); - let f = MetadataFetchFn(&mut fetch); let metadata = ParquetMetaDataReader::new() .with_page_indexes(true) .with_prefetch_hint(Some(130650)) - .load_and_finish(f, len) + .load_and_finish(&mut f, len) .await .unwrap(); assert_eq!(fetch_count.load(Ordering::SeqCst), 1); diff --git a/parquet/src/file/metadata/writer.rs b/parquet/src/file/metadata/writer.rs index db78606e42ea..44328c635fed 100644 --- a/parquet/src/file/metadata/writer.rs +++ b/parquet/src/file/metadata/writer.rs @@ -516,7 +516,7 @@ mod tests { /// Temporary function so we can test loading metadata with page indexes /// while we haven't fully figured out how to load it cleanly async fn load_metadata_from_bytes(file_size: usize, data: Bytes) -> ParquetMetaData { - use crate::arrow::async_reader::{MetadataFetch, MetadataLoader}; + use crate::arrow::async_reader::MetadataFetch; use crate::errors::Result as ParquetResult; use futures::future::BoxFuture; use futures::FutureExt; @@ -569,13 +569,11 @@ mod tests { Box::new(AsyncBytes::new(data)), file_size - metadata_length..file_size, ); - let metadata = MetadataLoader::load(&mut reader, file_size, None) + ParquetMetaDataReader::new() + .with_page_indexes(true) + .load_and_finish(&mut reader, file_size) .await - .unwrap(); - let loaded_metadata = metadata.finish(); - let mut metadata = MetadataLoader::new(&mut reader, loaded_metadata); - metadata.load_page_index(true, true).await.unwrap(); - metadata.finish() + .unwrap() } fn check_columns_are_equivalent(left: &ColumnChunkMetaData, right: &ColumnChunkMetaData) { diff --git a/parquet/tests/arrow_reader/bad_data.rs b/parquet/tests/arrow_reader/bad_data.rs index a73864070d9f..e2975c17c8b9 100644 --- a/parquet/tests/arrow_reader/bad_data.rs +++ b/parquet/tests/arrow_reader/bad_data.rs @@ -140,20 +140,23 @@ fn read_file(name: &str) -> Result { #[tokio::test] async fn bad_metadata_err() { use bytes::Bytes; - use parquet::arrow::async_reader::MetadataLoader; + use parquet::file::metadata::ParquetMetaDataReader; let metadata_buffer = Bytes::from_static(include_bytes!("bad_raw_metadata.bin")); let metadata_length = metadata_buffer.len(); let mut reader = std::io::Cursor::new(&metadata_buffer); - let mut loader = MetadataLoader::load(&mut reader, metadata_length, None) - .await - .unwrap(); - loader.load_page_index(false, false).await.unwrap(); - loader.load_page_index(false, true).await.unwrap(); + let mut loader = ParquetMetaDataReader::new(); + loader.try_load(&mut reader, metadata_length).await.unwrap(); + loader = loader.with_page_indexes(false); + loader.load_page_index(&mut reader).await.unwrap(); - let err = loader.load_page_index(true, false).await.unwrap_err(); + loader = loader.with_offset_indexes(true); + loader.load_page_index(&mut reader).await.unwrap(); + + loader = loader.with_column_indexes(true); + let err = loader.load_page_index(&mut reader).await.unwrap_err(); assert_eq!( err.to_string(),