Skip to content

Commit 2c013a1

Browse files
committed
Consolidate BoundedAggregateStream
1 parent afc9c9d commit 2c013a1

File tree

17 files changed

+1161
-1365
lines changed

17 files changed

+1161
-1365
lines changed

datafusion/core/src/physical_plan/aggregates/bounded_aggregate_stream.rs

Lines changed: 0 additions & 1072 deletions
This file was deleted.

datafusion/core/src/physical_plan/aggregates/mod.rs

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,7 @@
1818
//! Aggregates functionalities
1919
2020
use crate::physical_plan::aggregates::{
21-
bounded_aggregate_stream::BoundedAggregateStream, no_grouping::AggregateStream,
22-
row_hash::GroupedHashAggregateStream,
21+
no_grouping::AggregateStream, row_hash::GroupedHashAggregateStream,
2322
};
2423
use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2524
use crate::physical_plan::{
@@ -46,10 +45,9 @@ use std::any::Any;
4645
use std::collections::HashMap;
4746
use std::sync::Arc;
4847

49-
mod bounded_aggregate_stream;
5048
mod no_grouping;
49+
mod order;
5150
mod row_hash;
52-
mod utils;
5351

5452
pub use datafusion_expr::AggregateFunction;
5553
use datafusion_physical_expr::aggregate::is_order_sensitive;
@@ -89,7 +87,7 @@ pub enum AggregateMode {
8987
/// Specifically, each distinct combination of the relevant columns
9088
/// are contiguous in the input, and once a new combination is seen
9189
/// previous combinations are guaranteed never to appear again
92-
#[derive(Debug, Clone, PartialEq, Eq)]
90+
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9391
pub enum GroupByOrderMode {
9492
/// The input is not (known to be) ordered by any of the
9593
/// expressions in the GROUP BY clause.
@@ -212,15 +210,15 @@ impl PartialEq for PhysicalGroupBy {
212210
enum StreamType {
213211
AggregateStream(AggregateStream),
214212
GroupedHashAggregateStream(GroupedHashAggregateStream),
215-
BoundedAggregate(BoundedAggregateStream),
213+
//BoundedAggregate(BoundedAggregateStream),
216214
}
217215

218216
impl From<StreamType> for SendableRecordBatchStream {
219217
fn from(stream: StreamType) -> Self {
220218
match stream {
221219
StreamType::AggregateStream(stream) => Box::pin(stream),
222220
StreamType::GroupedHashAggregateStream(stream) => Box::pin(stream),
223-
StreamType::BoundedAggregate(stream) => Box::pin(stream),
221+
//StreamType::BoundedAggregate(stream) => Box::pin(stream),
224222
}
225223
}
226224
}
@@ -719,14 +717,6 @@ impl AggregateExec {
719717
Ok(StreamType::AggregateStream(AggregateStream::new(
720718
self, context, partition,
721719
)?))
722-
} else if let Some(aggregation_ordering) = &self.aggregation_ordering {
723-
let aggregation_ordering = aggregation_ordering.clone();
724-
Ok(StreamType::BoundedAggregate(BoundedAggregateStream::new(
725-
self,
726-
context,
727-
partition,
728-
aggregation_ordering,
729-
)?))
730720
} else {
731721
Ok(StreamType::GroupedHashAggregateStream(
732722
GroupedHashAggregateStream::new(self, context, partition)?,
@@ -1105,6 +1095,7 @@ fn create_accumulators(
11051095
.collect::<Result<Vec<_>>>()
11061096
}
11071097

1098+
#[allow(dead_code)]
11081099
fn create_row_accumulators(
11091100
aggr_expr: &[Arc<dyn AggregateExpr>],
11101101
) -> Result<Vec<RowAccumulatorItem>> {
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::physical_expr::EmitTo;
19+
20+
/// Tracks grouping state when the data is ordered entirely by its
21+
/// group keys
22+
///
23+
/// When the group values are sorted, as soon as we see group `n+1` we
24+
/// know we will never see any rows for group `n` again and thus they
25+
/// can be emitted.
26+
///
27+
/// ```text
28+
/// SUM(amt) GROUP BY id
29+
///
30+
/// The input is sorted by id
31+
///
32+
///
33+
/// ┌─────┐ ┌──────────────────┐
34+
/// │┌───┐│ │ ┌──────────────┐ │ ┏━━━━━━━━━━━━━━┓
35+
/// ││ 0 ││ │ │ 123 │ │ ┌─────┃ 13 ┃
36+
/// │└───┘│ │ └──────────────┘ │ │ ┗━━━━━━━━━━━━━━┛
37+
/// │ ... │ │ ... │ │
38+
/// │┌───┐│ │ ┌──────────────┐ │ │ current
39+
/// ││12 ││ │ │ 234 │ │ │
40+
/// │├───┤│ │ ├──────────────┤ │ │
41+
/// ││12 ││ │ │ 234 │ │ │
42+
/// │├───┤│ │ ├──────────────┤ │ │
43+
/// ││13 ││ │ │ 456 │◀┼───┘
44+
/// │└───┘│ │ └──────────────┘ │
45+
/// └─────┘ └──────────────────┘
46+
///
47+
/// group indices group_values current tracks the most
48+
/// (in group value recent group index
49+
/// order)
50+
/// ```
51+
///
52+
/// In the above diagram the current group is `13`, and groups `0..12` can
53+
/// be emitted. Group `13` can not be emitted because it may have more
54+
/// values in the next batch.
55+
#[derive(Debug)]
56+
pub(crate) struct GroupOrderingFull {
57+
state: State,
58+
/// Hash values for the groups seen so far (indexes `0..=current`)
59+
hashes: Vec<u64>,
60+
}
61+
62+
#[derive(Debug)]
63+
enum State {
64+
/// Have seen no input yet
65+
Start,
66+
67+
/// Have seen all groups with indexes less than `current`
68+
InProgress {
69+
/// index of the current group for which values are being
70+
/// generated (can emit current - 1)
71+
current: usize,
72+
},
73+
74+
/// Seen end of input, all groups can be emitted
75+
Complete,
76+
}
77+
78+
impl GroupOrderingFull {
79+
pub fn new() -> Self {
80+
Self {
81+
state: State::Start,
82+
hashes: vec![],
83+
}
84+
}
85+
86+
// How far can data be emitted? Returns None if no data can be
87+
// emitted
88+
pub fn emit_to(&self) -> Option<EmitTo> {
89+
match &self.state {
90+
State::Start => None,
91+
State::InProgress { current, .. } => {
92+
// Can not emit if we are still on the first group,
93+
// otherwise emit all rows prior to the current group
94+
if *current == 0 {
95+
None
96+
} else {
97+
Some(EmitTo::First(*current))
98+
}
99+
}
100+
State::Complete { .. } => Some(EmitTo::All),
101+
}
102+
}
103+
104+
/// removes the first n groups from this ordering, shifting all
105+
/// existing indexes down by `n` and returns a reference to the
106+
/// updated hashes
107+
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
108+
match &mut self.state {
109+
State::Start => panic!("invalid state: start"),
110+
State::InProgress { current } => {
111+
// shift down by n
112+
assert!(*current >= n);
113+
*current -= n;
114+
self.hashes.drain(0..n);
115+
}
116+
State::Complete { .. } => panic!("invalid state: complete"),
117+
};
118+
&self.hashes
119+
}
120+
121+
/// Note that the input is complete so any outstanding groups are done as well
122+
pub fn input_done(&mut self) {
123+
self.state = match self.state {
124+
State::Start => State::Complete,
125+
State::InProgress { .. } => State::Complete,
126+
State::Complete => State::Complete,
127+
};
128+
}
129+
130+
/// Note that we saw a new distinct group
131+
pub fn new_group(&mut self, group_index: usize, hash: u64) {
132+
self.state = match self.state {
133+
State::Start => {
134+
assert_eq!(group_index, 0);
135+
self.hashes.push(hash);
136+
State::InProgress {
137+
current: group_index,
138+
}
139+
}
140+
State::InProgress { current } => {
141+
// expect group_index to be the one right after `current`
142+
assert_eq!(group_index, self.hashes.len());
143+
assert_eq!(group_index, current + 1);
144+
self.hashes.push(hash);
145+
State::InProgress {
146+
current: group_index,
147+
}
148+
}
149+
State::Complete { .. } => {
150+
panic!("Saw new group after input was complete");
151+
}
152+
};
153+
}
154+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! Order tracking for memory bounded grouping
19+
20+
use arrow_schema::Schema;
21+
use datafusion_common::Result;
22+
use datafusion_physical_expr::EmitTo;
23+
24+
use super::{AggregationOrdering, GroupByOrderMode};
25+
26+
mod full;
27+
mod partial;
28+
29+
pub(crate) use full::GroupOrderingFull;
30+
pub(crate) use partial::GroupOrderingPartial;
31+
32+
/// Group ordering state, if present, for each group in the hash
33+
/// table.
34+
#[derive(Debug)]
35+
pub(crate) enum GroupOrdering {
36+
/// Groups are not ordered
37+
None,
38+
/// Groups are ordered by some pre-set of the group keys
39+
Partial(GroupOrderingPartial),
40+
/// Groups are entirely contiguous.
41+
Full(GroupOrderingFull),
42+
/// The ordering was temporarily taken / borrowed
43+
/// Note: `Self::Taken` is left when the GroupOrdering is temporarily
44+
/// taken to satisfy the borrow checker. If an error happens
45+
/// before it can be restored the ordering information is lost and
46+
/// execution can not proceed. By panic'ing the behavior remains
47+
/// well defined if something tries to use a ordering that was
48+
/// taken.
49+
Taken,
50+
}
51+
52+
// Default is used for `std::mem::take` to satisfy the borrow checker
53+
impl Default for GroupOrdering {
54+
fn default() -> Self {
55+
Self::Taken
56+
}
57+
}
58+
59+
impl GroupOrdering {
60+
/// Create a `GroupOrdering` for the ordering
61+
pub fn try_new(
62+
input_schema: &Schema,
63+
ordering: &AggregationOrdering,
64+
) -> Result<Self> {
65+
let AggregationOrdering {
66+
mode,
67+
order_indices,
68+
ordering,
69+
} = ordering;
70+
71+
Ok(match mode {
72+
GroupByOrderMode::None => GroupOrdering::None,
73+
GroupByOrderMode::PartiallyOrdered => {
74+
let partial =
75+
GroupOrderingPartial::try_new(input_schema, order_indices, ordering)?;
76+
GroupOrdering::Partial(partial)
77+
}
78+
GroupByOrderMode::FullyOrdered => {
79+
GroupOrdering::Full(GroupOrderingFull::new())
80+
}
81+
})
82+
}
83+
84+
// How far can data be emitted based on groups seen so far?
85+
// Returns `None` if nothing can be emitted at this point based on
86+
// ordering information
87+
pub fn emit_to(&self) -> Option<EmitTo> {
88+
match self {
89+
GroupOrdering::Taken => panic!("group state taken"),
90+
GroupOrdering::None => None,
91+
GroupOrdering::Partial(partial) => partial.emit_to(),
92+
GroupOrdering::Full(full) => full.emit_to(),
93+
}
94+
}
95+
96+
/// Updates the state to reflect that the input is done
97+
pub fn input_done(&mut self) {
98+
match self {
99+
GroupOrdering::Taken => panic!("group state taken"),
100+
GroupOrdering::None => {}
101+
GroupOrdering::Partial(partial) => partial.input_done(),
102+
GroupOrdering::Full(full) => full.input_done(),
103+
}
104+
}
105+
106+
/// removes the first n groups from this ordering, shifting all
107+
/// existing indexes down by `n` and returns a reference to the
108+
/// updated hashes
109+
pub fn remove_groups(&mut self, n: usize) -> &[u64] {
110+
match self {
111+
GroupOrdering::Taken => panic!("group state taken"),
112+
GroupOrdering::None => &[],
113+
GroupOrdering::Partial(partial) => partial.remove_groups(n),
114+
GroupOrdering::Full(full) => full.remove_groups(n),
115+
}
116+
}
117+
}

0 commit comments

Comments
 (0)