Skip to content

Commit

Permalink
Minor: refactor probe check into function should_skip_aggregation (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb authored Aug 6, 2024
1 parent eb2b5fe commit a1645c4
Showing 1 changed file with 19 additions and 14 deletions.
33 changes: 19 additions & 14 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,7 @@ impl Stream for GroupedHashAggregateStream {
(
if self.input_done {
ExecutionState::Done
} else if self
.skip_aggregation_probe
.as_ref()
.is_some_and(|probe| probe.should_skip())
{
} else if self.should_skip_aggregation() {
ExecutionState::SkippingAggregation
} else {
ExecutionState::ReadingInput
Expand Down Expand Up @@ -955,12 +951,13 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Updates skip aggregation probe state.
// In case stream has any spills, the probe is forcefully set to
// forbid aggregation skipping, and locked, since spilling resets
// total number of unique groups.
//
// Note: currently spilling is not supported for Partial aggregation
/// Updates skip aggregation probe state.
///
/// In case stream has any spills, the probe is forcefully set to
/// forbid aggregation skipping, and locked, since spilling resets
/// total number of unique groups.
///
/// Note: currently spilling is not supported for Partial aggregation
fn update_skip_aggregation_probe(&mut self, input_rows: usize) {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if !self.spill_state.spills.is_empty() {
Expand All @@ -971,8 +968,8 @@ impl GroupedHashAggregateStream {
};
}

// In case the probe indicates that aggregation may be
// skipped, forces stream to produce currently accumulated output.
/// In case the probe indicates that aggregation may be
/// skipped, forces stream to produce currently accumulated output.
fn switch_to_skip_aggregation(&mut self) -> Result<()> {
if let Some(probe) = self.skip_aggregation_probe.as_mut() {
if probe.should_skip() {
Expand All @@ -984,7 +981,15 @@ impl GroupedHashAggregateStream {
Ok(())
}

// Transforms input batch to intermediate aggregate state, without grouping it
/// Returns true if the aggregation probe indicates that aggregation
/// should be skipped.
fn should_skip_aggregation(&self) -> bool {
self.skip_aggregation_probe
.as_ref()
.is_some_and(|probe| probe.should_skip())
}

/// Transforms input batch to intermediate aggregate state, without grouping it
fn transform_to_states(&self, batch: RecordBatch) -> Result<RecordBatch> {
let group_values = evaluate_group_by(&self.group_by, &batch)?;
let input_values = evaluate_many(&self.aggregate_arguments, &batch)?;
Expand Down

0 comments on commit a1645c4

Please sign in to comment.