Intermediate result blocked approach to aggregation memory management #15591
Conversation
Hi @Rachelint I think I have an alternative proposal that seems relatively easy to implement.
Really, thanks. The design in this PR indeed still introduces quite a few code changes... I tried not to modify anything about
But I found this way will introduce too much extra cost... Maybe we place the
Has finished development (and testing) of all the needed common structs!
It is very close; I just need to add more tests!
@alamb @Dandandan Sorry, I am still working on some other new ideas about this PR, so I am switching it back to draft.
Will do
No worries -- I am sorry I have not given it as much attention as I would like. Is there anything else I can do to help (like implement one of the grouped memory examples)? I am about to file a ticket to start organizing the follow-on work.
No worries
Thanks!
I think I still need some time to evaluate what benefits this can bring. After experimenting, I found:
But inspired by the
🤖: Benchmark completed
🤖: Benchmark completed
I wonder what happens if we make it at least 1 million entries or 1 MiB, so the effect on cache-friendliness is smaller? We could optimize a growing strategy for the first allocated
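For illustration, a minimal sketch of what such a growing strategy could look like (all names here are hypothetical, not from this PR):

```rust
/// Hypothetical growth policy: start with small blocks and double the size
/// of each newly allocated block until a cap is reached, so low-cardinality
/// queries stay cache-friendly while high-cardinality ones amortize
/// allocation cost.
struct BlockSizePolicy {
    next_size: usize,
    max_size: usize,
}

impl BlockSizePolicy {
    fn new(initial: usize, max: usize) -> Self {
        Self { next_size: initial, max_size: max }
    }

    /// Returns the size to use for the next allocated block.
    fn next_block_size(&mut self) -> usize {
        let size = self.next_size;
        self.next_size = (self.next_size * 2).min(self.max_size);
        size
    }
}

fn main() {
    // E.g. start at 8192 entries and cap at ~1 million.
    let mut policy = BlockSizePolicy::new(8192, 1 << 20);
    let sizes: Vec<usize> = (0..4).map(|_| policy.next_block_size()).collect();
    assert_eq!(sizes, vec![8192, 16384, 32768, 65536]);
}
```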
Me too -- I looked at the flamegraph you provided and I agree it seems like almost half the allocation time is spent with pagefaults / zeroing memory. However, I can't tell if that is because there is slowness with the underlying Vec that wasn't initialized or if there is something else going on.
Yes, that was my understanding -- that blocked aggregation would only help performance when the number of intermediate groups was large (which forced additional memory allocations)
I suspect you already know this, but I think you can get back the original Vec from an array via
However, in the high cardinality case, I am not sure there are buffers to reuse during aggregation (the buffers are all held until the output is needed, and once the output is emitted they don't get re-created).
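For reference, a sketch of one way to recover the backing `Vec` with current arrow-rs APIs (treat the exact calls as an assumption to verify):

```rust
use arrow::array::{Int64Array, PrimitiveArray};
use arrow::datatypes::Int64Type;

/// Try to recover the backing Vec from a primitive array so its allocation
/// can be reused. This only succeeds when the buffer is uniquely owned
/// (no other array still references it).
fn try_take_vec(array: PrimitiveArray<Int64Type>) -> Option<Vec<i64>> {
    // Split the array into (data type, values buffer, null buffer).
    let (_, values, _nulls) = array.into_parts();
    // Unwrap the ScalarBuffer to the underlying Buffer, then try to turn
    // it back into a Vec; `into_vec` fails (returning the buffer) if the
    // allocation is shared or offset.
    values.into_inner().into_vec::<i64>().ok()
}

fn main() {
    let array = Int64Array::from(vec![1, 2, 3]);
    assert_eq!(try_take_vec(array), Some(vec![1, 2, 3]));
}
```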
I think I am somewhat lost with the current state. Your comment on #15591 (comment) states
And the most recent benchmark run #15591 (comment) seems to confirm this finding:
However, some of the results on shorter-running queries (few groups) show perhaps a slowdown: #15591 (comment). From those numbers, is it a fair assessment that the blocked approach improves performance when there is a large number of intermediate groups, but not when there is a small number of groups?
```rust
fn generate_group_indices(len: usize) -> Vec<usize> {
    (0..len).collect()
}
```
Does using indices `0..len` mean this benchmark is testing the case where each input has a new unique group?
If this is the case, then I think it would make sense that the benchmark shows lots of time allocating / zeroing memory: the bulk of the work will be copying each value into a new accumulator slot.
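For contrast, a generator with bounded cardinality (hypothetical, not in this PR) would exercise the slot-reuse path instead:

```rust
/// Hypothetical benchmark input where group indices repeat, so existing
/// accumulator slots are updated instead of a new slot being allocated
/// for every row.
fn generate_group_indices_bounded(len: usize, num_groups: usize) -> Vec<usize> {
    (0..len).map(|i| i % num_groups).collect()
}
```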
I have tried to enlarge the block size (8 * batch, 16 * batch, ...), but it seems to make only a slight difference to the performance. So after the experiment, I think
I think I nearly understand why; it is possibly caused by
Got it!
I think it may be possible in this situation?
Yes, in the current implementation, it only helps performance with
Yeah I think that was expected.
Yeah it is quite efficient, although problematic for large inputs
If I don't misunderstand, does it mean a strategy like this:
Agreed. It also leads to large memory usage, because we only release memory after all the batches are returned (we hold the really large single batch in memory, only return slices of it, and only release the memory all at once after all slices are returned).
Yes - exactly!
I wonder what the plan is for this PR? From what I understand, it currently improves performance for aggregates with large numbers of groups, but (slightly) slows down aggregates with smaller numbers of groups. I think this is due to accessing group storage via two indirections (block index / actual index). It seems like the proposal is to have some sort of adaptive structure that uses one-part indexes for small numbers of groups and then switches to two-part indexes for larger numbers.
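For illustration, a minimal sketch of the two-part indexing described above (hypothetical types, not the PR's actual implementation):

```rust
/// Hypothetical blocked storage: values live in fixed-size blocks, and a
/// group index is decoded into (block id, offset) instead of indexing one
/// flat Vec.
struct BlockedValues<T> {
    blocks: Vec<Vec<T>>,
    block_size: usize,
}

impl<T> BlockedValues<T> {
    fn new(block_size: usize) -> Self {
        Self { blocks: Vec::new(), block_size }
    }

    /// Two indirections: first pick the block, then the slot inside it.
    fn get(&self, group_index: usize) -> &T {
        let block_id = group_index / self.block_size;
        let offset = group_index % self.block_size;
        &self.blocks[block_id][offset]
    }

    /// Growing never copies old data: when the last block is full, a new
    /// block is allocated instead of resizing one large Vec.
    fn push(&mut self, value: T) {
        if self.blocks.last().map_or(true, |b| b.len() == self.block_size) {
            self.blocks.push(Vec::with_capacity(self.block_size));
        }
        self.blocks.last_mut().unwrap().push(value);
    }
}

fn main() {
    let mut values = BlockedValues::new(4);
    for i in 0..10_i64 {
        values.push(i);
    }
    assert_eq!(*values.get(7), 7);
}
```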
I am experimenting with whether there is something we can do, based on this PR, to improve performance further, like memory reuse (#15591 (comment)). Actually #16135 is part of that attempt. My biggest concern is whether we can get a more obvious improvement to make the change worthwhile...
As I understand it, the way to get a bigger improvement would be to implement the chunked approach for more group storage / aggregates, so that more queries in our benchmarks (like ClickBench) could use the new code path. Though of course that would make this PR even bigger. We could also make a "POC" type PR with some more implementation to prove out the performance, and then break it into smaller pieces for review 🤔
Yes, I am trying to implement it for
But according to the flamegraph, I found that all such queries' bottleneck is actually
I think the benefit from this one currently is:
Do you think it is still worth continuing to push forward for the above benefits?
Without having a full understanding of this PR (I have just been following the conversation because the change is exciting), my 2¢ is: for us, memory management is currently one of the biggest thorns with DataFusion. It is quite hard at the moment to run with a fixed memory budget, given the mix of exceeding memory through under-accounting and refusing to run operators that can't spill / that try to claim more memory than they actually use.
I agree with @adriangb; even when it doesn't provide performance improvements, it is still super valuable to push this forward.
Thanks @adriangb @Dandandan. I just started my new job this week and am a bit busy, but I will continue to push this forward this weekend. The new targets for this one may be
So the rest of the work, I think:
Thanks for all your help @Rachelint, and congratulations on the new job!
Thanks @Rachelint and congratulations!
Which issue does this PR close?

Rationale for this change

As mentioned in #7065, we use a single `Vec` to manage the aggregation intermediate results, both in `GroupAccumulator` and `GroupValues`. It is simple but not efficient enough in high-cardinality aggregation, because when the `Vec` is not large enough, we need to allocate a new `Vec` and copy all the data over from the old one.

So this PR introduces a blocked approach to manage the aggregation intermediate results. We never resize the `Vec` in this approach; instead we split the data into blocks, and when the capacity is not enough, we just allocate a new block. Details can be seen in #7065.
What changes are included in this PR?

- Support the blocked approach in `PrimitiveGroupsAccumulator` and `GroupValuesPrimitive` as the example

Are these changes tested?

Tested by existing tests, plus new unit tests and new fuzz tests.

Are there any user-facing changes?

Two functions are added to the `GroupValues` and `GroupAccumulator` traits. But as you can see, there are default implementations for them, and users can choose to actually support the blocked approach when they want better performance for their UDAFs.
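For a sense of what opt-in defaults like this can look like, here is a minimal sketch on a standalone trait; the names `supports_blocked_groups` and `alter_block_size` are illustrative, not necessarily the functions this PR adds.

```rust
// Illustrative only: a standalone trait showing how blocked support can be
// exposed via default methods, so existing implementations compile unchanged.
trait BlockedGroups {
    /// Whether this implementation supports blocked group storage.
    /// Defaults to `false`, so implementors must opt in explicitly.
    fn supports_blocked_groups(&self) -> bool {
        false
    }

    /// Switch between the flat layout (`None`) and blocks of `block_size`
    /// groups (`Some(block_size)`). The default is a no-op for
    /// implementations that only support the single flat `Vec` layout.
    fn alter_block_size(&mut self, _block_size: Option<usize>) {}
}

// An implementation that keeps the defaults stays on the flat layout.
struct FlatOnly;
impl BlockedGroups for FlatOnly {}

fn main() {
    let flat = FlatOnly;
    assert!(!flat.supports_blocked_groups());
}
```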