@@ -51,7 +51,7 @@ use datafusion_physical_expr::{
5151use itertools:: Itertools ;
5252use tracing_futures:: Instrument ;
5353
54- pub ( crate ) mod group_values;
54+ pub mod group_values;
5555mod no_grouping;
5656pub mod order;
5757mod row_hash;
@@ -213,7 +213,7 @@ impl PhysicalGroupBy {
213213 }
214214
215215 /// The number of expressions in the output schema.
216- fn num_output_exprs ( & self ) -> usize {
216+ pub fn num_output_exprs ( & self ) -> usize {
217217 let mut num_exprs = self . expr . len ( ) ;
218218 if !self . is_single ( ) {
219219 num_exprs += 1
@@ -242,7 +242,7 @@ impl PhysicalGroupBy {
242242 }
243243
244244 /// Returns the number expression as grouping keys.
245- fn num_group_exprs ( & self ) -> usize {
245+ pub fn num_group_exprs ( & self ) -> usize {
246246 if self . is_single ( ) {
247247 self . expr . len ( )
248248 } else {
@@ -285,7 +285,7 @@ impl PhysicalGroupBy {
285285 ///
286286 /// This might be different from the `group_fields` that might contain internal expressions that
287287 /// should not be part of the output schema.
288- fn output_fields ( & self , input_schema : & Schema ) -> Result < Vec < Field > > {
288+ pub fn output_fields ( & self , input_schema : & Schema ) -> Result < Vec < Field > > {
289289 let mut fields = self . group_fields ( input_schema) ?;
290290 fields. truncate ( self . num_output_exprs ( ) ) ;
291291 Ok ( fields)
@@ -339,9 +339,15 @@ enum StreamType {
339339impl From < StreamType > for SendableRecordBatchStream {
340340 fn from ( stream : StreamType ) -> Self {
341341 match stream {
342- StreamType :: AggregateStream ( stream) => Box :: pin ( stream. instrument ( tracing:: trace_span!( "AggregateStream" ) ) ) ,
343- StreamType :: GroupedHash ( stream) => Box :: pin ( stream. instrument ( tracing:: trace_span!( "GroupedHashAggregateStream" ) ) ) ,
344- StreamType :: GroupedPriorityQueue ( stream) => Box :: pin ( stream. instrument ( tracing:: trace_span!( "GroupedTopKAggregateStream" ) ) ) ,
342+ StreamType :: AggregateStream ( stream) => {
343+ Box :: pin ( stream. instrument ( tracing:: trace_span!( "AggregateStream" ) ) )
344+ }
345+ StreamType :: GroupedHash ( stream) => Box :: pin (
346+ stream. instrument ( tracing:: trace_span!( "GroupedHashAggregateStream" ) ) ,
347+ ) ,
348+ StreamType :: GroupedPriorityQueue ( stream) => Box :: pin (
349+ stream. instrument ( tracing:: trace_span!( "GroupedTopKAggregateStream" ) ) ,
350+ ) ,
345351 }
346352 }
347353}
0 commit comments