Merge branch 'master' into parquet-uuid-schema
mbrobbel committed Sep 30, 2024
2 parents 7896455 + 3293a8c commit 374d017
Showing 15 changed files with 311 additions and 105 deletions.
49 changes: 47 additions & 2 deletions arrow-data/src/data.rs
@@ -693,15 +693,21 @@ impl ArrayData {
///
/// This can be useful when interacting with data sent over IPC or FFI that may
/// not meet the minimum alignment requirements
///
/// This also aligns the buffers of child data
pub fn align_buffers(&mut self) {
let layout = layout(&self.data_type);
for (buffer, spec) in self.buffers.iter_mut().zip(&layout.buffers) {
if let BufferSpec::FixedWidth { alignment, .. } = spec {
if buffer.as_ptr().align_offset(*alignment) != 0 {
*buffer = Buffer::from_slice_ref(buffer.as_ref())
*buffer = Buffer::from_slice_ref(buffer.as_ref());
}
}
}
// align children data recursively
for data in self.child_data.iter_mut() {
data.align_buffers()
}
}
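
With this change, a single `align_buffers` call now repairs alignment at every level of nesting, not just the top-level buffers. A minimal sketch of how a consumer might apply it to data received over IPC or FFI (the `repair_alignment` helper is hypothetical; only `align_buffers` itself comes from this diff):

```rust
use arrow_data::ArrayData;

// Hypothetical helper: `data` was reconstructed from an IPC or FFI payload,
// so its buffers (or its children's buffers) may be under-aligned.
fn repair_alignment(mut data: ArrayData) -> ArrayData {
    // Copies each misaligned buffer into newly allocated, aligned memory;
    // after this commit it also recurses into all child data.
    data.align_buffers();
    data
}
```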

/// "cheap" validation of an `ArrayData`. Ensures buffers are
@@ -1961,7 +1967,7 @@ impl From<ArrayData> for ArrayDataBuilder {
#[cfg(test)]
mod tests {
use super::*;
use arrow_schema::Field;
use arrow_schema::{Field, Fields};

// See arrow/tests/array_data_validation.rs for test of array validation

@@ -2224,6 +2230,7 @@ mod tests {
};
data.validate_full().unwrap();

// break alignment in data
data.buffers[0] = sliced;
let err = data.validate().unwrap_err();

@@ -2236,6 +2243,44 @@
data.validate_full().unwrap();
}

#[test]
fn test_alignment_struct() {
let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]);
let sliced = buffer.slice(1);

let child_data = ArrayData {
data_type: DataType::Int32,
len: 0,
offset: 0,
buffers: vec![buffer],
child_data: vec![],
nulls: None,
};

let schema = DataType::Struct(Fields::from(vec![Field::new("a", DataType::Int32, false)]));
let mut data = ArrayData {
data_type: schema,
len: 0,
offset: 0,
buffers: vec![],
child_data: vec![child_data],
nulls: None,
};
data.validate_full().unwrap();

// break alignment in child data
data.child_data[0].buffers[0] = sliced;
let err = data.validate().unwrap_err();

assert_eq!(
err.to_string(),
"Invalid argument error: Misaligned buffers[0] in array of type Int32, offset from expected alignment of 4 by 1"
);

data.align_buffers();
data.validate_full().unwrap();
}

#[test]
fn test_null_view_types() {
let array_len = 32;
2 changes: 1 addition & 1 deletion arrow-flight/gen/Cargo.toml
@@ -34,4 +34,4 @@ publish = false
# (and checked in) arrow.flight.protocol.rs from changing
proc-macro2 = { version = "=1.0.86", default-features = false }
prost-build = { version = "=0.13.3", default-features = false }
tonic-build = { version = "=0.12.2", default-features = false, features = ["transport", "prost"] }
tonic-build = { version = "=0.12.3", default-features = false, features = ["transport", "prost"] }
70 changes: 37 additions & 33 deletions arrow-flight/src/arrow.flight.protocol.rs

Some generated files are not rendered by default.

59 changes: 59 additions & 0 deletions arrow-select/src/take.rs
@@ -94,6 +94,65 @@ pub fn take(
}
}

/// For each [ArrayRef] in the [`Vec<ArrayRef>`], take elements by index and create a new
/// [`Vec<ArrayRef>`] from those indices.
///
/// ```text
/// ┌────────┬────────┐
/// │ │ │ ┌────────┐ ┌────────┬────────┐
/// │ A │ 1 │ │ │ │ │ │
/// ├────────┼────────┤ │ 0 │ │ A │ 1 │
/// │ │ │ ├────────┤ ├────────┼────────┤
/// │ D │ 4 │ │ │ │ │ │
/// ├────────┼────────┤ │ 2 │ take_arrays(values,indices) │ B │ 2 │
/// │ │ │ ├────────┤ ├────────┼────────┤
/// │ B │ 2 │ │ │ ───────────────────────────► │ │ │
/// ├────────┼────────┤ │ 3 │ │ C │ 3 │
/// │ │ │ ├────────┤ ├────────┼────────┤
/// │ C │ 3 │ │ │ │ │ │
/// ├────────┼────────┤ │ 1 │ │ D │ 4 │
/// │ │ │ └────────┘ └────────┼────────┘
/// │ E │ 5 │
/// └────────┴────────┘
/// values arrays indices array result
/// ```
///
/// # Errors
/// This function errors whenever:
/// * An index cannot be cast to `usize` (typically on 32-bit architectures)
/// * An index is out of bounds and `options` is set to check bounds.
///
/// # Safety
///
/// When `options` is not set to check bounds, taking indexes after `len` will panic.
///
/// # Examples
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
/// # use arrow_select::take::{take, take_arrays};
/// let string_values = Arc::new(StringArray::from(vec!["zero", "one", "two"]));
/// let values = Arc::new(UInt32Array::from(vec![0, 1, 2]));
///
/// // Take items at indices 2 and 1:
/// let indices = UInt32Array::from(vec![2, 1]);
/// let taken_arrays = take_arrays(&[string_values, values], &indices, None).unwrap();
/// let taken_string = taken_arrays[0].as_string::<i32>();
/// assert_eq!(*taken_string, StringArray::from(vec!["two", "one"]));
/// let taken_values = taken_arrays[1].as_primitive();
/// assert_eq!(*taken_values, UInt32Array::from(vec![2, 1]));
/// ```
pub fn take_arrays(
arrays: &[ArrayRef],
indices: &dyn Array,
options: Option<TakeOptions>,
) -> Result<Vec<ArrayRef>, ArrowError> {
arrays
.iter()
.map(|array| take(array.as_ref(), indices, options.clone()))
.collect()
}
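
Because the same `indices` are applied to every array, `take_arrays` is a convenient way to reorder or filter all columns of a record batch in one call. A sketch with bounds checking enabled (assumes `TakeOptions` is constructed via its `check_bounds` field):

```rust
use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, StringArray, UInt32Array};
use arrow_select::take::{take_arrays, TakeOptions};

let columns: Vec<ArrayRef> = vec![
    Arc::new(StringArray::from(vec!["a", "b", "c"])),
    Arc::new(Int32Array::from(vec![1, 2, 3])),
];
// Indices may come from an untrusted source, so ask `take` to validate
// them and return an error instead of panicking.
let indices = UInt32Array::from(vec![2, 0]);
let options = Some(TakeOptions { check_bounds: true });
let reordered = take_arrays(&columns, &indices, options).unwrap();
assert_eq!(reordered[0].len(), 2);
```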

/// Verifies that the non-null values of `indices` are all `< len`
fn check_bounds<T: ArrowPrimitiveType>(
len: usize,
3 changes: 3 additions & 0 deletions parquet/Cargo.toml
@@ -68,6 +68,7 @@ twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
crc32fast = { version = "1.4.2", optional = true, default-features = false }

[dev-dependencies]
base64 = { version = "0.22", default-features = false, features = ["std"] }
@@ -117,6 +118,8 @@ object_store = ["dep:object_store", "async"]
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["dep:crc32fast"]

[[example]]
name = "read_parquet"
3 changes: 2 additions & 1 deletion parquet/README.md
@@ -60,6 +60,7 @@ The `parquet` crate provides the following features which may be enabled in your
- `zstd` (default) - support for parquet using `zstd` compression
- `snap` (default) - support for parquet using `snappy` compression
- `cli` - parquet [CLI tools](https://github.com/apache/arrow-rs/tree/master/parquet/src/bin)
- `crc` - automatically verify the checksum of each page (if present) when decoding; see the snippet after this list
- `experimental` - Experimental APIs which may change, even between minor releases
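
A sketch of enabling the new `crc` feature from a downstream crate's `Cargo.toml` (the version number is illustrative):

```toml
[dependencies]
parquet = { version = "53.1", features = ["crc"] }
```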

## Parquet Feature Status
@@ -82,4 +83,4 @@ The `parquet` crate provides the following features which may be enabled in your

## License

Licensed under the Apache License, Version 2.0: http://www.apache.org/licenses/LICENSE-2.0.
Licensed under the Apache License, Version 2.0: <http://www.apache.org/licenses/LICENSE-2.0>.
12 changes: 10 additions & 2 deletions parquet/src/arrow/async_reader/metadata.rs
@@ -52,6 +52,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
/// Create a new [`MetadataLoader`] by reading the footer information
///
/// See [`fetch_parquet_metadata`] for the meaning of the individual parameters
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load(mut fetch: F, file_size: usize, prefetch: Option<usize>) -> Result<Self> {
if file_size < FOOTER_SIZE {
return Err(ParquetError::EOF(format!(
@@ -108,6 +109,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
}

/// Create a new [`MetadataLoader`] from an existing [`ParquetMetaData`]
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub fn new(fetch: F, metadata: ParquetMetaData) -> Self {
Self {
fetch,
@@ -120,6 +122,7 @@ impl<F: MetadataFetch> MetadataLoader<F> {
///
/// * `column_index`: if true will load column index
/// * `offset_index`: if true will load offset index
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn load_page_index(&mut self, column_index: bool, offset_index: bool) -> Result<()> {
if !column_index && !offset_index {
return Ok(());
@@ -226,6 +229,7 @@ where
/// in the first request, instead of 8, and only issue further requests
/// if additional bytes are needed. Providing a `prefetch` hint can therefore
/// significantly reduce the number of `fetch` requests, and consequently latency
#[deprecated(since = "53.1.0", note = "Use ParquetMetaDataReader")]
pub async fn fetch_parquet_metadata<F, Fut>(
fetch: F,
file_size: usize,
@@ -236,10 +240,14 @@
Fut: Future<Output = Result<Bytes>> + Send,
{
let fetch = MetadataFetchFn(fetch);
let loader = MetadataLoader::load(fetch, file_size, prefetch).await?;
Ok(loader.finish())
ParquetMetaDataReader::new()
.with_prefetch_hint(prefetch)
.load_and_finish(fetch, file_size)
.await
}
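
All of the deprecation notes above point to the same replacement, and the rewritten `fetch_parquet_metadata` shows the pattern. A sketch of calling `ParquetMetaDataReader` directly, using only names that appear in this diff (the prefetch value is illustrative):

```rust
// Inside an async fn, with `fetch` implementing the crate's metadata-fetch
// trait and `file_size` known from an object store listing:
let metadata = ParquetMetaDataReader::new()
    .with_prefetch_hint(Some(64 * 1024))
    .load_and_finish(fetch, file_size)
    .await?;
```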

// these tests are all replicated in parquet::file::metadata::reader
#[allow(deprecated)]
#[cfg(test)]
mod tests {
use super::*;