Description
Describe the bug
I am trying to use ScalarValue::List to store f64 values for the state in a UDAF. It was working well in datafusion version 32, but I upgraded to datafusion 33, only to find that the API had changed for ScalarValue::List to accept an ArrayRef rather than an Option<Vec<ScalarValue>>. I thought I had converted my code correctly, but I'm getting the following error:
Error: External(Internal("Inconsistent types in ScalarValue::iter_to_array. Expected Float64, got List([PrimitiveArray<Float64>\n[\n 43.0,\n 1.0,\n 43.0,\n 1.0,\n 43.0,\n 1.0,\n 43.0,\n 1.0,\n 43.0,\n 3.0,\n ...180 elements...,\n 62.0,\n 3.0,\n 62.0,\n 2.0,\n 63.0,\n 1.0,\n 63.0,\n 1.0,\n 63.0,\n 1.0,\n]])"))
First of all, I can't tell where this error is coming from: RUST_BACKTRACE=1 has no effect, and the log statements I added at the beginning and end of each Accumulator function show that the error does not occur "in the middle" of any of the functions I've implemented.
I am serializing my list via ScalarValue::List(Arc::new(Float64Array::from(floats))) where floats: Vec<f64>
And deserializing my list via something like this:
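use datafusion::arrow::array::Float64Array;
use datafusion::scalar::ScalarValue;

// `v` is the slice of state values for this accumulator.
if let ScalarValue::List(float_vals) = &v[0] {
    let floats: Vec<f64> = float_vals
        .as_any()
        .downcast_ref::<Float64Array>()
        .expect("Failed to downcast")
        .values()
        .to_vec(); // `values()` derefs to a slice of f64, so copy it out with `to_vec()`
}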
Finally, my state_type in the create_udaf function is:
Arc::new(vec![
    DataType::List(Field::new(
        "item",
        DataType::Float64,
        true
    ).into())
])
Via logging, I see successful calls to state and update_batch (update_batch doesn't touch the serialized state and probably isn't part of the issue). If the issue were in the state deserialization, I'd expect to see the log statement at the beginning of merge_batch but not the one at the end. Instead, I'm not seeing any of my merge_batch log statements.
I'm not sure if I'm making a mistake somewhere, or if this is a bug. But it seems like somewhere in the guts of the Accumulator, something is not working correctly. Happy to help fix the bug if one can be identified.
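One possible reading of the error, offered as an assumption and not confirmed against the DataFusion source: a list-typed scalar may need to be a single row that holds all the floats, whereas a flat Float64Array of N values is N rows of Float64. The sketch below models only that shape difference in plain Rust. `Column`, `as_single_list_row`, and `floats_from_single_row` are hypothetical stand-ins for illustration, not Arrow or DataFusion APIs.

```rust
// Plain-Rust model only: hypothetical stand-ins, not Arrow or DataFusion types.

/// A column of values, one entry per row.
#[derive(Debug, Clone, PartialEq)]
pub enum Column {
    /// One f64 per row (models a flat Float64 array).
    Float64(Vec<f64>),
    /// One list of f64 per row (models a List<Float64> array).
    List(Vec<Vec<f64>>),
}

impl Column {
    /// Number of rows in the column.
    pub fn rows(&self) -> usize {
        match self {
            Column::Float64(values) => values.len(),
            Column::List(lists) => lists.len(),
        }
    }
}

/// Wrap all floats into a single list-typed row, the shape a
/// list-valued scalar state is assumed to need.
pub fn as_single_list_row(floats: &[f64]) -> Column {
    Column::List(vec![floats.to_vec()])
}

/// Recover the floats from a single-row list column.
/// Returns None for any other shape (flat column, or zero/many rows).
pub fn floats_from_single_row(col: &Column) -> Option<Vec<f64>> {
    match col {
        Column::List(rows) if rows.len() == 1 => Some(rows[0].clone()),
        _ => None,
    }
}
```

In this model, four floats stored flat give a 4-row Float64 column, while the wrapped form gives a 1-row list column. If the assumption holds, the state I build from a bare Float64Array would have the first shape where the declared state_type expects the second.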
To Reproduce
Define a UDAF Accumulator that uses ScalarValue::List to serialize its state, and use the aggregate function.
Expected behavior
Successful aggregation of the values into a new dataframe column.
Additional context
No response