Add support for conflicting order sensitive aggregates in ARRAY_AGG aggregate function #8558

Closed
wants to merge 56 commits
Changes from 55 commits

56 commits
8d8249e
Array agg ordered initial implementation
mustafasrepo Nov 30, 2023
8f54037
Minor changes
mustafasrepo Nov 30, 2023
7484c10
Add tests for first_value, last_value
mustafasrepo Nov 30, 2023
071b266
Add new tests
mustafasrepo Dec 1, 2023
6b77b40
Add first, last tests
mustafasrepo Dec 1, 2023
49d5f27
Minor changes
mustafasrepo Dec 1, 2023
7704b97
Simplifications
mustafasrepo Dec 1, 2023
b803a6f
Minor changes
mustafasrepo Dec 1, 2023
5913e0c
Minor changes
mustafasrepo Dec 1, 2023
4a4f1f8
Minor changes
mustafasrepo Dec 1, 2023
18815ea
Resolve linter errors
mustafasrepo Dec 1, 2023
edf5c5c
Remove clone
mustafasrepo Dec 1, 2023
ea6d94a
Remove unused code
mustafasrepo Dec 1, 2023
ffbe3bc
Add grouping support
mustafasrepo Dec 4, 2023
5c05f50
Implement grouping algorithm
mustafasrepo Dec 5, 2023
d7ead93
Minor changes
mustafasrepo Dec 5, 2023
019b9ba
Add reverse support
mustafasrepo Dec 5, 2023
e4b9ebc
Minor changes
mustafasrepo Dec 5, 2023
e2ee90a
Minor changes
mustafasrepo Dec 5, 2023
e0f2889
All cli tests pass
mustafasrepo Dec 5, 2023
16af57f
Simplifications
mustafasrepo Dec 5, 2023
d442e84
Resolve linter errors
mustafasrepo Dec 5, 2023
8c7b316
Minor changes
mustafasrepo Dec 5, 2023
b4003da
Minor changes
mustafasrepo Dec 5, 2023
69e5f59
Update comments, cleanup
mustafasrepo Dec 6, 2023
b848f4e
Add proper indices handling
mustafasrepo Dec 6, 2023
80a160c
Minnor changes
mustafasrepo Dec 6, 2023
89e0c2b
Minor changes
mustafasrepo Dec 6, 2023
299e89c
Minor changes
mustafasrepo Dec 6, 2023
a2723e5
Minor changes
mustafasrepo Dec 6, 2023
373dd88
Remove requirement originating from order sensitive aggregators
mustafasrepo Dec 6, 2023
6836e02
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 7, 2023
f375bc4
Minor changes
mustafasrepo Dec 7, 2023
96ac322
Minor changes
mustafasrepo Dec 7, 2023
163b056
Minor changes
mustafasrepo Dec 7, 2023
b813071
Minor changes
mustafasrepo Dec 7, 2023
1754719
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 13, 2023
eb97d99
Fix changing tests
mustafasrepo Dec 13, 2023
43c33ae
Initial review
metesynnada Dec 13, 2023
be85af3
Partial not working commit
metesynnada Dec 13, 2023
74825e9
Simplifications
mustafasrepo Dec 13, 2023
9703390
Update mod.rs
metesynnada Dec 13, 2023
f58b760
Minor changes
mustafasrepo Dec 13, 2023
585fdc8
No grouping refactor
metesynnada Dec 13, 2023
8c25f0d
Merge branch 'feature/conflicting_order_sensitive_aggregates' of http…
metesynnada Dec 13, 2023
184865e
Minor changes
mustafasrepo Dec 13, 2023
b5a250c
Minor changes
mustafasrepo Dec 13, 2023
77b5c06
Minor changes
mustafasrepo Dec 13, 2023
86cf0b9
instead of input use equivalence
mustafasrepo Dec 13, 2023
2ce128d
Update proto
mustafasrepo Dec 13, 2023
bda3e2a
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 14, 2023
16596b1
Fix test display
mustafasrepo Dec 14, 2023
42e3631
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 15, 2023
b378f80
Minor
metesynnada Dec 15, 2023
c0dfe04
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 17, 2023
48fe203
Merge branch 'apache_main' into feature/conflicting_order_sensitive_a…
mustafasrepo Dec 20, 2023
92 changes: 50 additions & 42 deletions datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -24,7 +24,7 @@ use std::collections::BinaryHeap;
 use std::fmt::Debug;
 use std::sync::Arc;

-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr};

@@ -192,14 +192,30 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         if values.is_empty() {
             return Ok(());
         }
+        let value_array_ref = &values[0];
+        let ordering_array_refs = &values[1..];

-        let n_row = values[0].len();
-        for index in 0..n_row {
-            let row = get_row_at_idx(values, index)?;
-            self.values.push(row[0].clone());
-            self.ordering_values.push(row[1..].to_vec());
-        }
+        let num_rows = value_array_ref.len();
+        // Convert &[ArrayRef] to Vec<Vec<ScalarValue>>:
+        let new_ordering_values = (0..num_rows)
+            .map(|idx| get_row_at_idx(ordering_array_refs, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        // Convert ArrayRef to Vec<ScalarValue>:
+        let new_scalar_values = (0..num_rows)
+            .map(|idx| ScalarValue::try_from_array(value_array_ref, idx))
+            .collect::<Result<Vec<_>>>()?;
+
+        let sort_options = get_sort_options(&self.ordering_req);
+
+        // Merge the new values and orderings into the existing state:
+        let (merged_values, merged_ordering_values) = merge_ordered_arrays(
+            &[&self.values, &new_scalar_values],
+            &[&self.ordering_values, &new_ordering_values],
+            &sort_options,
+        )?;
+        self.values = merged_values;
+        self.ordering_values = merged_ordering_values;
         Ok(())
     }

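The update path above converts column-major Arrow data into per-row scalar vectors before merging. A stand-alone sketch of that row-extraction step over plain integers (the names here are illustrative, not the DataFusion API):

```rust
// Turn column-major data into per-row vectors, mirroring what the
// `get_row_at_idx` calls above do for `ArrayRef` columns.
fn rows_from_columns(columns: &[Vec<i64>]) -> Vec<Vec<i64>> {
    // Every column is assumed to have the same length (one entry per row).
    let num_rows = columns.first().map_or(0, |c| c.len());
    (0..num_rows)
        .map(|idx| columns.iter().map(|col| col[idx]).collect())
        .collect()
}
```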
@@ -215,45 +231,37 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         let agg_orderings = &states[1];

         if let Some(agg_orderings) = agg_orderings.as_list_opt::<i32>() {
-            // Stores ARRAY_AGG results coming from each partition
-            let mut partition_values = vec![];
-            // Stores ordering requirement expression results coming from each partition
-            let mut partition_ordering_values = vec![];
-
-            // Existing values should be merged also.
-            partition_values.push(self.values.clone());
-            partition_ordering_values.push(self.ordering_values.clone());
+            // Stores ARRAY_AGG results coming from each partition. Existing
+            // values should be merged also.
+            let mut partition_values = vec![self.values.as_slice()];

             let array_agg_res =
                 ScalarValue::convert_array_to_scalar_vec(array_agg_values)?;

-            for v in array_agg_res.into_iter() {
-                partition_values.push(v);
-            }
+            partition_values.extend(array_agg_res.iter().map(|v| v.as_slice()));

-            let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?;
+            // Stores ordering requirement expression results coming from each
+            // partition. Existing values should be merged also.
+            let mut partition_ordering_values = vec![self.ordering_values.as_slice()];

-            for partition_ordering_rows in orderings.into_iter() {
-                // Extract value from struct to ordering_rows for each group/partition
-                let ordering_value = partition_ordering_rows.into_iter().map(|ordering_row| {
-                    if let ScalarValue::Struct(Some(ordering_columns_per_row), _) = ordering_row {
-                        Ok(ordering_columns_per_row)
-                    } else {
-                        exec_err!(
-                            "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
-                            ordering_row.data_type()
-                        )
-                    }
-                }).collect::<Result<Vec<_>>>()?;
+            let orderings = ScalarValue::convert_array_to_scalar_vec(agg_orderings)?
+                .into_iter()
+                .map(|partition_ordering_rows| {
+                    // Extract value from struct to ordering_rows for each group/partition
+                    partition_ordering_rows
+                        .into_iter()
+                        .map(|ordering_row| match ordering_row {
+                            ScalarValue::Struct(Some(ordering_columns_per_row), _) => {
+                                Ok(ordering_columns_per_row)
+                            }
+                            _ => exec_err!(
+                                "Expects to receive ScalarValue::Struct(Some(..), _) but got:{:?}",
+                                ordering_row.data_type()
+                            ),
+                        })
+                        .collect::<Result<Vec<_>>>()
+                })
+                .collect::<Result<Vec<_>>>()?;

-                partition_ordering_values.push(ordering_value);
-            }
+            partition_ordering_values.extend(orderings.iter().map(|v| v.as_slice()));

-            let sort_options = self
-                .ordering_req
-                .iter()
-                .map(|sort_expr| sort_expr.options)
-                .collect::<Vec<_>>();
+            let sort_options = get_sort_options(&self.ordering_req);
             let (new_values, new_orderings) = merge_ordered_arrays(
                 &partition_values,
                 &partition_ordering_values,
@@ -413,11 +421,11 @@ impl<'a> PartialOrd for CustomElement<'a> {
 /// Inner `Vec<ScalarValue>`s of the `ordering_values` will be compared according to `sort_options` (their sizes should match).
 fn merge_ordered_arrays(
     // We will merge values into a single `Vec<ScalarValue>`.
-    values: &[Vec<ScalarValue>],
+    values: &[&[ScalarValue]],
     // `values` will be merged according to `ordering_values`.
     // Each inner `Vec<ScalarValue>` can be thought of as the ordering
     // information for the corresponding `ScalarValue` in `values`.
-    ordering_values: &[Vec<Vec<ScalarValue>>],
+    ordering_values: &[&[Vec<ScalarValue>]],
     // Defines according to which ordering the comparisons should be done.
     sort_options: &[SortOptions],
 ) -> Result<(Vec<ScalarValue>, Vec<Vec<ScalarValue>>)> {
@@ -554,8 +562,8 @@ mod tests {
         ];

         let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &[lhs_vals, rhs_vals],
-            &[lhs_orderings, rhs_orderings],
+            &[&lhs_vals, &rhs_vals],
+            &[&lhs_orderings, &rhs_orderings],
             &sort_options,
         )?;
         let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
@@ -621,8 +629,8 @@
             Arc::new(Int64Array::from(vec![4, 4, 3, 3, 2, 2, 1, 1, 0, 0])) as ArrayRef,
         ];
         let (merged_vals, merged_ts) = merge_ordered_arrays(
-            &[lhs_vals, rhs_vals],
-            &[lhs_orderings, rhs_orderings],
+            &[&lhs_vals, &rhs_vals],
+            &[&lhs_orderings, &rhs_orderings],
             &sort_options,
         )?;
         let merged_vals = ScalarValue::iter_to_array(merged_vals.into_iter())?;
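`merge_ordered_arrays`, exercised by the tests above, is at heart a k-way merge of already-sorted runs keyed by the ordering columns. A minimal two-way sketch over plain `(value, key)` pairs, assuming ascending keys (the function name and types here are illustrative, not the DataFusion API):

```rust
// Two-way ordered merge: both inputs are sorted ascending by their
// ordering key; the output interleaves them while preserving that order.
fn merge_two_sorted(lhs: &[(i64, i64)], rhs: &[(i64, i64)]) -> Vec<(i64, i64)> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::with_capacity(lhs.len() + rhs.len());
    while i < lhs.len() && j < rhs.len() {
        // Take from the side whose next ordering key is smaller.
        if lhs[i].1 <= rhs[j].1 {
            out.push(lhs[i]);
            i += 1;
        } else {
            out.push(rhs[j]);
            j += 1;
        }
    }
    // One side is exhausted; append the remainder of the other.
    out.extend_from_slice(&lhs[i..]);
    out.extend_from_slice(&rhs[j..]);
    out
}
```

The real implementation generalizes this to any number of partitions with a `BinaryHeap`, and compares whole ordering rows under `SortOptions` instead of single ascending keys.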
112 changes: 84 additions & 28 deletions datafusion/physical-expr/src/aggregate/first_last.rs
@@ -20,16 +20,16 @@
 use std::any::Any;
 use std::sync::Arc;

-use crate::aggregate::utils::{down_cast_any_ref, ordering_fields};
+use crate::aggregate::utils::{down_cast_any_ref, get_sort_options, ordering_fields};
 use crate::expressions::format_state_name;
 use crate::{
-    reverse_order_bys, AggregateExpr, LexOrdering, PhysicalExpr, PhysicalSortExpr,
+    reverse_order_bys, AggregateExpr, LexOrdering, LexOrderingRef, PhysicalExpr,
+    PhysicalSortExpr,
 };

 use arrow::array::{Array, ArrayRef, AsArray, BooleanArray};
 use arrow::compute::{self, lexsort_to_indices, SortColumn};
 use arrow::datatypes::{DataType, Field};
-use arrow_schema::SortOptions;
 use datafusion_common::utils::{compare_rows, get_arrayref_at_indices, get_row_at_idx};
 use datafusion_common::{DataFusionError, Result, ScalarValue};
 use datafusion_expr::Accumulator;
@@ -211,10 +211,25 @@ impl FirstValueAccumulator {
     }

     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.first = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when:
+        // - There is no entry in the state.
+        // - There is an earlier entry according to the requirements.
+        if !self.is_set
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_gt()
+        {
+            self.first = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
     }
 }

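The guarded update above reduces to a simple rule: replace the stored value only when nothing is set yet, or when the new row's ordering key sorts earlier than the stored one. A toy single-key version with `compare_rows` and `SortOptions` handling elided (names are illustrative, not the DataFusion types):

```rust
// Keep whichever candidate sorts first by its (ascending) ordering key.
struct First {
    value: Option<i64>,
    key: i64,
}

impl First {
    fn update(&mut self, value: i64, key: i64) {
        // Replace when no value is set yet, or the new row sorts earlier.
        if self.value.is_none() || key < self.key {
            self.value = Some(value);
            self.key = key;
        }
    }
}
```

The real accumulator does the same comparison over multi-column ordering rows, honoring each column's `SortOptions`.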
@@ -227,11 +242,11 @@ impl Accumulator for FirstValueAccumulator {
     }

     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        // If we have seen first value, we shouldn't update it
-        if !values[0].is_empty() && !self.is_set {
-            let row = get_row_at_idx(values, 0)?;
-            // Update with first value in the array.
-            self.update_with_new_row(&row);
+        if let Some(first_idx) =
+            get_value_idx::<true>(values, &self.ordering_req, self.is_set)?
+        {
+            let row = get_row_at_idx(values, first_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -265,7 +280,7 @@ impl Accumulator for FirstValueAccumulator {
             // Update with first value in the state. Note that we should exclude the
             // is_set flag from the state. Otherwise, we will end up with a state
             // containing two is_set flags.
-            self.update_with_new_row(&first_row[0..is_set_idx]);
+            self.update_with_new_row(&first_row[0..is_set_idx])?;
         }
     }
     Ok(())
@@ -459,10 +474,27 @@ impl LastValueAccumulator {
     }

     // Updates state with the values in the given row.
-    fn update_with_new_row(&mut self, row: &[ScalarValue]) {
-        self.last = row[0].clone();
-        self.orderings = row[1..].to_vec();
-        self.is_set = true;
+    fn update_with_new_row(&mut self, row: &[ScalarValue]) -> Result<()> {
+        let value = &row[0];
+        let orderings = &row[1..];
+        // Update when:
+        // - There is no value in the state.
+        // - There is no specific requirement, but a new value arrived (i.e. the
+        //   most recent entry in terms of execution order).
+        // - There is a more recent entry in terms of the requirement.
+        if !self.is_set
+            || self.orderings.is_empty()
+            || compare_rows(
+                &self.orderings,
+                orderings,
+                &get_sort_options(&self.ordering_req),
+            )?
+            .is_lt()
+        {
+            self.last = value.clone();
+            self.orderings = orderings.to_vec();
+            self.is_set = true;
+        }
+        Ok(())
     }
 }

@@ -475,10 +507,11 @@ impl Accumulator for LastValueAccumulator {
     }

     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        if !values[0].is_empty() {
-            let row = get_row_at_idx(values, values[0].len() - 1)?;
-            // Update with last value in the array.
-            self.update_with_new_row(&row);
+        if let Some(last_idx) =
+            get_value_idx::<false>(values, &self.ordering_req, self.is_set)?
+        {
+            let row = get_row_at_idx(values, last_idx)?;
+            self.update_with_new_row(&row)?;
         }
         Ok(())
     }
@@ -515,7 +548,7 @@ impl Accumulator for LastValueAccumulator {
             // Update with last value in the state. Note that we should exclude the
             // is_set flag from the state. Otherwise, we will end up with a state
             // containing two is_set flags.
-            self.update_with_new_row(&last_row[0..is_set_idx]);
+            self.update_with_new_row(&last_row[0..is_set_idx])?;
         }
     }
     Ok(())
@@ -559,12 +592,35 @@ fn convert_to_sort_cols(
         .collect::<Vec<_>>()
 }

-/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
-fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
-    ordering_req
-        .iter()
-        .map(|item| item.options)
-        .collect::<Vec<_>>()
+/// Gets either the first or the last value index inside the `values` batch
+/// according to the ordering requirements. Assumes that the `values` batch is
+/// already ordered according to `ordering_req`.
+///
+/// # Parameters
+///
+/// - `values`: A slice of `ArrayRef` representing the values to be processed
+///   (the columns of a record batch).
+/// - `ordering_req`: A lexical ordering reference specifying the required
+///   ordering of values.
+/// - `is_set`: Whether any value is stored in the state for `first value` or
+///   `last value` (false at the beginning).
+///
+/// # Returns
+///
+/// A `Result` containing an `Option<usize>`. If successful, the `Option` holds
+/// the index of the desired value. A `None` value indicates that the existing
+/// state does not need to be updated.
+fn get_value_idx<const FIRST: bool>(
+    values: &[ArrayRef],
+    ordering_req: LexOrderingRef,
+    is_set: bool,
+) -> Result<Option<usize>> {
+    let value_array_ref = &values[0];
+    // Return None for empty batches, or when no ordering is specified and a
+    // first value is already set.
+    if value_array_ref.is_empty() || (is_set && FIRST && ordering_req.is_empty()) {
+        return Ok(None);
+    }
+    Ok(Some(if FIRST {
+        0
+    } else {
+        // LAST
+        value_array_ref.len() - 1
+    }))
 }

 #[cfg(test)]
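The decision logic of `get_value_idx` above is small enough to capture in a dependency-free sketch: return `None` when the batch is empty, or when a FIRST value is already set and no ordering applies; otherwise index 0 for FIRST and the last index for LAST. (This mirrors the function's behavior with simplified parameters; it is not the DataFusion signature.)

```rust
// Simplified stand-in for `get_value_idx`: `len` replaces the value
// array, and `has_ordering` replaces the ordering-requirement slice.
fn value_idx<const FIRST: bool>(len: usize, is_set: bool, has_ordering: bool) -> Option<usize> {
    // Nothing to pick from an empty batch; and once a first value is set
    // with no ordering requirement, later batches cannot improve on it.
    if len == 0 || (is_set && FIRST && !has_ordering) {
        return None;
    }
    Some(if FIRST { 0 } else { len - 1 })
}
```

Note that LAST never short-circuits on `is_set`: without an ordering requirement, the most recently seen row always wins.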
10 changes: 9 additions & 1 deletion datafusion/physical-expr/src/aggregate/utils.rs
@@ -26,7 +26,7 @@ use arrow_array::types::{
 };
 use arrow_array::ArrowNativeTypeOp;
 use arrow_buffer::ArrowNativeType;
-use arrow_schema::{DataType, Field};
+use arrow_schema::{DataType, Field, SortOptions};
 use datafusion_common::{exec_err, DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use std::any::Any;
@@ -205,3 +205,11 @@ pub(crate) fn ordering_fields(
         })
         .collect()
 }
+
+/// Selects the sort option attribute from all the given `PhysicalSortExpr`s.
+pub fn get_sort_options(ordering_req: &[PhysicalSortExpr]) -> Vec<SortOptions> {
+    ordering_req
+        .iter()
+        .map(|item| item.options)
+        .collect::<Vec<_>>()
+}