Skip to content

Improve performance of other non GroupsAdapter aggregates: implement convert_to_state #11819

Closed
@alamb

Description

@alamb

Is your feature request related to a problem or challenge?

@korowa added "skip partial aggregation mode" in #11627 which helps with high cardinality aggregates by doing minimal work for the first phase of the aggregation. This mode is triggered dynamically based on how effective the first aggregation phase is working.

In order to use this new mode, the corresponding GroupsAccumulator needs to implement the convert_to_state method

/// Converts an input batch directly the intermediate aggregate state.
///
/// This is the equivalent of treating each input row as its own group. It
/// is invoked when the Partial phase of a multi-phase aggregation is not
/// reducing the cardinality enough to warrant spending more effort on
/// pre-aggregation (see `Background` section below), and switches to
/// passing intermediate state directly on to the next aggregation phase.
///
/// Examples:
/// * `COUNT`: an array of 1s for each row in the input batch.
/// * `SUM/MIN/MAX`: the input values themselves.
///
/// # Arguments
/// * `values`: the input arguments to the accumulator
/// * `opt_filter`: if present, any row where `opt_filter[i]` is false should be ignored
///
/// # Background
///
/// In a multi-phase aggregation (see [`Accumulator::state`]), the initial
/// Partial phase reduces the cardinality of the input data as soon as
/// possible in the plan.
///
/// This strategy is very effective for queries with a small number of
/// groups, as most of the data is aggregated immediately and only a small
/// amount of data must be repartitioned (see [`Accumulator::state`] for
/// background)
///
/// However, for queries with a large number of groups, the Partial phase
/// often does not reduce the cardinality enough to warrant the memory and
/// CPU cost of actually performing the aggregation. For such cases, the
/// HashAggregate operator will dynamically switch to passing intermediate
/// state directly to the next aggregation phase with minimal processing
/// using this method.
///
/// [`Accumulator::state`]: crate::Accumulator::state
fn convert_to_state(
&self,
_values: &[ArrayRef],
_opt_filter: Option<&BooleanArray>,
) -> Result<Vec<ArrayRef>> {
not_impl_err!("Input batch conversion to state not implemented")
}
/// Returns `true` if [`Self::convert_to_state`] is implemented to support
/// intermediate aggregate state conversion.
fn supports_convert_to_state(&self) -> bool {
false
}

Some aggregates implement the GroupsAccumulator interface directly, but by default they will use the GroupsAccumulatorAdapter along with the Accumulator trait

Describe the solution you'd like

Implement covert_to_state for

https://github.com/apache/datafusion/blob/b685e2d4f1f245dd1dbe468b32b115ae99316689/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs#L247-L246

Add tests in

# The main goal of these tests is to verify correctness of transforming
# input values to state by accumulators, supporting `convert_to_state`.

Describe alternatives you've considered

No response

Additional context

No response

Metadata

Metadata

Assignees

Labels

enhancementNew feature or request

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions