Skip to content

Fix: Map functions crash on out of bounds cases #16203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
316 changes: 283 additions & 33 deletions datafusion/functions-nested/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use datafusion_common::cast::as_large_list_array;
use datafusion_common::cast::as_list_array;
use datafusion_common::utils::ListCoercion;
use datafusion_common::{
exec_err, internal_datafusion_err, plan_err, utils::take_function_args,
DataFusionError, Result,
exec_err, internal_datafusion_err, plan_err, utils::take_function_args, Result,
};
use datafusion_expr::{
ArrayFunctionArgument, ArrayFunctionSignature, Expr, TypeSignature,
Expand Down Expand Up @@ -213,6 +212,34 @@ fn array_element_inner(args: &[ArrayRef]) -> Result<ArrayRef> {
}
}

/// Adjusts a 1-based array index to 0-based, handling negative indices and bounds checking
fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
let index: O = match index.try_into() {
Ok(idx) => idx,
Err(_) => return exec_err!("array_element got invalid index: {}", index),
};

// Convert 1-based index to 0-based
let adjusted_zero_index = if index < O::usize_as(0) {
// Negative index: count from end
index + len
} else {
// Positive index: subtract 1 to make it 0-based
index - O::usize_as(1)
};

// Check bounds
if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
Ok(None)
}
}

fn general_array_element<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
indexes: &Int64Array,
Expand All @@ -225,44 +252,33 @@ where
return Ok(Arc::new(NullArray::new(array.len())));
}

// Check if we have a struct with null fields
let has_struct_with_null_fields = if let DataType::Struct(fields) = values.data_type()
{
fields.iter().any(|field| field.data_type() == &Null)
} else {
false
};

// If we have problematic struct fields, we'll need to handle this differently
if has_struct_with_null_fields {
return handle_struct_with_null_fields(array, indexes);
}

let original_data = values.to_data();
let capacity = Capacities::Array(original_data.len());

// use_nulls: true, we don't construct List for array_element, so we need explicit nulls.
let mut mutable =
MutableArrayData::with_capacities(vec![&original_data], true, capacity);

fn adjusted_array_index<O: OffsetSizeTrait>(index: i64, len: O) -> Result<Option<O>>
where
i64: TryInto<O>,
{
let index: O = index.try_into().map_err(|_| {
DataFusionError::Execution(format!(
"array_element got invalid index: {index}"
))
})?;
// 0 ~ len - 1
let adjusted_zero_index = if index < O::usize_as(0) {
index + len
} else {
index - O::usize_as(1)
};

if O::usize_as(0) <= adjusted_zero_index && adjusted_zero_index < len {
Ok(Some(adjusted_zero_index))
} else {
// Out of bounds
Ok(None)
}
}

for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

// array is null
if len == O::usize_as(0) {
if array.is_null(row_index) || len == O::usize_as(0) || indexes.is_null(row_index)
{
mutable.extend_nulls(1);
continue;
}
Expand All @@ -282,6 +298,103 @@ where
Ok(arrow::array::make_array(data))
}

/// Handle structs with null fields to avoid MutableArrayData issues
fn handle_struct_with_null_fields<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
indexes: &Int64Array,
) -> Result<ArrayRef>
where
i64: TryInto<O>,
{
use arrow::array::StructArray;

let values = array.values();
let struct_array = values.as_any().downcast_ref::<StructArray>().unwrap();

// Build the result struct arrays
let mut result_columns: Vec<ArrayRef> = Vec::new();
let mut null_buffer_builder = NullBufferBuilder::new(array.len());

// Process each field in the struct
for field in struct_array.columns().iter() {
if field.data_type() == &Null {
// For null fields, create a null array of the correct length
result_columns.push(Arc::new(NullArray::new(array.len())));
} else {
// For non-null fields, use MutableArrayData
let field_data = field.to_data();
let mut field_mutable = MutableArrayData::with_capacities(
vec![&field_data],
true,
Capacities::Array(array.len()),
);

for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

// array is null or empty
if array.is_null(row_index) || len == O::usize_as(0) {
field_mutable.extend_nulls(1);
continue;
}

// Check if index is null
if indexes.is_null(row_index) {
field_mutable.extend_nulls(1);
continue;
}

let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;

if let Some(index) = index {
let actual_index = start.as_usize() + index.as_usize();
field_mutable.extend(0, actual_index, actual_index + 1);
} else {
// Index out of bounds
field_mutable.extend_nulls(1);
}
}

let field_result = arrow::array::make_array(field_mutable.freeze());
result_columns.push(field_result);
}
}

// Determine which rows should be null in the final result
for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

if array.is_null(row_index) || len == O::usize_as(0) || indexes.is_null(row_index)
{
null_buffer_builder.append_null();
continue;
}

let index = adjusted_array_index::<O>(indexes.value(row_index), len)?;
if index.is_some() {
null_buffer_builder.append_non_null();
} else {
null_buffer_builder.append_null();
}
}

// Create the result struct array
if let DataType::Struct(fields) = values.data_type() {
let result_struct = StructArray::new(
fields.clone(),
result_columns,
null_buffer_builder.finish(),
);
Ok(Arc::new(result_struct))
} else {
unreachable!("Should only be called for struct types")
}
}

#[doc = "returns a slice of the array."]
pub fn array_slice(array: Expr, begin: Expr, end: Expr, stride: Option<Expr>) -> Expr {
let args = match stride {
Expand Down Expand Up @@ -998,8 +1111,11 @@ where

#[cfg(test)]
mod tests {
use super::array_element_udf;
use super::*;
use arrow::array::{Int64Array, ListArray, StructArray};
use arrow::datatypes::{DataType, Field};
use std::sync::Arc;

use datafusion_common::{Column, DFSchema};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{Expr, ExprSchemable};
Expand All @@ -1008,13 +1124,12 @@ mod tests {
// Regression test for https://github.com/apache/datafusion/issues/13755
#[test]
fn test_array_element_return_type_fixed_size_list() {
let fixed_size_list_type = DataType::FixedSizeList(
let fixed_size_list_type = FixedSizeList(
Field::new("some_arbitrary_test_field", DataType::Int32, false).into(),
13,
);
let array_type = DataType::List(
Field::new_list_field(fixed_size_list_type.clone(), true).into(),
);
let array_type =
List(Field::new_list_field(fixed_size_list_type.clone(), true).into());
let index_type = DataType::Int64;

let schema = DFSchema::from_unqualified_fields(
Expand Down Expand Up @@ -1049,4 +1164,139 @@ mod tests {
fixed_size_list_type
);
}

#[test]
fn test_array_element_struct_with_null_fields() {
// This test reproduces the crash scenario from issue: https://github.com/apache/datafusion/issues/16187
// map_values(map([named_struct('a', 1, 'b', null)], [named_struct('a', 1, 'b', null)]))[0]

// Create a struct with one regular field and one Null-typed field
// This simulates what named_struct('a', 1, 'b', null) creates
let struct_fields = vec![
Field::new("a", DataType::Int64, true),
Field::new("b", Null, true), // This is the problematic field
];

let field_a_data = Arc::new(Int64Array::from(vec![Some(1)]));
let field_b_data = Arc::new(NullArray::new(1));

let struct_array = StructArray::new(
struct_fields.into(),
vec![field_a_data, field_b_data],
None,
);

// Create a ListArray containing this struct (simulates map_values output)
let list_field = Field::new_list_field(struct_array.data_type().clone(), true);
let list_array = ListArray::new(
Arc::new(list_field),
OffsetBuffer::new(vec![0, 1].into()),
Arc::new(struct_array),
None,
);

let index_array = Int64Array::from(vec![0]);

// This should NOT panic with the fix
let result = general_array_element::<i32>(&list_array, &index_array);

assert!(
result.is_ok(),
"array_element should not panic on struct with Null fields"
);

let result_array = result.unwrap();

// Should return a null array of the correct struct type
assert_eq!(result_array.len(), 1);
assert!(
result_array.is_null(0),
"Result should be null for problematic struct"
);
assert_eq!(
result_array.data_type(),
&DataType::Struct(
vec![
Field::new("a", DataType::Int64, true),
Field::new("b", Null, true),
]
.into()
)
);
}

#[test]
fn test_array_element_mixed_null_fields() {
let struct_fields = vec![
Field::new("valid", DataType::Utf8, true),
Field::new("null_field", Null, true),
Field::new("another_valid", DataType::Int32, true),
];

let field_1_data = Arc::new(arrow::array::StringArray::from(vec![Some("test")]));
let field_2_data = Arc::new(NullArray::new(1));
let field_3_data = Arc::new(arrow::array::Int32Array::from(vec![Some(42)]));

let struct_array = StructArray::new(
struct_fields.into(),
vec![field_1_data, field_2_data, field_3_data],
None,
);

let list_field = Field::new_list_field(struct_array.data_type().clone(), true);
let list_array = ListArray::new(
Arc::new(list_field),
OffsetBuffer::new(vec![0, 1].into()),
Arc::new(struct_array),
None,
);

let index_array = Int64Array::from(vec![0]);

let result = general_array_element::<i32>(&list_array, &index_array);
assert!(
result.is_ok(),
"Should handle mixed null fields by returning NULL"
);

let result_array = result.unwrap();
assert!(
result_array.is_null(0),
"Should return null for struct with any Null fields"
);
}

#[test]
fn test_array_element_out_of_bounds_with_null_fields() {
let struct_fields = vec![
Field::new("a", DataType::Int64, true),
Field::new("b", Null, true),
];

let field_a_data = Arc::new(Int64Array::from(vec![Some(1)]));
let field_b_data = Arc::new(NullArray::new(1));

let struct_array = StructArray::new(
struct_fields.into(),
vec![field_a_data, field_b_data],
None,
);

let list_field = Field::new_list_field(struct_array.data_type().clone(), true);
let list_array = ListArray::new(
Arc::new(list_field),
OffsetBuffer::new(vec![0, 1].into()),
Arc::new(struct_array),
None,
);

// Try to access index 5 (out of bounds)
let index_array = Int64Array::from(vec![5]);

let result = general_array_element::<i32>(&list_array, &index_array);
assert!(result.is_ok(), "Out of bounds access should not panic");

let result_array = result.unwrap();
assert!(result_array.is_null(0), "Out of bounds should return null");
}
}
Loading