ScalarValue::List not working as expected in UDAF state #8472

@rspears74

Describe the bug

I am trying to use ScalarValue::List to store f64 values for the state in a UDAF. It was working well in datafusion version 32, but I upgraded to datafusion 33, only to find that the API had changed for ScalarValue::List to accept an ArrayRef rather than an Option<Vec<ScalarValue>>. I thought I had converted my code correctly, but I'm getting the following error:

Error: External(Internal("Inconsistent types in ScalarValue::iter_to_array. Expected Float64, got List([PrimitiveArray<Float64>\n[\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  1.0,\n  43.0,\n  3.0,\n  ...180 elements...,\n  62.0,\n  3.0,\n  62.0,\n  2.0,\n  63.0,\n  1.0,\n  63.0,\n  1.0,\n  63.0,\n  1.0,\n]])"))

First of all, I can't tell exactly where this error is coming from. Setting RUST_BACKTRACE=1 has no effect, and the log statements I added at the beginning and end of each Accumulator function suggest that the error is not raised "in the middle" of any of the Accumulator functions I've implemented.

I am serializing my list via ScalarValue::List(Arc::new(Float64Array::from(floats))), where floats: Vec<f64>, and deserializing it via something like this:

if let ScalarValue::List(float_vals) = &v[0] {
  // Downcast the type-erased array to Float64Array, then copy its values out.
  let floats: Vec<f64> = float_vals
    .as_any()
    .downcast_ref::<Float64Array>()
    .expect("Failed to downcast")
    .values()
    .to_vec();
}
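
The downcast step above panics through `expect` if the stored value is not the type the reader assumes. As a minimal, library-free sketch (it uses `std::any::Any` as a stand-in for Arrow's `as_any()`; `StoredState` and `read_floats` are hypothetical names, not DataFusion or Arrow APIs), returning an error instead of panicking can make it easier to see which side of the round trip holds an unexpected type:

```rust
use std::any::Any;
use std::sync::Arc;

/// Hypothetical stand-in for a type-erased state value (not a DataFusion type).
type StoredState = Arc<dyn Any + Send + Sync>;

/// Try to read the state back as a flat list of f64 values.
/// Returns an error message instead of panicking when the stored type differs.
fn read_floats(state: &StoredState) -> Result<Vec<f64>, String> {
    state
        .downcast_ref::<Vec<f64>>() // None if the stored value is another type
        .cloned()
        .ok_or_else(|| "state is not a Vec<f64>".to_string())
}
```

With this shape, a state stored as `Vec<f64>` reads back as `Ok(..)`, while a state stored as, say, `Vec<Vec<f64>>` yields an `Err` that the caller can log. This only illustrates the downcast pattern; it says nothing about how DataFusion lays out list state internally.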

Finally, my state_type in the create_udaf function is:

Arc::new(vec![
    DataType::List(Field::new(
        "item",
        DataType::Float64,
        true
    ).into())
])

Via logging, I seem to get successful calls to state and update_batch (update_batch doesn't concern itself with the serialized state and probably isn't part of the issue). If the issue were in the state deserialization, I'd expect to see a log statement from the beginning of merge_batch but not one from the end. Instead, I'm not seeing any of my merge_batch log statements at all.
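
The entry/exit logging approach described here can be sketched with a toy model. Everything below is an assumption made for illustration: `ToyAccumulator` and `CollectFloats` are hypothetical, simplified stand-ins that only mirror the method names in this report, and they are not DataFusion's `Accumulator` trait or its signatures.

```rust
/// Hypothetical, simplified accumulator trait (not DataFusion's `Accumulator`).
/// Each method records entry and exit markers in `log`.
trait ToyAccumulator {
    fn update_batch(&mut self, values: &[f64], log: &mut Vec<String>);
    fn state(&self, log: &mut Vec<String>) -> Vec<f64>;
    fn merge_batch(&mut self, states: &[Vec<f64>], log: &mut Vec<String>);
}

/// Toy accumulator that simply collects every value it sees.
#[derive(Default)]
struct CollectFloats {
    values: Vec<f64>,
}

impl ToyAccumulator for CollectFloats {
    fn update_batch(&mut self, values: &[f64], log: &mut Vec<String>) {
        log.push("update_batch: enter".to_string());
        self.values.extend_from_slice(values);
        log.push("update_batch: exit".to_string());
    }

    fn state(&self, log: &mut Vec<String>) -> Vec<f64> {
        log.push("state: enter".to_string());
        let out = self.values.clone(); // the "serialized" partial state
        log.push("state: exit".to_string());
        out
    }

    fn merge_batch(&mut self, states: &[Vec<f64>], log: &mut Vec<String>) {
        log.push("merge_batch: enter".to_string());
        for s in states {
            self.values.extend_from_slice(s); // fold each partial state in
        }
        log.push("merge_batch: exit".to_string());
    }
}
```

In this model, a log that ends at "state: exit" with no "merge_batch: enter" would mean the failure happens in whatever code carries the state from one accumulator to the next, outside the accumulator's own methods. That matches the pattern observed in this report, though the toy model cannot say where in DataFusion that would be.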

I'm not sure if I'm making a mistake somewhere, or if this is a bug. But it seems like somewhere in the guts of the Accumulator, something is not working correctly. Happy to help fix the bug if one can be identified.

To Reproduce

Define a UDAF Accumulator that uses ScalarValue::List to serialize its state, and use the aggregate function.

Expected behavior

Successful aggregation of the values into a new dataframe column.

Additional context

No response

Labels: bug
