Skip to content

Commit 5b874da

Browse files
committed
Consolidate BoundedAggregateStream
1 parent 413eba1 commit 5b874da

File tree

17 files changed

+977
-1396
lines changed

17 files changed

+977
-1396
lines changed

datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

Lines changed: 0 additions & 1080 deletions
This file was deleted.

datafusion/core/src/physical_plan/aggregates/mod.rs

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
//! Aggregates functionalities
1919
2020
use crate::physical_plan::aggregates::{
21-
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
22-
row_hash::GroupedHashAggregateStream,
21+
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
2322
};
2423
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2524
use crate::physical_plan::{
@@ -46,10 +45,9 @@ use std::any::Any;
4645
use std::collections::HashMap;
4746
use std::sync::Arc;
4847

49-
mod bounded_aggregate_stream;
5048
mod no_grouping;
49+
mod order;
5150
mod row_hash;
52-
mod utils;
5351

5452
pub use datafusion_expr::AggregateFunction;
5553
use datafusion_physical_expr::aggregate::is_order_sensitive;
@@ -95,7 +93,7 @@ pub enum AggregateMode {
9593
/// Specifically, each distinct combination of the relevant columns
9694
/// are contiguous in the input, and once a new combination is seen
9795
/// previous combinations are guaranteed never to appear again
98-
#[derive(Debug, Clone, PartialEq, Eq)]
96+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9997
pub enum GroupByOrderMode {
10098
/// The input is not (known to be) ordered by any of the
10199
/// expressions in the GROUP BY clause.
@@ -218,15 +216,13 @@ impl PartialEq for PhysicalGroupBy {
218216
enum StreamType {
219217
AggregateStream(AggregateStream),
220218
GroupedHashAggregateStream(GroupedHashAggregateStream),
221-
BoundedAggregate(BoundedAggregateStream),
222219
}
223220

224221
impl From<StreamType> for SendableRecordBatchStream {
225222
fn from(stream: StreamType) -> Self {
226223
match stream {
227224
StreamType::AggregateStream(stream) => Box::pin(stream),
228225
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
229-
StreamType::BoundedAggregate(stream) => Box::pin(stream),
230226
}
231227
}
232228
}
@@ -725,14 +721,6 @@ impl AggregateExec {
725721
Ok(StreamType::AggregateStream(AggregateStream::new(
726722
self, context, partition,
727723
)?))
728-
} else if let Some(aggregation_ordering) = &self.aggregation_ordering {
729-
let aggregation_ordering = aggregation_ordering.clone();
730-
Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
731-
self,
732-
context,
733-
partition,
734-
aggregation_ordering,
735-
)?))
736724
} else {
737725
Ok(StreamType::GroupedHashAggregateStream(
738726
GroupedHashAggregateStream::new(self, context, partition)?,
@@ -1116,6 +1104,7 @@ fn create_accumulators(
11161104
.collect::<Result<Vec<_>>>()
11171105
}
11181106

1107+
#[allow(dead_code)]
11191108
fn create_row_accumulators(
11201109
aggr_expr: &[Arc<dyn AggregateExpr>],
11211110
) -> Result<Vec<RowAccumulatorItem>> {
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use datafusion_execution::memory_pool::proxy::VecAllocExt;
19+
20+
use crate::physical_expr::EmitTo;
21+
22+
/// Tracks grouping state when the data is ordered entirely by its
23+
/// group keys
24+
///
25+
/// When the group values are sorted, as soon as we see group `n+1` we
26+
/// know we will never see any rows for group `n` again and thus they
27+
/// can be emitted.
28+
///
29+
/// For example, given `SUM(amt) GROUP BY id` if the input is sorted
30+
/// by `id` as soon as a new `id` value is seen all previous values
31+
/// can be emitted.
32+
///
33+
/// The state is tracked like this:
34+
///
35+
/// ```text
36+
/// ┌─────┐ ┌──────────────────┐
37+
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
38+
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
39+
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
40+
/// │ ... │ │ ... │ │
41+
/// │┌───┐│ │ ┌──────────────┐ │ │ current
42+
/// ││12 ││ │ │ 234 │ │ │
43+
/// │├───┤│ │ ├──────────────┤ │ │
44+
/// ││12 ││ │ │ 234 │ │ │
45+
/// │├───┤│ │ ├──────────────┤ │ │
46+
/// ││13 ││ │ │ 456 │◀┼───┘
47+
/// │└───┘│ │ └──────────────┘ │
48+
/// └─────┘ └──────────────────┘
49+
///
50+
/// group indices group_values current tracks the most
51+
/// (in group value recent group index
52+
/// order)
53+
/// ```
54+
///
55+
/// In this diagram, the current group is `13`, and thus groups
56+
/// `0..12` can be emitted. Note that `13` can not yet be emitted as
57+
/// there may be more values in the next batch with the same group_id.
58+
#[derive(Debug)]
59+
pub(crate) struct GroupOrderingFull {
60+
state: State,
61+
/// Hash values for groups in 0..current
62+
hashes: Vec<u64>,
63+
}
64+
65+
#[derive(Debug)]
66+
enum State {
67+
/// Seen no input yet
68+
Start,
69+
70+
/// Data is in progress. `current is the current group for which
71+
/// values are being generated. Can emit `current` - 1
72+
InProgress { current: usize },
73+
74+
/// Seen end of input: all groups can be emitted
75+
Complete,
76+
}
77+
78+
impl GroupOrderingFull {
79+
pub fn new() -> Self {
80+
Self {
81+
state: State::Start,
82+
hashes: vec![],
83+
}
84+
}
85+
86+
// How many groups be emitted, or None if no data can be emitted
87+
pub fn emit_to(&self) -> Option<EmitTo> {
88+
match &self.state {
89+
State::Start => None,
90+
State::InProgress { current, .. } => {
91+
if *current == 0 {
92+
// Can not emit if still on the first group
93+
None
94+
} else {
95+
// otherwise emit all rows prior to the current group
96+
Some(EmitTo::First(*current))
97+
}
98+
}
99+
State::Complete { .. } => Some(EmitTo::All),
100+
}
101+
}
102+
103+
/// remove the first n groups from the internal state, shifting
104+
/// all existing indexes down by `n`. Returns stored hash values
105+
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
106+
match &mut self.state {
107+
State::Start => panic!("invalid state: start"),
108+
State::InProgress { current } => {
109+
// shift down by n
110+
assert!(*current >= n);
111+
*current -= n;
112+
self.hashes.drain(0..n);
113+
}
114+
State::Complete { .. } => panic!("invalid state: complete"),
115+
};
116+
&self.hashes
117+
}
118+
119+
/// Note that the input is complete so any outstanding groups are done as well
120+
pub fn input_done(&mut self) {
121+
self.state = match self.state {
122+
State::Start => State::Complete,
123+
State::InProgress { .. } => State::Complete,
124+
State::Complete => State::Complete,
125+
};
126+
}
127+
128+
/// Called when new groups are added in a batch. See documentation
129+
/// on [`super::GroupOrdering::new_groups`]
130+
pub fn new_groups(
131+
&mut self,
132+
group_indices: &[usize],
133+
batch_hashes: &[u64],
134+
total_num_groups: usize,
135+
) {
136+
assert!(total_num_groups > 0);
137+
assert_eq!(group_indices.len(), batch_hashes.len());
138+
139+
// copy any hash values
140+
self.hashes.resize(total_num_groups, 0);
141+
for (&group_index, &hash) in group_indices.iter().zip(batch_hashes.iter()) {
142+
self.hashes[group_index] = hash;
143+
}
144+
145+
// Update state
146+
let max_group_index = total_num_groups - 1;
147+
self.state = match self.state {
148+
State::Start => State::InProgress {
149+
current: max_group_index,
150+
},
151+
State::InProgress { current } => {
152+
// expect to see new group indexes when called again
153+
assert!(current <= max_group_index, "{current} <= {max_group_index}");
154+
State::InProgress {
155+
current: max_group_index,
156+
}
157+
}
158+
State::Complete { .. } => {
159+
panic!("Saw new group after input was complete");
160+
}
161+
};
162+
}
163+
164+
pub(crate) fn size(&self) -> usize {
165+
std::mem::size_of::<Self>() + self.hashes.allocated_size()
166+
}
167+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use arrow_array::ArrayRef;
19+
use arrow_schema::Schema;
20+
use datafusion_common::Result;
21+
use datafusion_physical_expr::EmitTo;
22+
23+
use super::{AggregationOrdering, GroupByOrderMode};
24+
25+
mod full;
26+
mod partial;
27+
28+
pub(crate) use full::GroupOrderingFull;
29+
pub(crate) use partial::GroupOrderingPartial;
30+
31+
/// Ordering information for each group in the hash table
32+
#[derive(Debug)]
33+
pub(crate) enum GroupOrdering {
34+
/// Groups are not ordered
35+
None,
36+
/// Groups are ordered by some pre-set of the group keys
37+
Partial(GroupOrderingPartial),
38+
/// Groups are entirely contiguous,
39+
Full(GroupOrderingFull),
40+
}
41+
42+
impl GroupOrdering {
43+
/// Create a `GroupOrdering` for the the specified ordering
44+
pub fn try_new(
45+
input_schema: &Schema,
46+
ordering: &AggregationOrdering,
47+
) -> Result<Self> {
48+
let AggregationOrdering {
49+
mode,
50+
order_indices,
51+
ordering,
52+
} = ordering;
53+
54+
Ok(match mode {
55+
GroupByOrderMode::None => GroupOrdering::None,
56+
GroupByOrderMode::PartiallyOrdered => {
57+
let partial =
58+
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
59+
GroupOrdering::Partial(partial)
60+
}
61+
GroupByOrderMode::FullyOrdered => {
62+
GroupOrdering::Full(GroupOrderingFull::new())
63+
}
64+
})
65+
}
66+
67+
// How many groups be emitted, or None if no data can be emitted
68+
pub fn emit_to(&self) -> Option<EmitTo> {
69+
match self {
70+
GroupOrdering::None => None,
71+
GroupOrdering::Partial(partial) => partial.emit_to(),
72+
GroupOrdering::Full(full) => full.emit_to(),
73+
}
74+
}
75+
76+
/// Updates the state the input is done
77+
pub fn input_done(&mut self) {
78+
match self {
79+
GroupOrdering::None => {}
80+
GroupOrdering::Partial(partial) => partial.input_done(),
81+
GroupOrdering::Full(full) => full.input_done(),
82+
}
83+
}
84+
85+
/// remove the first n groups from the internal state, shifting
86+
/// all existing indexes down by `n`. Returns stored hash values
87+
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
88+
match self {
89+
GroupOrdering::None => &[],
90+
GroupOrdering::Partial(partial) => partial.remove_groups(n),
91+
GroupOrdering::Full(full) => full.remove_groups(n),
92+
}
93+
}
94+
95+
/// Called when new groups are added in a batch
96+
///
97+
/// * `total_num_groups`: total number of groups (so max
98+
/// group_index is total_num_groups - 1).
99+
///
100+
/// * `batch_group_values`: group key values for *each row* in the batch
101+
///
102+
/// * `group_indices`: indices for each row in the batch
103+
///
104+
/// * `batch_hashes`: hash values for each row in the batch
105+
pub fn new_groups(
106+
&mut self,
107+
batch_group_values: &[ArrayRef],
108+
group_indices: &[usize],
109+
batch_hashes: &[u64],
110+
total_num_groups: usize,
111+
) -> Result<()> {
112+
match self {
113+
GroupOrdering::None => {}
114+
GroupOrdering::Partial(partial) => {
115+
partial.new_groups(
116+
batch_group_values,
117+
group_indices,
118+
batch_hashes,
119+
total_num_groups,
120+
)?;
121+
}
122+
123+
GroupOrdering::Full(full) => {
124+
full.new_groups(group_indices, batch_hashes, total_num_groups);
125+
}
126+
};
127+
Ok(())
128+
}
129+
130+
/// Return the size of memory used by the ordering state, in bytes
131+
pub(crate) fn size(&self) -> usize {
132+
std::mem::size_of::<Self>()
133+
+ match self {
134+
GroupOrdering::None => 0,
135+
GroupOrdering::Partial(partial) => partial.size(),
136+
GroupOrdering::Full(full) => full.size(),
137+
}
138+
}
139+
}

0 commit comments

Comments
 (0)