@@ -51,7 +51,7 @@ use datafusion_physical_expr::{
5151use  itertools:: Itertools ; 
5252use  tracing_futures:: Instrument ; 
5353
54- pub ( crate )  mod  group_values; 
54+ pub  mod  group_values; 
5555mod  no_grouping; 
5656pub  mod  order; 
5757mod  row_hash; 
@@ -213,7 +213,7 @@ impl PhysicalGroupBy {
213213    } 
214214
215215    /// The number of expressions in the output schema. 
216-      fn  num_output_exprs ( & self )  -> usize  { 
216+      pub   fn  num_output_exprs ( & self )  -> usize  { 
217217        let  mut  num_exprs = self . expr . len ( ) ; 
218218        if  !self . is_single ( )  { 
219219            num_exprs += 1 
@@ -242,7 +242,7 @@ impl PhysicalGroupBy {
242242    } 
243243
244244    /// Returns the number expression as grouping keys. 
245-      fn  num_group_exprs ( & self )  -> usize  { 
245+      pub   fn  num_group_exprs ( & self )  -> usize  { 
246246        if  self . is_single ( )  { 
247247            self . expr . len ( ) 
248248        }  else  { 
@@ -285,7 +285,7 @@ impl PhysicalGroupBy {
285285     /// 
286286     /// This might be different from the `group_fields` that might contain internal expressions that 
287287     /// should not be part of the output schema. 
288-      fn  output_fields ( & self ,  input_schema :  & Schema )  -> Result < Vec < Field > >  { 
288+      pub   fn  output_fields ( & self ,  input_schema :  & Schema )  -> Result < Vec < Field > >  { 
289289        let  mut  fields = self . group_fields ( input_schema) ?; 
290290        fields. truncate ( self . num_output_exprs ( ) ) ; 
291291        Ok ( fields) 
@@ -339,9 +339,15 @@ enum StreamType {
339339impl  From < StreamType >  for  SendableRecordBatchStream  { 
340340    fn  from ( stream :  StreamType )  -> Self  { 
341341        match  stream { 
342-             StreamType :: AggregateStream ( stream)  => Box :: pin ( stream. instrument ( tracing:: trace_span!( "AggregateStream" ) ) ) , 
343-             StreamType :: GroupedHash ( stream)  => Box :: pin ( stream. instrument ( tracing:: trace_span!( "GroupedHashAggregateStream" ) ) ) , 
344-             StreamType :: GroupedPriorityQueue ( stream)  => Box :: pin ( stream. instrument ( tracing:: trace_span!( "GroupedTopKAggregateStream" ) ) ) , 
342+             StreamType :: AggregateStream ( stream)  => { 
343+                 Box :: pin ( stream. instrument ( tracing:: trace_span!( "AggregateStream" ) ) ) 
344+             } 
345+             StreamType :: GroupedHash ( stream)  => Box :: pin ( 
346+                 stream. instrument ( tracing:: trace_span!( "GroupedHashAggregateStream" ) ) , 
347+             ) , 
348+             StreamType :: GroupedPriorityQueue ( stream)  => Box :: pin ( 
349+                 stream. instrument ( tracing:: trace_span!( "GroupedTopKAggregateStream" ) ) , 
350+             ) , 
345351        } 
346352    } 
347353} 
0 commit comments