feat(arrow-select): concat kernel will merge dictionary values for list of dictionaries #6893

Open · wants to merge 16 commits into base: main

remove the use of ArrayData
rluvaton committed Dec 21, 2024
commit ca61ce271bd1463272f2333ddc56ad9d66f30525
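
For context on the feature named in the PR title, here is a minimal sketch (not part of this commit) of concatenating two `List<Dictionary<Int32, Utf8>>` arrays with the `concat` kernel. The builder-based construction and the comment about merging are illustrative assumptions based on the public arrow-rs API and the PR title; they are not code taken from this PR:

```rust
use std::sync::Arc;

use arrow_array::builder::{ListBuilder, StringDictionaryBuilder};
use arrow_array::types::Int32Type;
use arrow_array::{Array, ArrayRef};
use arrow_schema::ArrowError;
use arrow_select::concat::concat;

fn main() -> Result<(), ArrowError> {
    // Hypothetical input, made up for illustration:
    // first List<Dictionary<Int32, Utf8>> array: [["a", "b"]]
    let mut builder = ListBuilder::new(StringDictionaryBuilder::<Int32Type>::new());
    builder.values().append_value("a");
    builder.values().append_value("b");
    builder.append(true);
    let left: ArrayRef = Arc::new(builder.finish());

    // Second array with an overlapping dictionary: [["b", "c"]]
    let mut builder = ListBuilder::new(StringDictionaryBuilder::<Int32Type>::new());
    builder.values().append_value("b");
    builder.values().append_value("c");
    builder.append(true);
    let right: ArrayRef = Arc::new(builder.finish());

    // Per the PR title, concat should merge the child dictionary values of the
    // two lists rather than blindly appending them back to back.
    let combined = concat(&[left.as_ref(), right.as_ref()])?;
    assert_eq!(combined.len(), 2);
    Ok(())
}
```
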
arrow-select/src/concat.rs: 35 changes (14 additions, 21 deletions)
@@ -36,8 +36,7 @@ use arrow_array::types::*;
 use arrow_array::*;
 use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
 use arrow_data::transform::{Capacities, MutableArrayData};
-use arrow_data::ArrayDataBuilder;
-use arrow_schema::{ArrowError, DataType, SchemaRef};
+use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
 use std::sync::Arc;
 
 fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {

@@ -132,6 +131,7 @@ fn concat_dictionaries<K: ArrowDictionaryKeyType>(
 
 fn concat_lists<OffsetSize: OffsetSizeTrait>(
     arrays: &[&dyn Array],
+    field: &FieldRef,
 ) -> Result<ArrayRef, ArrowError> {
     let mut output_len = 0;
     let mut list_has_nulls = false;

@@ -156,7 +156,7 @@ fn concat_lists<OffsetSize: OffsetSizeTrait>(
         NullBuffer::new(nulls.finish())
     });
 
-    let values = lists
+    let values: Vec<&dyn Array> = lists
         .iter()
         .map(|x| x.values().as_ref())
         .collect::<Vec<_>>();

@@ -165,22 +165,15 @@
 
     // Merge value offsets from the lists
     let value_offset_buffer =
-        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()))
-            .into_inner()
-            .into_inner();
-
-    let builder = ArrayDataBuilder::new(arrays[0].data_type().clone())
-        .len(output_len)
-        .nulls(lists_nulls)
-        // `GenericListArray` must only have 1 buffer
-        .buffers(vec![value_offset_buffer])
-        // `GenericListArray` must only have 1 child_data
-        .child_data(vec![concatenated_values.to_data()]);
-
-    // TODO - maybe use build_unchecked?
-    let array_data = builder.build()?;
-
-    let array = GenericListArray::<OffsetSize>::from(array_data);
+        OffsetBuffer::<OffsetSize>::from_lengths(lists.iter().flat_map(|x| x.offsets().lengths()));
+
+    let array = GenericListArray::<OffsetSize>::try_new(
+        Arc::clone(field),
+        value_offset_buffer,
+        concatenated_values,
+        lists_nulls,
+    )?;
 
     Ok(Arc::new(array))
 }

@@ -226,8 +219,8 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
                 _ => unreachable!("illegal dictionary key type {k}")
             }
         }
-        DataType::List(_) => concat_lists::<i32>(arrays),
-        DataType::LargeList(_) => concat_lists::<i64>(arrays),
+        DataType::List(field) => concat_lists::<i32>(arrays, field),
+        DataType::LargeList(field) => concat_lists::<i64>(arrays, field),
         _ => {
             let capacity = get_capacity(arrays, d);
             concat_fallback(arrays, capacity)
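
The main change in this commit is constructing the result with `GenericListArray::try_new` instead of round-tripping through `ArrayData`/`ArrayDataBuilder`, so the field, offsets, child values, and null buffer are validated together. A rough standalone sketch of that constructor (illustrative only, not taken from this diff):

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, Int32Array, ListArray};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{ArrowError, DataType, Field};

fn main() -> Result<(), ArrowError> {
    // Illustrative example (not from the PR): child values for two lists, [1, 2] and [3].
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let offsets = OffsetBuffer::<i32>::from_lengths([2, 1]);
    let field = Arc::new(Field::new("item", DataType::Int32, true));

    // try_new checks that the offsets are consistent with the child values and
    // that the field's data type matches the child array, returning an
    // ArrowError instead of building an invalid array.
    let list = ListArray::try_new(field, offsets, values, None)?;
    assert_eq!(list.len(), 2);
    Ok(())
}
```

Using the checked constructor keeps the validation that `builder.build()?` used to perform, without materializing an intermediate `ArrayData`.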