Skip to content

Commit

Permalink
allow opt-out of skip agg feature
Browse files Browse the repository at this point in the history
  • Loading branch information
andygrove committed Aug 6, 2024
1 parent bddb641 commit 202d8a9
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 12 deletions.
9 changes: 9 additions & 0 deletions datafusion/execution/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,15 @@ impl SessionConfig {
self
}

/// Customize [`skip_partial_aggregation_probe_rows_threshold`]
///
/// [`skip_partial_aggregation_probe_rows_threshold`]: datafusion_common::config::ExecutionOptions::skip_partial_aggregation_probe_rows_threshold
pub fn with_skip_partial_aggregation_probe_rows_threshold(mut self, n: usize) -> Self {
self.options.execution.skip_partial_aggregation_probe_rows_threshold = n;
self
}


/// Insert new [ConfigExtension]
pub fn with_option_extension<T: ConfigExtension>(mut self, extension: T) -> Self {
self.options_mut().extensions.insert(extension);
Expand Down
27 changes: 15 additions & 12 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -466,25 +466,28 @@ impl GroupedHashAggregateStream {
// - all accumulators support input batch to intermediate
// aggregate state conversion
// - there is only one GROUP BY expressions set
// - skip_partial_aggregation_probe_rows_threshold is greater than 0
let skip_aggregation_probe = if agg.mode == AggregateMode::Partial
&& matches!(group_ordering, GroupOrdering::None)
&& accumulators
.iter()
.all(|acc| acc.supports_convert_to_state())
&& agg_group_by.is_single()
{
Some(SkipAggregationProbe::new(
context
.session_config()
.options()
.execution
.skip_partial_aggregation_probe_rows_threshold,
context
.session_config()
.options()
.execution
.skip_partial_aggregation_probe_ratio_threshold,
))
let execution_options = &context
.session_config()
.options()
.execution;
if execution_options.skip_partial_aggregation_probe_rows_threshold > 0 {
Some(SkipAggregationProbe::new(
execution_options
.skip_partial_aggregation_probe_rows_threshold,
execution_options
.skip_partial_aggregation_probe_ratio_threshold,
))
} else {
None
}
} else {
None
};
Expand Down

0 comments on commit 202d8a9

Please sign in to comment.