Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change ScalarValue::Struct to ArrayRef #7893

Merged
merged 12 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
714 changes: 490 additions & 224 deletions datafusion/common/src/scalar.rs

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,6 @@ mod tests {
use crate::physical_plan::{DisplayAs, SendableRecordBatchStream};
use crate::physical_planner::PhysicalPlanner;
use crate::prelude::{SessionConfig, SessionContext};
use crate::scalar::ScalarValue;
use crate::test_util::{scan_empty, scan_empty_with_partitions};
use arrow::array::{ArrayRef, DictionaryArray, Int32Array};
use arrow::datatypes::{DataType, Field, Int32Type, SchemaRef};
Expand Down Expand Up @@ -2310,10 +2309,11 @@ mod tests {

/// Return a `null` literal representing a struct type like: `{ a: bool }`
fn struct_literal() -> Expr {
let struct_literal = ScalarValue::Struct(
None,
let struct_literal = ScalarValue::try_from(DataType::Struct(
vec![Field::new("foo", DataType::Boolean, false)].into(),
);
))
.unwrap();

lit(struct_literal)
}

Expand Down
29 changes: 11 additions & 18 deletions datafusion/core/tests/user_defined/user_defined_aggregates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
//! user defined aggregate functions

use arrow::{array::AsArray, datatypes::Fields};
use arrow_array::{types::UInt64Type, Int32Array, PrimitiveArray};
use arrow_array::{types::UInt64Type, Int32Array, PrimitiveArray, StructArray};
use arrow_schema::Schema;
use std::sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -582,36 +582,29 @@ impl FirstSelector {

// Internally, keep the data types as this type.
// The selector state is now a single struct value, so the state layout
// is just the output datatype (one struct column).
fn state_datatypes() -> Vec<DataType> {
    vec![Self::output_datatype()]
}

/// Convert to a set of ScalarValues
fn to_state(&self) -> Vec<ScalarValue> {
vec![
ScalarValue::Float64(Some(self.value)),
ScalarValue::TimestampNanosecond(Some(self.time), None),
]
}
fn to_state(&self) -> Result<ScalarValue> {
let f64arr = Arc::new(Float64Array::from(vec![self.value])) as ArrayRef;
let timearr =
Arc::new(TimestampNanosecondArray::from(vec![self.time])) as ArrayRef;

/// return this selector as a single scalar (struct) value
fn to_scalar(&self) -> ScalarValue {
ScalarValue::Struct(Some(self.to_state()), Self::fields())
let struct_arr =
StructArray::try_new(Self::fields(), vec![f64arr, timearr], None)?;
Ok(ScalarValue::Struct(Arc::new(struct_arr)))
jayzhan211 marked this conversation as resolved.
Show resolved Hide resolved
}
}

impl Accumulator for FirstSelector {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    // State and output are the same single struct value now, so the
    // state is simply the evaluated scalar wrapped in a one-element Vec.
    self.evaluate().map(|s| vec![s])
}

/// produce the output structure (the selector as one struct scalar)
fn evaluate(&mut self) -> Result<ScalarValue> {
    self.to_state()
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
Expand Down
28 changes: 10 additions & 18 deletions datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@

//! Implementations for DISTINCT expressions, e.g. `COUNT(DISTINCT c)`

use arrow::datatypes::{DataType, Field};
use std::any::Any;
use std::collections::HashSet;
use std::fmt::Debug;
use std::sync::Arc;

use arrow::array::ArrayRef;
use std::collections::HashSet;
use arrow::datatypes::{DataType, Field};

use crate::aggregate::utils::down_cast_any_ref;
use crate::expressions::format_state_name;
use crate::{AggregateExpr, PhysicalExpr};

use datafusion_common::{Result, ScalarValue};
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -137,10 +138,11 @@ impl Accumulator for DistinctArrayAggAccumulator {
assert_eq!(values.len(), 1, "batch input should only include 1 column!");

let array = &values[0];
let scalars = ScalarValue::convert_array_to_scalar_vec(array)?;
for scalar in scalars {
self.values.extend(scalar)
let scalar_vec = ScalarValue::convert_array_to_scalar_vec(array)?;
for scalars in scalar_vec {
self.values.extend(scalars);
}

Ok(())
}

Expand All @@ -149,18 +151,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
return Ok(());
}

assert_eq!(
states.len(),
1,
"array_agg_distinct states must contain single array"
);

let scalar_vec = ScalarValue::convert_array_to_scalar_vec(&states[0])?;
for scalars in scalar_vec {
self.values.extend(scalars)
}

Ok(())
self.update_batch(states)
}

fn evaluate(&mut self) -> Result<ScalarValue> {
Expand All @@ -187,7 +178,8 @@ mod tests {
use arrow::datatypes::{DataType, Schema};
use arrow::record_batch::RecordBatch;
use arrow_array::types::Int32Type;
use arrow_array::{Array, ListArray};
use arrow_array::Array;
use arrow_array::ListArray;
use arrow_buffer::OffsetBuffer;
use datafusion_common::utils::array_into_list_array;
use datafusion_common::{internal_err, DataFusionError};
Expand Down
80 changes: 52 additions & 28 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@ use crate::{
use arrow::array::{Array, ArrayRef};
use arrow::datatypes::{DataType, Field};
use arrow_array::cast::AsArray;
use arrow_array::{new_empty_array, StructArray};
use arrow_schema::{Fields, SortOptions};

use datafusion_common::utils::array_into_list_array;
use datafusion_common::utils::{compare_rows, get_row_at_idx};
use datafusion_common::{exec_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;
Expand Down Expand Up @@ -219,6 +222,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
if states.is_empty() {
return Ok(());
}

// First entry in the state is the aggregation result. Second entry
// stores values received for ordering requirement columns for each
// aggregation value inside `ARRAY_AGG` list. For each `StructArray`
Expand All @@ -241,41 +245,49 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
partition_values.push(self.values.clone().into());
partition_ordering_values.push(self.ordering_values.clone().into());

// Convert array to Scalars to sort them easily. Convert back to array at evaluation.
let array_agg_res = ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;

for v in array_agg_res.into_iter() {
partition_values.push(v.into());
}

let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;

let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
for partition_ordering_rows in orderings.into_iter() {
// Extract value from struct to ordering_rows for each group/partition
partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Some(..), _) but got: {:?}",
ordering_row.data_type()
)
}
}).collect::<Result<VecDeque<_>>>()
}).collect::<Result<Vec<_>>>()?;
for ordering_values in ordering_values.into_iter() {
partition_ordering_values.push(ordering_values);
let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
"Expects to receive ScalarValue::Struct(Arc<StructArray>) but got:{:?}",
ordering_row.data_type()
)
}
}).collect::<Result<VecDeque<_>>>()?;

partition_ordering_values.push(ordering_value);
}

let sort_options = self
.ordering_req
.iter()
.map(|sort_expr| sort_expr.options)
.collect::<Vec<_>>();

(self.values, self.ordering_values) = merge_ordered_arrays(
&mut partition_values,
&mut partition_ordering_values,
&sort_options,
)?;

Ok(())
}

Expand Down Expand Up @@ -323,20 +335,32 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
impl OrderSensitiveArrayAggAccumulator {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mustafasrepo Have you considered changing `values: Vec<ScalarValue>` and `ordering_values: Vec<Vec<ScalarValue>>` from Vec to VecDeque for OrderSensitiveArrayAggAccumulator too? If we had VecDeque here, we could avoid the into() calls and reuse evaluate_orderings() for both OrderSensitiveArrayAgg and NthValue. They do essentially the same thing, except one uses Vec and the other VecDeque.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can file a follow on ticket to track this idea

let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);

let orderings: Vec<ScalarValue> = self
.ordering_values
.iter()
.map(|ordering| {
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
})
.collect();
let struct_type = DataType::Struct(struct_field);
let num_columns = fields.len();
let struct_field = Fields::from(fields.clone());

let mut column_wise_ordering_values = vec![];
for i in 0..num_columns {
Copy link
Contributor Author

@jayzhan211 jayzhan211 Jan 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a better design for StructArray (previous design is based on old ScalarValue::Struct). I avoid changing the logic or data structure in this PR.

May benefit #8558?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there might be a better design for StructArray (previous design is based on old ScalarValue::Struct). I avoid changing the logic or data structure in this PR.

May benefit #8558?

I don't think it will benefit #8558 (won't harm either). However it will be a better change anyway.

let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
let arr = ScalarValue::new_list(&orderings, &struct_type);
Ok(ScalarValue::List(arr))
let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
))))
}
}

Expand Down
53 changes: 37 additions & 16 deletions datafusion/physical-expr/src/aggregate/nth_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use crate::{
};

use arrow_array::cast::AsArray;
use arrow_array::ArrayRef;
use arrow_array::{new_empty_array, ArrayRef, StructArray};
use arrow_schema::{DataType, Field, Fields};
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::utils::{array_into_list_array, get_row_at_idx};
use datafusion_common::{exec_err, internal_err, DataFusionError, Result, ScalarValue};
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -271,7 +271,14 @@ impl Accumulator for NthValueAccumulator {
let ordering_values = orderings.into_iter().map(|partition_ordering_rows| {
// Extract value from struct to ordering_rows for each group/partition
partition_ordering_rows.into_iter().map(|ordering_row| {
if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
if let ScalarValue::Struct(s) = ordering_row {
let mut ordering_columns_per_row = vec![];

for column in s.columns() {
let sv = ScalarValue::try_from_array(column, 0)?;
ordering_columns_per_row.push(sv);
}

Ok(ordering_columns_per_row)
} else {
exec_err!(
Expand Down Expand Up @@ -306,7 +313,7 @@ impl Accumulator for NthValueAccumulator {
fn state(&mut self) -> Result<Vec<ScalarValue>> {
    // First state entry: the aggregated values; second (only when an
    // ordering requirement exists): the ordering columns, now fallible
    // because building the ordering struct array can fail.
    let mut result = vec![self.evaluate_values()];
    if !self.ordering_req.is_empty() {
        result.push(self.evaluate_orderings()?);
    }
    Ok(result)
}
Expand Down Expand Up @@ -355,21 +362,35 @@ impl Accumulator for NthValueAccumulator {
}

impl NthValueAccumulator {
fn evaluate_orderings(&self) -> ScalarValue {
fn evaluate_orderings(&self) -> Result<ScalarValue> {
let fields = ordering_fields(&self.ordering_req, &self.datatypes[1..]);
let struct_field = Fields::from(fields);
let struct_field = Fields::from(fields.clone());

let orderings = self
.ordering_values
.iter()
.map(|ordering| {
ScalarValue::Struct(Some(ordering.clone()), struct_field.clone())
})
.collect::<Vec<_>>();
let struct_type = DataType::Struct(struct_field);
let mut column_wise_ordering_values = vec![];
let num_columns = fields.len();
for i in 0..num_columns {
let column_values = self
.ordering_values
.iter()
.map(|x| x[i].clone())
.collect::<Vec<_>>();
let array = if column_values.is_empty() {
new_empty_array(fields[i].data_type())
} else {
ScalarValue::iter_to_array(column_values.into_iter())?
};
column_wise_ordering_values.push(array);
}

let ordering_array = StructArray::try_new(
struct_field.clone(),
column_wise_ordering_values,
None,
)?;

// Wrap in List, so we have the same data structure ListArray(StructArray..) for group by cases
ScalarValue::List(ScalarValue::new_list(&orderings, &struct_type))
Ok(ScalarValue::List(Arc::new(array_into_list_array(
Arc::new(ordering_array),
))))
}

fn evaluate_values(&self) -> ScalarValue {
Expand Down
Loading
Loading