From a6410ce6f42ea3f141d350b1c7966a2406e7f552 Mon Sep 17 00:00:00 2001 From: Yijun Zhao Date: Wed, 6 Mar 2024 14:32:35 +0800 Subject: [PATCH] support string_view and binary_view --- arrow-array/src/array/byte_array.rs | 6 +- arrow-array/src/array/bytes_view_array.rs | 391 ++++++++++++++++++ arrow-array/src/array/mod.rs | 7 + .../src/builder/generic_bytes_view_builder.rs | 215 ++++++++++ arrow-array/src/builder/mod.rs | 3 + arrow-array/src/record_batch.rs | 28 +- arrow-array/src/types.rs | 97 +++++ arrow-buffer/src/native.rs | 1 + arrow-data/Cargo.toml | 1 + arrow-data/src/bytes_view.rs | 115 ++++++ arrow-data/src/data.rs | 85 ++-- arrow-data/src/equal/bytes_view.rs | 71 ++++ arrow-data/src/equal/mod.rs | 4 +- arrow-data/src/lib.rs | 3 + arrow-data/src/transform/mod.rs | 172 +++++--- arrow/tests/array_transform.rs | 39 ++ 16 files changed, 1128 insertions(+), 110 deletions(-) create mode 100644 arrow-array/src/array/bytes_view_array.rs create mode 100644 arrow-array/src/builder/generic_bytes_view_builder.rs create mode 100644 arrow-data/src/bytes_view.rs create mode 100644 arrow-data/src/equal/bytes_view.rs diff --git a/arrow-array/src/array/byte_array.rs b/arrow-array/src/array/byte_array.rs index db825bbea97d..a57abc5b1e71 100644 --- a/arrow-array/src/array/byte_array.rs +++ b/arrow-array/src/array/byte_array.rs @@ -94,7 +94,7 @@ pub struct GenericByteArray { impl Clone for GenericByteArray { fn clone(&self) -> Self { Self { - data_type: self.data_type.clone(), + data_type: T::DATA_TYPE, value_offsets: self.value_offsets.clone(), value_data: self.value_data.clone(), nulls: self.nulls.clone(), @@ -323,7 +323,7 @@ impl GenericByteArray { /// Returns a zero-copy slice of this array with the indicated offset and length. pub fn slice(&self, offset: usize, length: usize) -> Self { Self { - data_type: self.data_type.clone(), + data_type: T::DATA_TYPE, value_offsets: self.value_offsets.slice(offset, length), value_data: self.value_data.clone(), nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), @@ -511,7 +511,7 @@ impl From for GenericByteArray { Self { value_offsets, value_data, - data_type: data.data_type().clone(), + data_type: T::DATA_TYPE, nulls: data.nulls().cloned(), } } diff --git a/arrow-array/src/array/bytes_view_array.rs b/arrow-array/src/array/bytes_view_array.rs new file mode 100644 index 000000000000..dd21c3e7dbc7 --- /dev/null +++ b/arrow-array/src/array/bytes_view_array.rs @@ -0,0 +1,391 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::array::print_long_array; +use crate::builder::GenericBytesViewBuilder; +use crate::iterator::ArrayIter; +use crate::types::bytes::ByteArrayNativeType; +use crate::types::BytesViewType; +use crate::{Array, ArrayAccessor, ArrayRef}; +use arrow_buffer::{Buffer, NullBuffer, ScalarBuffer}; +use arrow_data::{ArrayData, ArrayDataBuilder, BytesView}; +use arrow_schema::{ArrowError, DataType}; +use std::any::Any; +use std::fmt::Debug; +use std::marker::PhantomData; +use std::sync::Arc; + +/// An array of variable length bytes view arrays +pub struct GenericBytesViewArray { + data_type: DataType, + views: ScalarBuffer, + buffers: Vec, + phantom: PhantomData, + nulls: Option, +} + +impl Clone for GenericBytesViewArray { + fn clone(&self) -> Self { + Self { + data_type: T::DATA_TYPE, + views: self.views.clone(), + buffers: self.buffers.clone(), + nulls: self.nulls.clone(), + phantom: Default::default(), + } + } +} + +impl GenericBytesViewArray { + /// Create a new [`GenericBytesViewArray`] from the provided parts, panicking on failure + /// + /// # Panics + /// + /// Panics if [`GenericBytesViewArray::try_new`] returns an error + pub fn new(views: ScalarBuffer, buffers: Vec, nulls: Option) -> Self { + Self::try_new(views, buffers, nulls).unwrap() + } + + /// Create a new [`GenericBytesViewArray`] from the provided parts, returning an error on failure + /// + /// # Errors + /// + /// * `views.len() != nulls.len()` + /// * [BytesViewType::validate] fails + pub fn try_new( + views: ScalarBuffer, + buffers: Vec, + nulls: Option, + ) -> Result { + T::validate(&views, &buffers)?; + + if let Some(n) = nulls.as_ref() { + if n.len() != views.len() { + return Err(ArrowError::InvalidArgumentError(format!( + "Incorrect length of null buffer for {}ViewArray, expected {} got {}", + T::PREFIX, + views.len(), + n.len(), + ))); + } + } + + Ok(Self { + data_type: T::DATA_TYPE, + views, + buffers, + nulls, + phantom: Default::default(), + }) + } + + /// Create a new [`GenericBytesViewArray`] from the provided parts, without validation + /// + /// # Safety + /// + /// Safe if [`Self::try_new`] would not error + pub unsafe fn new_unchecked( + views: ScalarBuffer, + buffers: Vec, + nulls: Option, + ) -> Self { + Self { + data_type: T::DATA_TYPE, + phantom: Default::default(), + views, + buffers, + nulls, + } + } + + /// Create a new [`GenericBytesViewArray`] of length `len` where all values are null + pub fn new_null(len: usize) -> Self { + Self { + data_type: T::DATA_TYPE, + views: vec![0; len].into(), + buffers: vec![], + nulls: Some(NullBuffer::new_null(len)), + phantom: Default::default(), + } + } + + /// Creates a [`GenericBytesViewArray`] based on an iterator of values without nulls + pub fn from_iter_values(iter: I) -> Self + where + Ptr: AsRef, + I: IntoIterator, + { + let iter = iter.into_iter(); + let mut builder = GenericBytesViewBuilder::::with_capacity(iter.size_hint().0); + for v in iter { + builder.append_value(v); + } + builder.finish() + } + + /// Deconstruct this array into its constituent parts + pub fn into_parts(self) -> (ScalarBuffer, Vec, Option) { + (self.views, self.buffers, self.nulls) + } + + /// Returns the views buffer + #[inline] + pub fn views(&self) -> &ScalarBuffer { + &self.views + } + + /// Returns the buffers storing string data + #[inline] + pub fn data_buffers(&self) -> &[Buffer] { + &self.buffers + } + + /// Returns the element at index `i` + /// # Panics + /// Panics if index `i` is out of bounds. + pub fn value(&self, i: usize) -> &T::Native { + assert!( + i < self.len(), + "Trying to access an element at index {} from a {}ViewArray of length {}", + i, + T::PREFIX, + self.len() + ); + + unsafe { self.value_unchecked(i) } + } + + /// Returns the element at index `i` + /// # Safety + /// Caller is responsible for ensuring that the index is within the bounds of the array + pub unsafe fn value_unchecked(&self, idx: usize) -> &T::Native { + let v = self.views.get_unchecked(idx); + let len = *v as u32; + let b = if len <= 12 { + let ptr = self.views.as_ptr() as *const u8; + std::slice::from_raw_parts(ptr.add(idx * 16 + 4), len as usize) + } else { + let view = BytesView::from(*v); + let data = self.buffers.get_unchecked(view.buffer_index as usize); + let offset = view.offset as usize; + data.get_unchecked(offset..offset + len as usize) + }; + T::Native::from_bytes_unchecked(b) + } + + /// constructs a new iterator + pub fn iter(&self) -> ArrayIter<&Self> { + ArrayIter::new(self) + } + + /// Returns a zero-copy slice of this array with the indicated offset and length. + pub fn slice(&self, offset: usize, length: usize) -> Self { + Self { + data_type: T::DATA_TYPE, + views: self.views.slice(offset, length), + buffers: self.buffers.clone(), + nulls: self.nulls.as_ref().map(|n| n.slice(offset, length)), + phantom: Default::default(), + } + } +} + +impl Debug for GenericBytesViewArray { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "{}ViewArray\n[\n", T::PREFIX)?; + print_long_array(self, f, |array, index, f| { + std::fmt::Debug::fmt(&array.value(index), f) + })?; + write!(f, "]") + } +} + +impl Array for GenericBytesViewArray { + fn as_any(&self) -> &dyn Any { + self + } + + fn to_data(&self) -> ArrayData { + self.clone().into() + } + + fn into_data(self) -> ArrayData { + self.into() + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn slice(&self, offset: usize, length: usize) -> ArrayRef { + Arc::new(self.slice(offset, length)) + } + + fn len(&self) -> usize { + self.views.len() + } + + fn is_empty(&self) -> bool { + self.views.is_empty() + } + + fn offset(&self) -> usize { + 0 + } + + fn nulls(&self) -> Option<&NullBuffer> { + self.nulls.as_ref() + } + + fn get_buffer_memory_size(&self) -> usize { + let mut sum = self.buffers.iter().map(|b| b.capacity()).sum::(); + sum += self.views.inner().capacity(); + if let Some(x) = &self.nulls { + sum += x.buffer().capacity() + } + sum + } + + fn get_array_memory_size(&self) -> usize { + std::mem::size_of::() + self.get_buffer_memory_size() + } +} + +impl<'a, T: BytesViewType + ?Sized> ArrayAccessor for &'a GenericBytesViewArray { + type Item = &'a T::Native; + + fn value(&self, index: usize) -> Self::Item { + GenericBytesViewArray::value(self, index) + } + + unsafe fn value_unchecked(&self, index: usize) -> Self::Item { + GenericBytesViewArray::value_unchecked(self, index) + } +} + +impl<'a, T: BytesViewType + ?Sized> IntoIterator for &'a GenericBytesViewArray { + type Item = Option<&'a T::Native>; + type IntoIter = ArrayIter; + + fn into_iter(self) -> Self::IntoIter { + ArrayIter::new(self) + } +} + +impl From for GenericBytesViewArray { + fn from(value: ArrayData) -> Self { + let views = value.buffers()[0].clone(); + let views = ScalarBuffer::new(views, value.offset(), value.len()); + let buffers = value.buffers()[1..].to_vec(); + Self { + data_type: T::DATA_TYPE, + views, + buffers, + nulls: value.nulls().cloned(), + phantom: Default::default(), + } + } +} + +impl From> for ArrayData { + fn from(mut array: GenericBytesViewArray) -> Self { + let len = array.len(); + array.buffers.insert(0, array.views.into_inner()); + let builder = ArrayDataBuilder::new(T::DATA_TYPE) + .len(len) + .buffers(array.buffers) + .nulls(array.nulls); + + unsafe { builder.build_unchecked() } + } +} + +impl FromIterator> for GenericBytesViewArray +where + Ptr: AsRef, +{ + fn from_iter>>(iter: I) -> Self { + let iter = iter.into_iter(); + let mut builder = GenericBytesViewBuilder::::with_capacity(iter.size_hint().0); + builder.extend(iter); + builder.finish() + } +} + +/// A [`GenericBytesViewArray`] of `[u8]` +pub type BinaryViewArray = GenericBytesViewArray<[u8]>; + +/// A [`GenericBytesViewArray`] of `str` +/// +/// ``` +/// use arrow_array::StringViewArray; +/// let array = StringViewArray::from_iter_values(vec!["hello", "world", "lulu", "large payload over 12 bytes"]); +/// assert_eq!(array.value(0), "hello"); +/// assert_eq!(array.value(3), "large payload over 12 bytes"); +/// ``` +pub type StringViewArray = GenericBytesViewArray; + +impl From> for StringViewArray { + fn from(v: Vec<&str>) -> Self { + Self::from_iter_values(v) + } +} + +#[cfg(test)] +mod tests { + use crate::builder::StringViewBuilder; + use crate::types::BytesViewType; + use crate::{Array, BinaryViewArray, StringViewArray}; + + #[test] + fn try_new() { + let array = StringViewArray::from_iter_values(vec![ + "hello", + "world", + "lulu", + "large payload over 12 bytes", + ]); + assert_eq!(array.value(0), "hello"); + assert_eq!(array.value(3), "large payload over 12 bytes"); + + let array = BinaryViewArray::from_iter_values(vec![ + b"hello".to_bytes(), + b"world".to_bytes(), + b"lulu".to_bytes(), + b"large payload over 12 bytes".to_bytes(), + ]); + assert_eq!(array.value(0), b"hello"); + assert_eq!(array.value(3), b"large payload over 12 bytes"); + + let array = { + let mut builder = StringViewBuilder::new(); + builder.finish() + }; + assert!(array.is_empty()); + + let array = { + let mut builder = StringViewBuilder::new(); + builder.append_value("hello"); + builder.append_null(); + builder.append_option(Some("large payload over 12 bytes")); + builder.finish() + }; + assert_eq!(array.value(0), "hello"); + assert!(array.is_null(1)); + assert_eq!(array.value(2), "large payload over 12 bytes"); + } +} diff --git a/arrow-array/src/array/mod.rs b/arrow-array/src/array/mod.rs index 7aa3f92bfbd2..8513b4050cce 100644 --- a/arrow-array/src/array/mod.rs +++ b/arrow-array/src/array/mod.rs @@ -65,8 +65,13 @@ mod union_array; pub use union_array::*; mod run_array; + pub use run_array::*; +mod bytes_view_array; + +pub use bytes_view_array::*; + /// An array in the [arrow columnar format](https://arrow.apache.org/docs/format/Columnar.html) pub trait Array: std::fmt::Debug + Send + Sync { /// Returns the array as [`Any`] so that it can be @@ -596,8 +601,10 @@ pub fn make_array(data: ArrayData) -> ArrayRef { DataType::Binary => Arc::new(BinaryArray::from(data)) as ArrayRef, DataType::LargeBinary => Arc::new(LargeBinaryArray::from(data)) as ArrayRef, DataType::FixedSizeBinary(_) => Arc::new(FixedSizeBinaryArray::from(data)) as ArrayRef, + DataType::BinaryView => Arc::new(BinaryViewArray::from(data)) as ArrayRef, DataType::Utf8 => Arc::new(StringArray::from(data)) as ArrayRef, DataType::LargeUtf8 => Arc::new(LargeStringArray::from(data)) as ArrayRef, + DataType::Utf8View => Arc::new(StringViewArray::from(data)) as ArrayRef, DataType::List(_) => Arc::new(ListArray::from(data)) as ArrayRef, DataType::LargeList(_) => Arc::new(LargeListArray::from(data)) as ArrayRef, DataType::Struct(_) => Arc::new(StructArray::from(data)) as ArrayRef, diff --git a/arrow-array/src/builder/generic_bytes_view_builder.rs b/arrow-array/src/builder/generic_bytes_view_builder.rs new file mode 100644 index 000000000000..9e45f2c2345f --- /dev/null +++ b/arrow-array/src/builder/generic_bytes_view_builder.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::builder::ArrayBuilder; +use crate::types::BytesViewType; +use crate::{ArrayRef, GenericBytesViewArray}; +use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer}; +use arrow_data::BytesView; +use std::any::Any; +use std::marker::PhantomData; +use std::sync::Arc; + +const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024; + +/// A builder for [`GenericBytesViewArray`] +/// +/// See [`Self::append_value`] for the allocation strategy +pub struct GenericBytesViewBuilder { + views_builder: BufferBuilder, + null_buffer_builder: NullBufferBuilder, + completed: Vec, + in_progress: Vec, + block_size: u32, + phantom: PhantomData, +} + +impl GenericBytesViewBuilder { + /// Creates a new [`GenericByteViewBuilder`]. + pub fn new() -> Self { + Self::with_capacity(1024) + } + + /// Creates a new [`GenericByteViewBuilder`] with space for `capacity` strings + pub fn with_capacity(capacity: usize) -> Self { + Self { + views_builder: BufferBuilder::new(capacity), + null_buffer_builder: NullBufferBuilder::new(capacity), + completed: vec![], + in_progress: vec![], + block_size: DEFAULT_BLOCK_SIZE, + phantom: Default::default(), + } + } + + /// Override the minimum size of buffers to allocate for string data + pub fn with_block_size(self, block_size: u32) -> Self { + Self { block_size, ..self } + } + + /// Appends a value into the builder + /// + /// # Panics + /// + /// Panics if + /// - String buffer count exceeds `u32::MAX` + /// - String length exceeds `u32::MAX` + #[inline] + pub fn append_value(&mut self, value: impl AsRef) { + let v: &[u8] = value.as_ref().as_ref(); + let length: u32 = v.len().try_into().unwrap(); + if length <= 12 { + let mut view_buffer = [0; 16]; + view_buffer[0..4].copy_from_slice(&length.to_le_bytes()); + view_buffer[4..4 + v.len()].copy_from_slice(v); + self.views_builder.append(u128::from_le_bytes(view_buffer)); + self.null_buffer_builder.append_non_null(); + return; + } + + let required_cap = self.in_progress.len() + v.len(); + if self.in_progress.capacity() < required_cap { + let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize)); + let flushed = std::mem::replace(&mut self.in_progress, in_progress); + if !flushed.is_empty() { + assert!(self.completed.len() < u32::MAX as usize); + self.completed.push(flushed.into()); + } + }; + let offset = self.in_progress.len() as u32; + self.in_progress.extend_from_slice(v); + + let view = BytesView { + length, + prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()), + buffer_index: self.completed.len() as u32, + offset, + }; + self.views_builder.append(view.into()); + self.null_buffer_builder.append_non_null(); + } + + /// Append an `Option` value into the builder + #[inline] + pub fn append_option(&mut self, value: Option>) { + match value { + None => self.append_null(), + Some(v) => self.append_value(v), + }; + } + + /// Append a null value into the builder + #[inline] + pub fn append_null(&mut self) { + self.null_buffer_builder.append_null(); + self.views_builder.append(0); + } + + /// Builds the [`GenericBytesViewArray`] and reset this builder + pub fn finish(&mut self) -> GenericBytesViewArray { + let mut completed = std::mem::take(&mut self.completed); + if !self.in_progress.is_empty() { + completed.push(std::mem::take(&mut self.in_progress).into()); + } + let len = self.views_builder.len(); + let views = ScalarBuffer::new(self.views_builder.finish(), 0, len); + let nulls = self.null_buffer_builder.finish(); + // SAFETY: valid by construction + unsafe { GenericBytesViewArray::new_unchecked(views, completed, nulls) } + } + + /// Builds the [`GenericBytesViewArray`] without resetting the builder + pub fn finish_cloned(&self) -> GenericBytesViewArray { + let mut completed = self.completed.clone(); + if !self.in_progress.is_empty() { + completed.push(Buffer::from_slice_ref(&self.in_progress)); + } + let len = self.views_builder.len(); + let views = Buffer::from_slice_ref(self.views_builder.as_slice()); + let views = ScalarBuffer::new(views, 0, len); + let nulls = self.null_buffer_builder.finish_cloned(); + // SAFETY: valid by construction + unsafe { GenericBytesViewArray::new_unchecked(views, completed, nulls) } + } +} + +impl Default for GenericBytesViewBuilder { + fn default() -> Self { + Self::new() + } +} + +impl std::fmt::Debug for GenericBytesViewBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}ViewBuilder", T::PREFIX)?; + f.debug_struct("") + .field("views_builder", &self.views_builder) + .field("in_progress", &self.in_progress) + .field("completed", &self.completed) + .field("null_buffer_builder", &self.null_buffer_builder) + .finish() + } +} + +impl ArrayBuilder for GenericBytesViewBuilder { + fn len(&self) -> usize { + self.null_buffer_builder.len() + } + + fn finish(&mut self) -> ArrayRef { + Arc::new(self.finish()) + } + + fn finish_cloned(&self) -> ArrayRef { + Arc::new(self.finish_cloned()) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + self + } + + fn into_box_any(self: Box) -> Box { + self + } +} + +impl> Extend> + for GenericBytesViewBuilder +{ + #[inline] + fn extend>>(&mut self, iter: I) { + for v in iter { + self.append_option(v) + } + } +} + +/// Array builder for [`StringViewArray`][crate::StringViewArray] +/// +/// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with +/// [`GenericByteViewBuilder::append_null`] as normal. +pub type StringViewBuilder = GenericBytesViewBuilder; + +/// Array builder for [`BinaryViewArray`][crate::BinaryViewArray] +/// +/// Values can be appended using [`GenericByteViewBuilder::append_value`], and nulls with +/// [`GenericByteViewBuilder::append_null`] as normal. +pub type BinaryViewBuilder = GenericBytesViewBuilder<[u8]>; diff --git a/arrow-array/src/builder/mod.rs b/arrow-array/src/builder/mod.rs index d33e565a868b..e4ab7ae4ba23 100644 --- a/arrow-array/src/builder/mod.rs +++ b/arrow-array/src/builder/mod.rs @@ -178,7 +178,10 @@ mod generic_bytes_dictionary_builder; pub use generic_bytes_dictionary_builder::*; mod generic_byte_run_builder; pub use generic_byte_run_builder::*; +mod generic_bytes_view_builder; +pub use generic_bytes_view_builder::*; mod union_builder; + pub use union_builder::*; use crate::ArrayRef; diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 314445bba617..c56b1fd308cf 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -626,7 +626,9 @@ mod tests { use std::collections::HashMap; use super::*; - use crate::{BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray}; + use crate::{ + BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray, + }; use arrow_buffer::{Buffer, ToByteSlice}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::Fields; @@ -646,6 +648,30 @@ mod tests { check_batch(record_batch, 5) } + #[test] + fn create_string_view_record_batch() { + let schema = Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8View, false), + ]); + + let a = Int32Array::from(vec![1, 2, 3, 4, 5]); + let b = StringViewArray::from(vec!["a", "b", "c", "d", "e"]); + + let record_batch = + RecordBatch::try_new(Arc::new(schema), vec![Arc::new(a), Arc::new(b)]).unwrap(); + + assert_eq!(5, record_batch.num_rows()); + assert_eq!(2, record_batch.num_columns()); + assert_eq!(&DataType::Int32, record_batch.schema().field(0).data_type()); + assert_eq!( + &DataType::Utf8View, + record_batch.schema().field(1).data_type() + ); + assert_eq!(5, record_batch.column(0).len()); + assert_eq!(5, record_batch.column(1).len()); + } + #[test] fn byte_size_should_not_regress() { let schema = Schema::new(vec![ diff --git a/arrow-array/src/types.rs b/arrow-array/src/types.rs index 83a229c1da0d..65ba46e5cd79 100644 --- a/arrow-array/src/types.rs +++ b/arrow-array/src/types.rs @@ -25,12 +25,14 @@ use crate::timezone::Tz; use crate::{ArrowNativeTypeOp, OffsetSizeTrait}; use arrow_buffer::{i256, Buffer, OffsetBuffer}; use arrow_data::decimal::{validate_decimal256_precision, validate_decimal_precision}; +use arrow_data::{validate_binary_view, validate_string_view}; use arrow_schema::{ ArrowError, DataType, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION, DECIMAL128_MAX_SCALE, DECIMAL256_MAX_PRECISION, DECIMAL256_MAX_SCALE, DECIMAL_DEFAULT_SCALE, }; use chrono::{Duration, NaiveDate, NaiveDateTime}; use half::f16; +use std::fmt::Debug; use std::marker::PhantomData; use std::ops::{Add, Sub}; @@ -1544,6 +1546,101 @@ pub type BinaryType = GenericBinaryType; /// An arrow binary array with i64 offsets pub type LargeBinaryType = GenericBinaryType; +mod bytes_view { + pub trait Sealed: Send + Sync {} + impl Sealed for str {} + impl Sealed for [u8] {} +} + +/// A trait over the variable length bytes view array types +pub trait BytesViewType: bytes_view::Sealed + 'static + PartialEq + AsRef { + /// If element in array is utf8 encoded string. + const IS_UTF8: bool; + + /// Datatype of array elements + const DATA_TYPE: DataType = if Self::IS_UTF8 { + DataType::Utf8View + } else { + DataType::BinaryView + }; + + /// "Binary" or "String", for use in displayed or error messages + const PREFIX: &'static str; + + /// Type for representing its equivalent rust type i.e + /// Utf8Array will have native type has &str + /// BinaryArray will have type as [u8] + type Native: bytes::ByteArrayNativeType + AsRef + AsRef<[u8]> + ?Sized; + + /// Type for owned corresponding to `Native` + type Owned: Debug + Clone + Sync + Send + AsRef; + + /// # Safety + /// The caller must ensure `index < self.len()`. + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self; + + /// To bytes slice. + fn to_bytes(&self) -> &[u8]; + + /// To owned type + #[allow(clippy::wrong_self_convention)] + fn into_owned(&self) -> Self::Owned; + + /// Verifies that the provided buffers are valid for this array type + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError>; +} + +impl BytesViewType for str { + const IS_UTF8: bool = true; + const PREFIX: &'static str = "String"; + + type Native = str; + type Owned = String; + + #[inline(always)] + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self { + std::str::from_utf8_unchecked(slice) + } + + #[inline(always)] + fn to_bytes(&self) -> &[u8] { + self.as_bytes() + } + + fn into_owned(&self) -> Self::Owned { + self.to_string() + } + + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_string_view(views, buffers) + } +} + +impl BytesViewType for [u8] { + const IS_UTF8: bool = false; + const PREFIX: &'static str = "Binary"; + type Native = [u8]; + type Owned = Vec; + + #[inline(always)] + unsafe fn from_bytes_unchecked(slice: &[u8]) -> &Self { + slice + } + + #[inline(always)] + fn to_bytes(&self) -> &[u8] { + self + } + + fn into_owned(&self) -> Self::Owned { + self.to_vec() + } + + fn validate(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_binary_view(views, buffers) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/arrow-buffer/src/native.rs b/arrow-buffer/src/native.rs index 38074a8dc26c..5184d60ac1fd 100644 --- a/arrow-buffer/src/native.rs +++ b/arrow-buffer/src/native.rs @@ -149,6 +149,7 @@ native_integer!(u8); native_integer!(u16); native_integer!(u32); native_integer!(u64); +native_integer!(u128); macro_rules! native_float { ($t:ty, $s:ident, $as_usize: expr, $i:ident, $usize_as: expr) => { diff --git a/arrow-data/Cargo.toml b/arrow-data/Cargo.toml index c83f867523d5..e7a897240676 100644 --- a/arrow-data/Cargo.toml +++ b/arrow-data/Cargo.toml @@ -51,6 +51,7 @@ arrow-schema = { workspace = true } num = { version = "0.4", default-features = false, features = ["std"] } half = { version = "2.1", default-features = false } +simdutf8 = { version = "0.1.4", default-features = false, features = ["std", "aarch64_neon"] } [dev-dependencies] diff --git a/arrow-data/src/bytes_view.rs b/arrow-data/src/bytes_view.rs new file mode 100644 index 000000000000..40cc7c2798a6 --- /dev/null +++ b/arrow-data/src/bytes_view.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_buffer::Buffer; +use arrow_schema::ArrowError; + +#[derive(Debug, Copy, Clone, Default)] +#[repr(C)] +pub struct BytesView { + /// The length of the string/bytes. + pub length: u32, + /// First 4 bytes of string/bytes data. + pub prefix: u32, + /// The buffer index. + pub buffer_index: u32, + /// The offset into the buffer. + pub offset: u32, +} + +impl BytesView { + #[inline(always)] + pub fn as_u128(self) -> u128 { + unsafe { std::mem::transmute(self) } + } +} + +impl From for BytesView { + #[inline] + fn from(value: u128) -> Self { + unsafe { std::mem::transmute(value) } + } +} + +impl From for u128 { + #[inline] + fn from(value: BytesView) -> Self { + value.as_u128() + } +} + +/// Validates the combination of `views` and `buffers` is a valid BinaryView +pub fn validate_binary_view(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_view_impl(views, buffers, |_, _| Ok(())) +} + +/// Validates the combination of `views` and `buffers` is a valid StringView +pub fn validate_string_view(views: &[u128], buffers: &[Buffer]) -> Result<(), ArrowError> { + validate_view_impl(views, buffers, |idx, b| { + simdutf8::basic::from_utf8(b).map_err(|e| { + ArrowError::InvalidArgumentError(format!( + "Encountered non-UTF-8 data at index {idx}: {e}" + )) + })?; + Ok(()) + }) +} + +fn validate_view_impl(views: &[u128], buffers: &[Buffer], f: F) -> Result<(), ArrowError> +where + F: Fn(usize, &[u8]) -> Result<(), ArrowError>, +{ + for (idx, v) in views.iter().enumerate() { + let len = *v as u32; + if len <= 12 { + if len < 12 && (v >> (32 + len * 8)) != 0 { + return Err(ArrowError::InvalidArgumentError(format!( + "View at index {idx} contained non-zero padding for string of length {len}", + ))); + } + f(idx, &v.to_le_bytes()[4..4 + len as usize])?; + } else { + let view = BytesView::from(*v); + let data = buffers.get(view.buffer_index as usize).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid buffer index at {idx}: got index {} but only has {} buffers", + view.buffer_index, + buffers.len() + )) + })?; + + let start = view.offset as usize; + let end = start + len as usize; + let b = data.get(start..end).ok_or_else(|| { + ArrowError::InvalidArgumentError(format!( + "Invalid buffer slice at {idx}: got {start}..{end} but buffer {} has length {}", + view.buffer_index, + data.len() + )) + })?; + + if !b.starts_with(&view.prefix.to_le_bytes()) { + return Err(ArrowError::InvalidArgumentError( + "Mismatch between embedded prefix and data".to_string(), + )); + } + + f(idx, b)?; + } + } + Ok(()) +} diff --git a/arrow-data/src/data.rs b/arrow-data/src/data.rs index bd45c4f8ddda..6e4f1e283713 100644 --- a/arrow-data/src/data.rs +++ b/arrow-data/src/data.rs @@ -26,7 +26,7 @@ use std::mem; use std::ops::Range; use std::sync::Arc; -use crate::equal; +use crate::{equal, validate_binary_view, validate_string_view}; /// A collection of [`Buffer`] #[doc(hidden)] @@ -151,29 +151,6 @@ pub(crate) fn new_buffers(data_type: &DataType, capacity: usize) -> [MutableBuff } } -/// Maps 2 [`MutableBuffer`]s into a vector of [Buffer]s whose size depends on `data_type`. -#[inline] -pub(crate) fn into_buffers( - data_type: &DataType, - buffer1: MutableBuffer, - buffer2: MutableBuffer, -) -> Vec { - match data_type { - DataType::Null | DataType::Struct(_) | DataType::FixedSizeList(_, _) => vec![], - DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => { - vec![buffer1.into(), buffer2.into()] - } - DataType::Union(_, mode) => { - match mode { - // Based on Union's DataTypeLayout - UnionMode::Sparse => vec![buffer1.into()], - UnionMode::Dense => vec![buffer1.into(), buffer2.into()], - } - } - _ => vec![buffer1.into()], - } -} - /// A generic representation of Arrow array data which encapsulates common attributes and /// operations for Arrow array. Specific operations for different arrays types (e.g., /// primitive, list, struct) are implemented in `Array`. @@ -737,7 +714,10 @@ impl ArrayData { ))); } - if self.buffers.len() != layout.buffers.len() { + // Check data buffers length for view types and other types + if self.buffers.len() < layout.buffers.len() + || (!layout.variadic && self.buffers.len() != layout.buffers.len()) + { return Err(ArrowError::InvalidArgumentError(format!( "Expected {} buffers in array of type {:?}, got {}", layout.buffers.len(), @@ -1232,6 +1212,14 @@ impl ArrayData { DataType::LargeUtf8 => self.validate_utf8::(), DataType::Binary => self.validate_offsets_full::(self.buffers[1].len()), DataType::LargeBinary => self.validate_offsets_full::(self.buffers[1].len()), + DataType::BinaryView => { + let views = self.typed_buffer::(0, self.len)?; + validate_binary_view(views, &self.buffers[1..]) + } + DataType::Utf8View => { + let views = self.typed_buffer::(0, self.len)?; + validate_string_view(views, &self.buffers[1..]) + } DataType::List(_) | DataType::Map(_, _) => { let child = &self.child_data[0]; self.validate_offsets_full::(child.len) @@ -1503,10 +1491,12 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { DataType::Null => DataTypeLayout { buffers: vec![], can_contain_null_mask: false, + variadic: false, }, DataType::Boolean => DataTypeLayout { buffers: vec![BufferSpec::BitMap], can_contain_null_mask: true, + variadic: false, }, DataType::Int8 => DataTypeLayout::new_fixed_width::(), DataType::Int16 => DataTypeLayout::new_fixed_width::(), @@ -1538,15 +1528,14 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { DataTypeLayout { buffers: vec![spec], can_contain_null_mask: true, + variadic: false, } } DataType::Binary => DataTypeLayout::new_binary::(), DataType::LargeBinary => DataTypeLayout::new_binary::(), DataType::Utf8 => DataTypeLayout::new_binary::(), DataType::LargeUtf8 => DataTypeLayout::new_binary::(), - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } + DataType::BinaryView | DataType::Utf8View => DataTypeLayout::new_view(), DataType::FixedSizeList(_, _) => DataTypeLayout::new_empty(), // all in child data DataType::List(_) => DataTypeLayout::new_fixed_width::(), DataType::LargeList(_) => DataTypeLayout::new_fixed_width::(), @@ -1575,6 +1564,7 @@ pub fn layout(data_type: &DataType) -> DataTypeLayout { } }, can_contain_null_mask: false, + variadic: false, } } DataType::Dictionary(key_type, _value_type) => layout(key_type), @@ -1590,6 +1580,11 @@ pub struct DataTypeLayout { /// Can contain a null bitmask pub can_contain_null_mask: bool, + + /// This field only applies to the view type,[`DataType::BinaryView`] and [`DataType::Utf8View`] + /// If `variadic` is true, the number of buffers expected is only lower-bounded by + /// buffers.len(). Buffers that exceed the lower bound are legal. + pub variadic: bool, } impl DataTypeLayout { @@ -1601,6 +1596,7 @@ impl DataTypeLayout { alignment: mem::align_of::(), }], can_contain_null_mask: true, + variadic: false, } } @@ -1611,6 +1607,7 @@ impl DataTypeLayout { Self { buffers: vec![], can_contain_null_mask: true, + variadic: false, } } @@ -1629,6 +1626,19 @@ impl DataTypeLayout { BufferSpec::VariableWidth, ], can_contain_null_mask: true, + variadic: false, + } + } + + /// Describes a view type + pub fn new_view() -> Self { + Self { + buffers: vec![BufferSpec::FixedWidth { + byte_width: mem::size_of::(), + alignment: mem::align_of::(), + }], + can_contain_null_mask: true, + variadic: true, } } } @@ -1834,7 +1844,7 @@ impl From for ArrayDataBuilder { #[cfg(test)] mod tests { use super::*; - use arrow_schema::{Field, UnionFields}; + use arrow_schema::Field; // See arrow/tests/array_data_validation.rs for test of array validation @@ -2082,23 +2092,6 @@ mod tests { assert!(!contains_nulls(Some(&buffer), 0, 0)); } - #[test] - fn test_into_buffers() { - let data_types = vec![ - DataType::Union(UnionFields::empty(), UnionMode::Dense), - DataType::Union(UnionFields::empty(), UnionMode::Sparse), - ]; - - for data_type in data_types { - let buffers = new_buffers(&data_type, 0); - let [buffer1, buffer2] = buffers; - let buffers = into_buffers(&data_type, buffer1, buffer2); - - let layout = layout(&data_type); - assert_eq!(buffers.len(), layout.buffers.len()); - } - } - #[test] fn test_alignment() { let buffer = Buffer::from_vec(vec![1_i32, 2_i32, 3_i32]); diff --git a/arrow-data/src/equal/bytes_view.rs b/arrow-data/src/equal/bytes_view.rs new file mode 100644 index 000000000000..b225030b02f3 --- /dev/null +++ b/arrow-data/src/equal/bytes_view.rs @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::{ArrayData, BytesView}; + +pub(super) fn bytes_view_equal( + lhs: &ArrayData, + rhs: &ArrayData, + lhs_start: usize, + rhs_start: usize, + len: usize, +) -> bool { + let lhs_views = &lhs.buffer::(0)[lhs_start..lhs_start + len]; + let lhs_buffers = &lhs.buffers()[1..]; + let rhs_views = &rhs.buffer::(0)[rhs_start..rhs_start + len]; + let rhs_buffers = &rhs.buffers()[1..]; + + for (idx, (l, r)) in lhs_views.iter().zip(rhs_views).enumerate() { + // Only checking one null mask here because by the time the control flow reaches + // this point, the equality of the two masks would have already been verified. + if lhs.is_null(idx) { + continue; + } + + let l_len_prefix = *l as u64; + let r_len_prefix = *r as u64; + // short-circuit, check length and prefix + if l_len_prefix != r_len_prefix { + return false; + } + + let len = l_len_prefix as u32; + // for inline storage, only need check view + if len <= 12 { + if l != r { + return false; + } + continue; + } + + // check buffers + let l_view = BytesView::from(*l); + let r_view = BytesView::from(*r); + + let l_buffer = &lhs_buffers[l_view.buffer_index as usize]; + let r_buffer = &rhs_buffers[r_view.buffer_index as usize]; + + // prefixes are already known to be equal; skip checking them + let len = len as usize - 4; + let l_offset = l_view.offset as usize + 4; + let r_offset = r_view.offset as usize + 4; + if l_buffer[l_offset..l_offset + len] != r_buffer[r_offset..r_offset + len] { + return false; + } + } + true +} diff --git a/arrow-data/src/equal/mod.rs b/arrow-data/src/equal/mod.rs index 1255ff39e097..be33c7c0c2ad 100644 --- a/arrow-data/src/equal/mod.rs +++ b/arrow-data/src/equal/mod.rs @@ -25,6 +25,7 @@ use arrow_schema::{DataType, IntervalUnit}; use half::f16; mod boolean; +mod bytes_view; mod dictionary; mod fixed_binary; mod fixed_list; @@ -41,6 +42,7 @@ mod variable_size; // For this reason, they are not exposed and are instead used // to build the generic functions below (`equal_range` and `equal`). use boolean::boolean_equal; +use bytes_view::bytes_view_equal; use dictionary::dictionary_equal; use fixed_binary::fixed_binary_equal; use fixed_list::fixed_list_equal; @@ -97,7 +99,7 @@ fn equal_values( } DataType::FixedSizeBinary(_) => fixed_binary_equal(lhs, rhs, lhs_start, rhs_start, len), DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not yet implemented") + bytes_view_equal(lhs, rhs, lhs_start, rhs_start, len) } DataType::List(_) => list_equal::(lhs, rhs, lhs_start, rhs_start, len), DataType::LargeList(_) => list_equal::(lhs, rhs, lhs_start, rhs_start, len), diff --git a/arrow-data/src/lib.rs b/arrow-data/src/lib.rs index cfa0dba66c35..4399d0f3eca2 100644 --- a/arrow-data/src/lib.rs +++ b/arrow-data/src/lib.rs @@ -30,3 +30,6 @@ pub mod decimal; #[cfg(feature = "ffi")] pub mod ffi; + +mod bytes_view; +pub use bytes_view::*; diff --git a/arrow-data/src/transform/mod.rs b/arrow-data/src/transform/mod.rs index ef53efac2373..9105c522dbda 100644 --- a/arrow-data/src/transform/mod.rs +++ b/arrow-data/src/transform/mod.rs @@ -15,13 +15,10 @@ // specific language governing permissions and limitations // under the License. -use super::{ - data::{into_buffers, new_buffers}, - ArrayData, ArrayDataBuilder, -}; +use super::{data::new_buffers, ArrayData, ArrayDataBuilder, BytesView}; use crate::bit_mask::set_bits; use arrow_buffer::buffer::{BooleanBuffer, NullBuffer}; -use arrow_buffer::{bit_util, i256, ArrowNativeType, MutableBuffer}; +use arrow_buffer::{bit_util, i256, ArrowNativeType, Buffer, MutableBuffer}; use arrow_schema::{ArrowError, DataType, IntervalUnit, UnionMode}; use half::f16; use num::Integer; @@ -68,36 +65,6 @@ impl<'a> _MutableArrayData<'a> { .as_mut() .expect("MutableArrayData not nullable") } - - fn freeze(self, dictionary: Option) -> ArrayDataBuilder { - let buffers = into_buffers(&self.data_type, self.buffer1, self.buffer2); - - let child_data = match self.data_type { - DataType::Dictionary(_, _) => vec![dictionary.unwrap()], - _ => { - let mut child_data = Vec::with_capacity(self.child_data.len()); - for child in self.child_data { - child_data.push(child.freeze()); - } - child_data - } - }; - - let nulls = self - .null_buffer - .map(|nulls| { - let bools = BooleanBuffer::new(nulls.into(), 0, self.len); - unsafe { NullBuffer::new_unchecked(bools, self.null_count) } - }) - .filter(|n| n.null_count() > 0); - - ArrayDataBuilder::new(self.data_type) - .offset(0) - .len(self.len) - .nulls(nulls) - .buffers(buffers) - .child_data(child_data) - } } fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits { @@ -138,26 +105,32 @@ fn build_extend_null_bits(array: &ArrayData, use_nulls: bool) -> ExtendNullBits pub struct MutableArrayData<'a> { #[allow(dead_code)] arrays: Vec<&'a ArrayData>, - // The attributes in [_MutableArrayData] cannot be in [MutableArrayData] due to - // mutability invariants (interior mutability): - // [MutableArrayData] contains a function that can only mutate [_MutableArrayData], not - // [MutableArrayData] itself + /// The attributes in [_MutableArrayData] cannot be in [MutableArrayData] due to + /// mutability invariants (interior mutability): + /// [MutableArrayData] contains a function that can only mutate [_MutableArrayData], not + /// [MutableArrayData] itself data: _MutableArrayData<'a>, - // the child data of the `Array` in Dictionary arrays. - // This is not stored in `MutableArrayData` because these values constant and only needed - // at the end, when freezing [_MutableArrayData]. + /// the child data of the `Array` in Dictionary arrays. + /// This is not stored in `MutableArrayData` because these values constant and only needed + /// at the end, when freezing [_MutableArrayData]. dictionary: Option, - // function used to extend values from arrays. This function's lifetime is bound to the array - // because it reads values from it. + /// Variadic data buffers referenced by views + /// This is not stored in `MutableArrayData` because these values constant and only needed + /// at the end, when freezing [_MutableArrayData] + variadic_data_buffers: Vec, + + /// function used to extend values from arrays. This function's lifetime is bound to the array + /// because it reads values from it. extend_values: Vec>, - // function used to extend nulls from arrays. This function's lifetime is bound to the array - // because it reads nulls from it. + + /// function used to extend nulls from arrays. This function's lifetime is bound to the array + /// because it reads nulls from it. extend_null_bits: Vec>, - // function used to extend nulls. - // this is independent of the arrays and therefore has no lifetime. + /// function used to extend nulls. + /// this is independent of the arrays and therefore has no lifetime. extend_nulls: ExtendNulls, } @@ -197,6 +170,26 @@ fn build_extend_dictionary(array: &ArrayData, offset: usize, max: usize) -> Opti } } +/// Builds an extend that adds `buffer_offset` to any buffer indices encountered +fn build_extend_view(array: &ArrayData, buffer_offset: u32) -> Extend { + let views = array.buffer::(0); + Box::new( + move |mutable: &mut _MutableArrayData, _, start: usize, len: usize| { + mutable + .buffer1 + .extend(views[start..start + len].iter().map(|v| { + let len = *v as u32; + if len <= 12 { + return *v; // Stored inline + } + let mut view = BytesView::from(*v); + view.buffer_index += buffer_offset; + view.into() + })) + }, + ) +} + fn build_extend(array: &ArrayData) -> Extend { match array.data_type() { DataType::Null => null::build_extend(array), @@ -224,9 +217,7 @@ fn build_extend(array: &ArrayData) -> Extend { DataType::Decimal256(_, _) => primitive::build_extend::(array), DataType::Utf8 | DataType::Binary => variable_size::build_extend::(array), DataType::LargeUtf8 | DataType::LargeBinary => variable_size::build_extend::(array), - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } + DataType::BinaryView | DataType::Utf8View => unreachable!("should use build_extend_view"), DataType::Map(_, _) | DataType::List(_) => list::build_extend::(array), DataType::LargeList(_) => list::build_extend::(array), DataType::Dictionary(_, _) => unreachable!("should use build_extend_dictionary"), @@ -269,9 +260,7 @@ fn build_extend_nulls(data_type: &DataType) -> ExtendNulls { DataType::Decimal256(_, _) => primitive::extend_nulls::, DataType::Utf8 | DataType::Binary => variable_size::extend_nulls::, DataType::LargeUtf8 | DataType::LargeBinary => variable_size::extend_nulls::, - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } + DataType::BinaryView | DataType::Utf8View => primitive::extend_nulls::, DataType::Map(_, _) | DataType::List(_) => list::extend_nulls::, DataType::LargeList(_) => list::extend_nulls::, DataType::Dictionary(child_data_type, _) => match child_data_type.as_ref() { @@ -423,11 +412,10 @@ impl<'a> MutableArrayData<'a> { | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary + | DataType::BinaryView + | DataType::Utf8View | DataType::Interval(_) | DataType::FixedSizeBinary(_) => vec![], - DataType::BinaryView | DataType::Utf8View => { - unimplemented!("BinaryView/Utf8View not implemented") - } DataType::Map(_, _) | DataType::List(_) | DataType::LargeList(_) => { let children = arrays .iter() @@ -557,6 +545,15 @@ impl<'a> MutableArrayData<'a> { _ => (None, false), }; + let variadic_data_buffers = match &data_type { + DataType::BinaryView | DataType::Utf8View => arrays + .iter() + .flat_map(|x| x.buffers().iter().skip(1)) + .map(Buffer::clone) + .collect(), + _ => vec![], + }; + let extend_nulls = build_extend_nulls(data_type); let extend_null_bits = arrays @@ -589,6 +586,20 @@ impl<'a> MutableArrayData<'a> { extend_values.expect("MutableArrayData::new is infallible") } + DataType::BinaryView | DataType::Utf8View => { + let mut next_offset = 0u32; + arrays + .iter() + .map(|arr| { + let num_data_buffers = (arr.buffers().len() - 1) as u32; + let offset = next_offset; + next_offset = next_offset + .checked_add(num_data_buffers) + .expect("view buffer index overflow"); + build_extend_view(arr, offset) + }) + .collect() + } _ => arrays.iter().map(|array| build_extend(array)).collect(), }; @@ -605,6 +616,7 @@ impl<'a> MutableArrayData<'a> { arrays, data, dictionary, + variadic_data_buffers, extend_values, extend_null_bits, extend_nulls, @@ -664,13 +676,55 @@ impl<'a> MutableArrayData<'a> { /// Creates a [ArrayData] from the pushed regions up to this point, consuming `self`. pub fn freeze(self) -> ArrayData { - unsafe { self.data.freeze(self.dictionary).build_unchecked() } + unsafe { self.into_builder().build_unchecked() } } /// Creates a [ArrayDataBuilder] from the pushed regions up to this point, consuming `self`. /// This is useful for extending the default behavior of MutableArrayData. pub fn into_builder(self) -> ArrayDataBuilder { - self.data.freeze(self.dictionary) + let data = self.data; + + let buffers = match data.data_type { + DataType::Null | DataType::Struct(_) | DataType::FixedSizeList(_, _) => { + vec![] + } + DataType::BinaryView | DataType::Utf8View => { + let mut b = self.variadic_data_buffers; + b.insert(0, data.buffer1.into()); + b + } + DataType::Utf8 | DataType::Binary | DataType::LargeUtf8 | DataType::LargeBinary => { + vec![data.buffer1.into(), data.buffer2.into()] + } + DataType::Union(_, mode) => { + match mode { + // Based on Union's DataTypeLayout + UnionMode::Sparse => vec![data.buffer1.into()], + UnionMode::Dense => vec![data.buffer1.into(), data.buffer2.into()], + } + } + _ => vec![data.buffer1.into()], + }; + + let child_data = match data.data_type { + DataType::Dictionary(_, _) => vec![self.dictionary.unwrap()], + _ => data.child_data.into_iter().map(|x| x.freeze()).collect(), + }; + + let nulls = data + .null_buffer + .map(|nulls| { + let bools = BooleanBuffer::new(nulls.into(), 0, data.len); + unsafe { NullBuffer::new_unchecked(bools, data.null_count) } + }) + .filter(|n| n.null_count() > 0); + + ArrayDataBuilder::new(data.data_type) + .offset(0) + .len(data.len) + .nulls(nulls) + .buffers(buffers) + .child_data(child_data) } } diff --git a/arrow/tests/array_transform.rs b/arrow/tests/array_transform.rs index 5a267c876d6a..83d3003a0586 100644 --- a/arrow/tests/array_transform.rs +++ b/arrow/tests/array_transform.rs @@ -22,6 +22,7 @@ use arrow::array::{ UnionArray, }; use arrow::datatypes::Int16Type; +use arrow_array::StringViewArray; use arrow_buffer::Buffer; use arrow_data::transform::MutableArrayData; use arrow_data::ArrayData; @@ -1027,6 +1028,44 @@ fn test_extend_nulls_panic() { mutable.extend_nulls(2); } +#[test] +fn test_string_view() { + let a1 = + StringViewArray::from(vec!["foo", "very long string over 12 bytes", "bar"]).into_data(); + let a2 = StringViewArray::from_iter(vec![ + Some("bar"), + None, + Some("long string also over 12 bytes"), + ]) + .into_data(); + + a1.validate_full().unwrap(); + a2.validate_full().unwrap(); + + let mut mutable = MutableArrayData::new(vec![&a1, &a2], false, 4); + mutable.extend(1, 0, 1); + mutable.extend(0, 1, 2); + mutable.extend(0, 0, 1); + mutable.extend(1, 2, 3); + + let array = StringViewArray::from(mutable.freeze()); + assert_eq!(array.data_buffers().len(), 2); + // Should have reused data buffers + assert_eq!(array.data_buffers()[0].as_ptr(), a1.buffers()[1].as_ptr()); + assert_eq!(array.data_buffers()[1].as_ptr(), a2.buffers()[1].as_ptr()); + + let v = array.iter().collect::>(); + assert_eq!( + v, + vec![ + Some("bar"), + Some("very long string over 12 bytes"), + Some("foo"), + Some("long string also over 12 bytes") + ] + ) +} + #[test] #[should_panic(expected = "Arrays with inconsistent types passed to MutableArrayData")] fn test_mixed_types() {