-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Intermediate result blocked approach to aggregation memory management #15591
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
Rachelint
wants to merge
90
commits into
apache:main
Choose a base branch
from
Rachelint:intermeidate-result-blocked-approach
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+2,744
−189
Draft
Changes from all commits
Commits
Show all changes
90 commits
Select commit
Hold shift + click to select a range
4353748
define the needed methods in `GroupAccumulator` and `GroupValues`.
Rachelint a4450f7
define `GroupIndexOperations`, and impl block/flat mode for it.
Rachelint 96b7435
support block approach for `GroupValuesPrimitive`.
Rachelint 38d4fc6
add new emit mode.
Rachelint 72e0fc3
make `NullState` codes common for better reuse.
Rachelint 2dba944
improve comments.
Rachelint c08f23a
remove stale codes.
Rachelint f6f3bd6
add tests.
Rachelint 53e8c8e
support dynamic dispatching for `NullState`.
Rachelint b5d231e
improve tests to cover `BlockedNullState`.
Rachelint a644b17
complete the impl of blocked `GroupValuesPrimitive`.
Rachelint 67f03fc
support blocked mode for `GroupValuesPrimitive`.
Rachelint d165fb0
make `EmitTo::NextBlock` simpler.
Rachelint 319e135
extract the common codes of block for reusing.
Rachelint 808f142
support blocked mode for `PrimitiveGroupsAccumulator`.
Rachelint 489f093
impl block based result returning logic.
Rachelint 54002a1
add judgement about when we should enable blocked groups optimization.
Rachelint 1f7b4bc
add config to control if we enable blocked groups optimization.
Rachelint e58afa5
fix e2e sql tests.
Rachelint 4294ab7
fix group values len method.
Rachelint 79714a4
add `memory_limit` to expose the info in `MemoryPool`.
Rachelint 266b48e
modify the spilling judgement.
Rachelint 55de98c
add unit tests for primitive group values.
Rachelint 9145833
improve comments for `GroupIndexOperations`.
Rachelint 04f15b0
add `enable_aggregation_blocked_groups` to aggr fuzzy test.
Rachelint be64a74
refactor and make `QueryBuilder` more configurable.
Rachelint 7da0259
fix tests.
Rachelint d771038
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint ffb11cd
fix clippy.
Rachelint 7f543d8
add fuzzy tests for blocked groups, and fix enable logic.
Rachelint 868210f
update config.md and fix clippy.
Rachelint bdcd1b8
improve comment about blocked groups for `GroupedHashAggregateStream`.
Rachelint 3c7317d
fix stack overflow.
Rachelint ff9c3ad
add extended query to see the improvement.
Rachelint 96b3c77
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint 3e23408
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint bb63628
Update datafusion/common/src/config.rs
Rachelint a7c4c7b
update config.
Rachelint e033567
fix fmt.
Rachelint d173056
fix logic test.
Rachelint 29222e1
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint 426e2ee
improve comments.
Rachelint 948c4ce
move group index operations to a new module.
Rachelint 09b97ab
define `Blocks` and use it to refactor.
Rachelint cee016c
extract blocks to a dedicated module.
Rachelint 75ee3f3
add tests for `Blocks`.
Rachelint 5a6e030
simplify codes about blocks.
Rachelint 4c6799f
fix ci.
Rachelint 93e5f9d
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint 7542b49
return error when found blocked approach not supported.
Rachelint e8808eb
try to avoid using of `VecDeque`.
Rachelint e3ba95c
suggest inline.
Rachelint 8807026
use unsafe to get in Vec.
Rachelint 62157a9
optimize rehash.
Rachelint add409e
new blocks resize method.
Rachelint 4e6193a
Merge branch 'main' into intermeidate-result-blocked-approach
Rachelint c7ce363
optimize boolean builder blocks.
Rachelint 13296c1
revert BooleanBuffer.
Rachelint da4c590
refactor group index computation.
Rachelint 12c211d
use `VecDeque` to simplify codes of `GroupValuesPrimitive`.
Rachelint cd4ffea
fix tests for `NullState`, and also add more comments to explain why …
Rachelint a9a42e5
Refactor `Blocks` implementation.
Rachelint c889300
Use back `Vec` rather than `VecDeque` for performance, and we define …
Rachelint f26d57e
add tests for `Blocks`, also reduce some `unwrap`s.
Rachelint b64b7f4
fix clippy.
Rachelint a650895
Merge main, and fix some bugs.
Rachelint 3b93e63
improve tests for `GroupValuesPrimitive`.
Rachelint 907b43e
improve comments, also address cr.
Rachelint 7869779
revert some logics about better blocks emitting, because we can't sup…
Rachelint cfc3135
test more efficient group index operation.
Rachelint 9a72685
impl `get/get_mut` for `GeneralBlocks`.
Rachelint ab6276d
add option to check
Rachelint 1890b77
fix
Rachelint 98d04eb
add hash compare.
Rachelint 669e0c7
add dedicated bench.
Rachelint 14d8740
remove bound check in `Blocks`.
Rachelint c1fc822
make profile more visible.
Rachelint e6a06c1
bigger block size
Rachelint 1c417e5
larger.
Rachelint 7fbcde5
fix
Rachelint edc19fe
make block factor configurable.
Rachelint 4fc6f8d
use set_len.
Rachelint c02c5ef
capacity of outer Vec.
Rachelint 71ef9ce
test.
Rachelint b001de8
test2.
Rachelint ec658c8
test4.
Rachelint 73ab7f0
test 5.
Rachelint 72b1f20
test 6.
Rachelint a8b8d35
remove option
Rachelint 600318f
test 7.
Rachelint File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||
---|---|---|---|---|---|---|---|---|
|
@@ -18,19 +18,23 @@ | |||||||
//! Vectorized [`GroupsAccumulator`] | ||||||||
|
||||||||
use arrow::array::{ArrayRef, BooleanArray}; | ||||||||
use datafusion_common::{not_impl_err, Result}; | ||||||||
use datafusion_common::{not_impl_err, DataFusionError, Result}; | ||||||||
|
||||||||
/// Describes how many rows should be emitted during grouping. | ||||||||
#[derive(Debug, Clone, Copy)] | ||||||||
pub enum EmitTo { | ||||||||
/// Emit all groups | ||||||||
/// Emit all groups, will clear all existing group indexes | ||||||||
All, | ||||||||
/// Emit only the first `n` groups and shift all existing group | ||||||||
/// indexes down by `n`. | ||||||||
/// | ||||||||
/// For example, if `n=10`, group_index `0, 1, ... 9` are emitted | ||||||||
/// and group indexes `10, 11, 12, ...` become `0, 1, 2, ...`. | ||||||||
First(usize), | ||||||||
/// Emit next block in the blocked managed groups | ||||||||
/// | ||||||||
/// Similar as `Emit::All`, will also clear all existing group indexes | ||||||||
NextBlock, | ||||||||
} | ||||||||
|
||||||||
impl EmitTo { | ||||||||
|
@@ -39,6 +43,10 @@ impl EmitTo { | |||||||
/// remaining values in `v`. | ||||||||
/// | ||||||||
/// This avoids copying if Self::All | ||||||||
/// | ||||||||
/// NOTICE: only support emit strategies: `Self::All` and `Self::First` | ||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||
/// Will call `panic` if called with `Self::NextBlock` | ||||||||
/// | ||||||||
pub fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> { | ||||||||
match self { | ||||||||
Self::All => { | ||||||||
|
@@ -52,6 +60,7 @@ impl EmitTo { | |||||||
std::mem::swap(v, &mut t); | ||||||||
t | ||||||||
} | ||||||||
Self::NextBlock => unreachable!("don't support take block in take_needed"), | ||||||||
} | ||||||||
} | ||||||||
} | ||||||||
|
@@ -250,4 +259,49 @@ pub trait GroupsAccumulator: Send { | |||||||
/// This function is called once per batch, so it should be `O(n)` to | ||||||||
/// compute, not `O(num_groups)` | ||||||||
fn size(&self) -> usize; | ||||||||
|
||||||||
/// Returns `true` if this accumulator supports blocked groups. | ||||||||
/// | ||||||||
/// Blocked groups(or called blocked management approach) is an optimization | ||||||||
/// to reduce the cost of managing aggregation intermediate states. | ||||||||
/// | ||||||||
/// Here is brief introduction for two states management approaches: | ||||||||
/// - Blocked approach, states are stored and managed in multiple `Vec`s, | ||||||||
/// we call it `Block`s. Organize like this is for avoiding to resize `Vec` | ||||||||
/// and allocate a new `Vec` instead to reduce cost and get better performance. | ||||||||
/// When locating data in `Block`s, we need to use `block_id` to locate the | ||||||||
/// needed `Block` at first, and use `block_offset` to locate the needed | ||||||||
/// data in `Block` after. | ||||||||
/// | ||||||||
/// - Single approach, all states are stored and managed in a single large `Block`. | ||||||||
/// So when locating data, `block_id` will always be 0, and we only need `block_offset` | ||||||||
/// to locate data in the single `Block`. | ||||||||
/// | ||||||||
/// More details can see: | ||||||||
/// <https://github.com/apache/datafusion/issues/7065> | ||||||||
/// | ||||||||
fn supports_blocked_groups(&self) -> bool { | ||||||||
false | ||||||||
} | ||||||||
|
||||||||
/// Alter the block size in the accumulator | ||||||||
/// | ||||||||
/// If the target block size is `None`, it will use a single big | ||||||||
/// block(can think it a `Vec`) to manage the state. | ||||||||
/// | ||||||||
/// If the target block size` is `Some(blk_size)`, it will try to | ||||||||
/// set the block size to `blk_size`, and the try will only success | ||||||||
/// when the accumulator has supported blocked mode. | ||||||||
/// | ||||||||
/// NOTICE: After altering block size, all data in existing accumulators will be cleared. | ||||||||
/// | ||||||||
fn alter_block_size(&mut self, block_size: Option<usize>) -> Result<()> { | ||||||||
if block_size.is_some() { | ||||||||
return Err(DataFusionError::NotImplemented( | ||||||||
"this accumulator doesn't support blocked mode yet".to_string(), | ||||||||
)); | ||||||||
} | ||||||||
|
||||||||
Ok(()) | ||||||||
} | ||||||||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if there is any value to testing the old code path (
enable_aggregation_blocked_groups = false
) if our goal is to remove it eventually.I recommend only testing with the flag set to the default value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, but it will only go through the new path when all accumaltors in query support
blocked approach
, otherwise all of them will fallback toflat approach
.So I think we still need to test it now but make sense to remove in future.