Skip to content

Commit 6cf3bf0

Browse files
committed
Also add grouping case
1 parent da3c2d5 commit 6cf3bf0

File tree

2 files changed

+6
-3
lines changed

2 files changed

+6
-3
lines changed

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,7 @@ impl AggregateStream {
7777
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
7878
let input = agg.input.execute(partition, Arc::clone(&context))?;
7979

80-
// Only wrap no‐grouping aggregates in our YieldStream
81-
// (grouped aggregates tend to produce small streams
82-
// and can rely on Tokio's own task‐yielding)
80+
// Wrap no‐grouping aggregates in our YieldStream
8381
let input = Box::pin(YieldStream::new(input)) as SendableRecordBatchStream;
8482

8583
let aggregate_expressions = aggregate_expressions(&agg.aggr_expr, &agg.mode, 0)?;

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ use datafusion_physical_expr::{GroupsAccumulatorAdapter, PhysicalSortExpr};
4949

5050
use super::order::GroupOrdering;
5151
use super::AggregateExec;
52+
use crate::aggregates::no_grouping::YieldStream;
5253
use datafusion_physical_expr::aggregate::AggregateFunctionExpr;
5354
use datafusion_physical_expr_common::sort_expr::LexOrdering;
5455
use futures::ready;
@@ -448,6 +449,10 @@ impl GroupedHashAggregateStream {
448449

449450
let batch_size = context.session_config().batch_size();
450451
let input = agg.input.execute(partition, Arc::clone(&context))?;
452+
453+
// Wrap grouping aggregates in our YieldStream
454+
let input = Box::pin(YieldStream::new(input)) as SendableRecordBatchStream;
455+
451456
let baseline_metrics = BaselineMetrics::new(&agg.metrics, partition);
452457

453458
let timer = baseline_metrics.elapsed_compute().timer();

0 commit comments

Comments
 (0)