Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use datafusion_common::Result;

use datafusion_expr::EmitTo;

pub(crate) mod multi_group_by;
pub mod multi_group_by;

mod row;
mod single_group_by;
Expand Down Expand Up @@ -84,7 +84,7 @@ mod null_builder;
/// Each distinct group in a hash aggregation is identified by a unique group id
/// (usize) which is assigned by instances of this trait. Group ids are
/// continuous without gaps, starting from 0.
pub(crate) trait GroupValues: Send {
pub trait GroupValues: Send {
/// Calculates the group id for each input row of `cols`, assigning new
/// group ids as necessary.
///
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ mod primitive;

use std::mem::{self, size_of};

use crate::aggregates::group_values::multi_group_by::{
pub use crate::aggregates::group_values::multi_group_by::{
bytes::ByteGroupValueBuilder, bytes_view::ByteViewGroupValueBuilder,
primitive::PrimitiveGroupValueBuilder,
};
Expand Down
20 changes: 13 additions & 7 deletions datafusion/physical-plan/src/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion_physical_expr::{
use itertools::Itertools;
use tracing_futures::Instrument;

pub(crate) mod group_values;
pub mod group_values;
mod no_grouping;
pub mod order;
mod row_hash;
Expand Down Expand Up @@ -213,7 +213,7 @@ impl PhysicalGroupBy {
}

/// The number of expressions in the output schema.
fn num_output_exprs(&self) -> usize {
pub fn num_output_exprs(&self) -> usize {
let mut num_exprs = self.expr.len();
if !self.is_single() {
num_exprs += 1
Expand Down Expand Up @@ -242,7 +242,7 @@ impl PhysicalGroupBy {
}

/// Returns the number expression as grouping keys.
fn num_group_exprs(&self) -> usize {
pub fn num_group_exprs(&self) -> usize {
if self.is_single() {
self.expr.len()
} else {
Expand Down Expand Up @@ -285,7 +285,7 @@ impl PhysicalGroupBy {
///
/// This might be different from the `group_fields` that might contain internal expressions that
/// should not be part of the output schema.
fn output_fields(&self, input_schema: &Schema) -> Result<Vec<Field>> {
pub fn output_fields(&self, input_schema: &Schema) -> Result<Vec<Field>> {
let mut fields = self.group_fields(input_schema)?;
fields.truncate(self.num_output_exprs());
Ok(fields)
Expand Down Expand Up @@ -339,9 +339,15 @@ enum StreamType {
impl From<StreamType> for SendableRecordBatchStream {
fn from(stream: StreamType) -> Self {
match stream {
StreamType::AggregateStream(stream) => Box::pin(stream.instrument(tracing::trace_span!("AggregateStream"))),
StreamType::GroupedHash(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedHashAggregateStream"))),
StreamType::GroupedPriorityQueue(stream) => Box::pin(stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream"))),
StreamType::AggregateStream(stream) => {
Box::pin(stream.instrument(tracing::trace_span!("AggregateStream")))
}
StreamType::GroupedHash(stream) => Box::pin(
stream.instrument(tracing::trace_span!("GroupedHashAggregateStream")),
),
StreamType::GroupedPriorityQueue(stream) => Box::pin(
stream.instrument(tracing::trace_span!("GroupedTopKAggregateStream")),
),
}
}
}
Expand Down
Loading