File tree Expand file tree Collapse file tree 2 files changed +7
-13
lines changed
datafusion/physical-plan/src/aggregates Expand file tree Collapse file tree 2 files changed +7
-13
lines changed Original file line number Diff line number Diff line change @@ -47,7 +47,6 @@ use datafusion_physical_expr::{
4747 PhysicalSortRequirement ,
4848} ;
4949
50- use crate :: aggregates:: no_grouping:: YieldStream ;
5150use datafusion_physical_expr_common:: physical_expr:: fmt_sql;
5251use itertools:: Itertools ;
5352
@@ -984,18 +983,8 @@ impl ExecutionPlan for AggregateExec {
984983 partition : usize ,
985984 context : Arc < TaskContext > ,
986985 ) -> Result < SendableRecordBatchStream > {
987- // Only wrap no‐grouping aggregates in our YieldStream
988- // (grouped aggregates tend to produce small streams
989- // and can rely on Tokio's own task‐yielding)
990- let typed = self . execute_typed ( partition, context) ?;
991- if self . group_expr ( ) . is_empty ( ) {
992- // no GROUP BY: inject our yield wrapper
993- let raw_stream = typed. into ( ) ; // SendableRecordBatchStream
994- Ok ( Box :: pin ( YieldStream :: new ( raw_stream) ) )
995- } else {
996- // has GROUP BY: just hand back the raw stream
997- Ok ( typed. into ( ) )
998- }
986+ self . execute_typed ( partition, context)
987+ . map ( |stream| stream. into ( ) )
999988 }
1000989
1001990 fn metrics ( & self ) -> Option < MetricsSet > {
Original file line number Diff line number Diff line change @@ -77,6 +77,11 @@ impl AggregateStream {
7777 let baseline_metrics = BaselineMetrics :: new ( & agg. metrics , partition) ;
7878 let input = agg. input . execute ( partition, Arc :: clone ( & context) ) ?;
7979
80+ // Only wrap no‐grouping aggregates in our YieldStream
81+ // (grouped aggregates tend to produce small streams
82+ // and can rely on Tokio's own task‐yielding)
83+ let input = Box :: pin ( YieldStream :: new ( input) ) as SendableRecordBatchStream ;
84+
8085 let aggregate_expressions = aggregate_expressions ( & agg. aggr_expr , & agg. mode , 0 ) ?;
8186 let filter_expressions = match agg. mode {
8287 AggregateMode :: Partial
You can’t perform that action at this time.
0 commit comments