Skip to content

Return an error on overflow in do_append_val_inner #16201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use arrow::array::{
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::datatypes::{ByteArrayType, DataType, GenericBinaryType};
use datafusion_common::utils::proxy::VecAllocExt;
use datafusion_common::{DataFusionError, Result};
use datafusion_physical_expr_common::binary_map::{OutputType, INITIAL_BUFFER_CAPACITY};
use itertools::izip;
use std::mem::size_of;
Expand Down Expand Up @@ -80,7 +81,7 @@ where
self.do_equal_to_inner(lhs_row, array, rhs_row)
}

fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize)
fn append_val_inner<B>(&mut self, array: &ArrayRef, row: usize) -> Result<()>
where
B: ByteArrayType,
{
Expand All @@ -92,8 +93,10 @@ where
self.offsets.push(O::usize_as(offset));
} else {
self.nulls.append(false);
self.do_append_val_inner(arr, row);
self.do_append_val_inner(arr, row)?;
}

Ok(())
}

fn vectorized_equal_to_inner<B>(
Expand Down Expand Up @@ -123,7 +126,11 @@ where
}
}

fn vectorized_append_inner<B>(&mut self, array: &ArrayRef, rows: &[usize])
fn vectorized_append_inner<B>(
&mut self,
array: &ArrayRef,
rows: &[usize],
) -> Result<()>
where
B: ByteArrayType,
{
Expand All @@ -141,22 +148,14 @@ where
match all_null_or_non_null {
None => {
for &row in rows {
if arr.is_null(row) {
self.nulls.append(true);
// nulls need a zero length in the offset buffer
let offset = self.buffer.len();
self.offsets.push(O::usize_as(offset));
} else {
self.nulls.append(false);
self.do_append_val_inner(arr, row);
}
self.append_val_inner::<B>(array, row)?
}
}

Some(true) => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
self.do_append_val_inner(arr, row)?;
}
}

Expand All @@ -168,6 +167,8 @@ where
self.offsets.resize(new_len, O::usize_as(offset));
}
}

Ok(())
}

fn do_equal_to_inner<B>(
Expand All @@ -188,20 +189,26 @@ where
self.value(lhs_row) == (array.value(rhs_row).as_ref() as &[u8])
}

fn do_append_val_inner<B>(&mut self, array: &GenericByteArray<B>, row: usize)
fn do_append_val_inner<B>(
&mut self,
array: &GenericByteArray<B>,
row: usize,
) -> Result<()>
where
B: ByteArrayType,
{
let value: &[u8] = array.value(row).as_ref();
self.buffer.append_slice(value);

assert!(
self.buffer.len() <= self.max_buffer_size,
"offset overflow, buffer size > {}",
self.max_buffer_size
);
if self.buffer.len() > self.max_buffer_size {
return Err(DataFusionError::Execution(format!(
"offset overflow, buffer size > {}",
self.max_buffer_size
)));
}

self.offsets.push(O::usize_as(self.buffer.len()));
Ok(())
}

/// return the current value of the specified row irrespective of null
Expand Down Expand Up @@ -238,25 +245,27 @@ where
}
}

fn append_val(&mut self, column: &ArrayRef, row: usize) {
fn append_val(&mut self, column: &ArrayRef, row: usize) -> Result<()> {
// Sanity array type
match self.output_type {
OutputType::Binary => {
debug_assert!(matches!(
column.data_type(),
DataType::Binary | DataType::LargeBinary
));
self.append_val_inner::<GenericBinaryType<O>>(column, row)
self.append_val_inner::<GenericBinaryType<O>>(column, row)?
}
OutputType::Utf8 => {
debug_assert!(matches!(
column.data_type(),
DataType::Utf8 | DataType::LargeUtf8
));
self.append_val_inner::<GenericStringType<O>>(column, row)
self.append_val_inner::<GenericStringType<O>>(column, row)?
}
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
};

Ok(())
}

fn vectorized_equal_to(
Expand Down Expand Up @@ -296,24 +305,26 @@ where
}
}

fn vectorized_append(&mut self, column: &ArrayRef, rows: &[usize]) {
fn vectorized_append(&mut self, column: &ArrayRef, rows: &[usize]) -> Result<()> {
match self.output_type {
OutputType::Binary => {
debug_assert!(matches!(
column.data_type(),
DataType::Binary | DataType::LargeBinary
));
self.vectorized_append_inner::<GenericBinaryType<O>>(column, rows)
self.vectorized_append_inner::<GenericBinaryType<O>>(column, rows)?
}
OutputType::Utf8 => {
debug_assert!(matches!(
column.data_type(),
DataType::Utf8 | DataType::LargeUtf8
));
self.vectorized_append_inner::<GenericStringType<O>>(column, rows)
self.vectorized_append_inner::<GenericStringType<O>>(column, rows)?
}
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
};

Ok(())
}

fn len(&self) -> usize {
Expand Down Expand Up @@ -421,12 +432,12 @@ mod tests {

use crate::aggregates::group_values::multi_group_by::bytes::ByteGroupValueBuilder;
use arrow::array::{ArrayRef, NullBufferBuilder, StringArray};
use datafusion_common::DataFusionError;
use datafusion_physical_expr::binary_map::OutputType;

use super::GroupColumn;

#[test]
#[should_panic]
fn test_byte_group_value_builder_overflow() {
let mut builder = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);

Expand All @@ -435,31 +446,36 @@ mod tests {
let array =
Arc::new(StringArray::from(vec![Some(large_string.as_str())])) as ArrayRef;

// Append items until our buffer length is 1 + i32::MAX as usize
for _ in 0..2048 {
builder.append_val(&array, 0);
// Append items until our buffer length is i32::MAX as usize
for _ in 0..2047 {
builder.append_val(&array, 0).unwrap();
}

assert_eq!(builder.value(2047), large_string.as_bytes());
assert!(matches!(
builder.append_val(&array, 0),
Err(DataFusionError::Execution(e)) if e.contains("offset overflow")
));

assert_eq!(builder.value(2046), large_string.as_bytes());
}

#[test]
fn test_byte_take_n() {
let mut builder = ByteGroupValueBuilder::<i32>::new(OutputType::Utf8);
let array = Arc::new(StringArray::from(vec![Some("a"), None])) as ArrayRef;
// a, null, null
builder.append_val(&array, 0);
builder.append_val(&array, 1);
builder.append_val(&array, 1);
builder.append_val(&array, 0).unwrap();
builder.append_val(&array, 1).unwrap();
builder.append_val(&array, 1).unwrap();

// (a, null) remaining: null
let output = builder.take_n(2);
assert_eq!(&output, &array);

// null, a, null, a
builder.append_val(&array, 0);
builder.append_val(&array, 1);
builder.append_val(&array, 0);
builder.append_val(&array, 0).unwrap();
builder.append_val(&array, 1).unwrap();
builder.append_val(&array, 0).unwrap();

// (null, a) remaining: (null, a)
let output = builder.take_n(2);
Expand All @@ -473,9 +489,9 @@ mod tests {
])) as ArrayRef;

// null, a, longstringfortest, null, null
builder.append_val(&array, 2);
builder.append_val(&array, 1);
builder.append_val(&array, 1);
builder.append_val(&array, 2).unwrap();
builder.append_val(&array, 1).unwrap();
builder.append_val(&array, 1).unwrap();

// (null, a, longstringfortest, null) remaining: (null)
let output = builder.take_n(4);
Expand All @@ -494,7 +510,7 @@ mod tests {
builder_array: &ArrayRef,
append_rows: &[usize]| {
for &index in append_rows {
builder.append_val(builder_array, index);
builder.append_val(builder_array, index).unwrap();
}
};

Expand All @@ -517,7 +533,9 @@ mod tests {
let append = |builder: &mut ByteGroupValueBuilder<i32>,
builder_array: &ArrayRef,
append_rows: &[usize]| {
builder.vectorized_append(builder_array, append_rows);
builder
.vectorized_append(builder_array, append_rows)
.unwrap();
};

let equal_to = |builder: &ByteGroupValueBuilder<i32>,
Expand Down Expand Up @@ -551,7 +569,9 @@ mod tests {
None,
None,
])) as _;
builder.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4]);
builder
.vectorized_append(&all_nulls_input_array, &[0, 1, 2, 3, 4])
.unwrap();

let mut equal_to_results = vec![true; all_nulls_input_array.len()];
builder.vectorized_equal_to(
Expand All @@ -575,7 +595,9 @@ mod tests {
Some("string4"),
Some("string5"),
])) as _;
builder.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4]);
builder
.vectorized_append(&all_not_nulls_input_array, &[0, 1, 2, 3, 4])
.unwrap();

let mut equal_to_results = vec![true; all_not_nulls_input_array.len()];
builder.vectorized_equal_to(
Expand Down
Loading