feat: implement GroupArrayAggAccumulator attempt 3 #17915
Conversation
alamb left a comment
very very cool @duongcongtoai
    stacked_batches: Vec<ArrayRef>,
    // address items of each group within the stacked_batches
    // this is maintained to perform kernel::interleave
    stacked_group_indices: Vec<Vec<(usize, usize)>>,
As written, I think this effectively incurs an extra allocation per group.
If you saved the internal state as a single Vec<usize> that records the group of each element in the stacked ArrayRefs, you could then sort it (remembering each element's original index as well) to determine the final arguments to interleave.
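The suggestion above can be sketched as follows. This is a minimal, self-contained illustration (the names `interleave_indices` and `batch_offsets` are hypothetical, not from the PR): a single flat `Vec<usize>` of group ids replaces the per-group `Vec<Vec<(usize, usize)>>`, and a stable sort of element indices by group id recovers the `(batch, row)` pairs in group order, which is the shape `kernel::interleave` expects.

```rust
/// Given `group_ids[i]` = group of element `i` across all stacked batches,
/// and `batch_offsets[i]` = the (batch, row) location of element `i`,
/// produce the (batch, row) pairs ordered by group.
fn interleave_indices(
    group_ids: &[usize],
    batch_offsets: &[(usize, usize)],
) -> Vec<(usize, usize)> {
    // Only one flat allocation for the sort order, instead of one Vec per group.
    let mut order: Vec<usize> = (0..group_ids.len()).collect();
    // Vec::sort_by_key is stable, so insertion order is kept within each group.
    order.sort_by_key(|&i| group_ids[i]);
    order.into_iter().map(|i| batch_offsets[i]).collect()
}

fn main() {
    // Elements arrived interleaved: group 1, group 0, group 1, group 0.
    let group_ids = vec![1, 0, 1, 0];
    let batch_offsets = vec![(0, 0), (0, 1), (1, 0), (1, 1)];
    let sorted = interleave_indices(&group_ids, &batch_offsets);
    // Group 0's elements come first, in arrival order, then group 1's.
    assert_eq!(sorted, vec![(0, 1), (1, 1), (0, 0), (1, 0)]);
}
```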
        opt_filter: Option<&BooleanArray>,
        total_num_groups: usize,
    ) -> Result<()> {
        // TODO: all references to this function currently pass opt_filter as None
opt_filter is passed for a query like SELECT ARRAY_AGG(x) FILTER (WHERE y > 5)
Then I think we should add a test case for it.
Actually, for merge_batch I think the opt_filter will always be None; see for example the implementation of
median:

    // Since aggregate filter should be applied in partial stage, in final stage there should be no filter

count:

    _opt_filter: Option<&BooleanArray>,
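The invariant being discussed can be sketched like this. It is a hedged illustration with stand-in types and names (plain slices instead of arrow's `BooleanArray`, not the PR's actual code): because the aggregate filter is applied in the partial stage, a merge-stage implementation can assert the filter is absent and simply append the partial state.

```rust
/// Merge partial per-group state into the final state. The filter has
/// already been applied upstream, so `opt_filter` is expected to be None.
fn merge_batch(state: &mut Vec<i64>, partial: &[i64], opt_filter: Option<&[bool]>) {
    // Since the aggregate filter should be applied in the partial stage,
    // in the final stage there should be no filter.
    debug_assert!(opt_filter.is_none());
    state.extend_from_slice(partial);
}

fn main() {
    let mut state = vec![1, 2];
    merge_batch(&mut state, &[3, 4], None);
    assert_eq!(state, vec![1, 2, 3, 4]);
}
```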
    }

    fn size(&self) -> usize {
        size_of_val(self)
There is some error on memory reservation. I'm not sure if the calculation in fn size is wrong, but as I understand it, we only account for the buffers newly created by this implementation, not the buffers we borrowed from elsewhere (i.e. the ArrayRefs stacked every time we receive data from fn merge_batch or fn update_batch). Maybe it's a good chance for me to learn how memory reservation/spilling works.
I took a quick look at this code -- One thing we probably need to account for is the memory in the held ArrayRefs -- specifically by calling https://docs.rs/arrow/latest/arrow/array/trait.Array.html#tymethod.get_array_memory_size on all the stacked arrays
However, I bet the fuzz test failure is due to actually better accounting of memory. Maybe we need to readjust the parameters or something
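The accounting change suggested above can be sketched as follows. This is a self-contained, hedged sketch: `StackedArray` is a stand-in for arrow's `ArrayRef` (its `get_array_memory_size` here just returns a stored number, whereas arrow's real `Array::get_array_memory_size` walks the buffers), and the struct fields mirror the ones quoted from the PR.

```rust
use std::mem::size_of_val;

/// Stand-in for an arrow ArrayRef; `bytes` models what arrow's
/// Array::get_array_memory_size() would report for the real array.
struct StackedArray {
    bytes: usize,
}

impl StackedArray {
    fn get_array_memory_size(&self) -> usize {
        self.bytes
    }
}

struct GroupArrayAggAccumulator {
    stacked_batches: Vec<StackedArray>,
    stacked_group_indices: Vec<Vec<(usize, usize)>>,
}

impl GroupArrayAggAccumulator {
    fn size(&self) -> usize {
        // The struct itself...
        size_of_val(self)
            // ...plus the memory held by the stacked (borrowed) arrays,
            // which size_of_val alone does not see...
            + self
                .stacked_batches
                .iter()
                .map(|a| a.get_array_memory_size())
                .sum::<usize>()
            // ...plus the per-group index vectors' heap allocations.
            + self
                .stacked_group_indices
                .iter()
                .map(|v| v.capacity() * std::mem::size_of::<(usize, usize)>())
                .sum::<usize>()
    }
}

fn main() {
    let acc = GroupArrayAggAccumulator {
        stacked_batches: vec![StackedArray { bytes: 1024 }, StackedArray { bytes: 2048 }],
        stacked_group_indices: vec![vec![(0, 0), (0, 1)], vec![(1, 0)]],
    };
    // The reported size must at least cover the held array buffers.
    assert!(acc.size() >= 1024 + 2048);
}
```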
The stacked arrays' size will grow over time, so accounting for their size will likely make this implementation infeasible under memory pressure. I wonder whether we should introduce an option for users to choose between the old GroupAccumulatorAdaptor implementation and this one.
Actually, after reducing the batch_size of the fuzz test it passed; this happens during the spill merge. But interestingly, the size that makes the merge overflow is the size of the output (not the size of the accumulator), so how does the new implementation cause the output to use more memory? 🤔
    for batch in batches {
        in_progress_file.append_batch(&batch)?;
        max_record_batch_size = max_record_batch_size.max(batch.get_sliced_size()?);
    }

max_record_batch_size decides the allocation during the spill merge; maybe this is the difference.
I tried the benchmark again with this branch, and the result is improved compared to DataFusion 50.0.0. However, I had to modify the original script a bit, because writing to Parquet fails in the same way described in this issue. I'll also take a look at that later.
Which issue does this PR close?
Closes: array_agg, Polars is 4 times faster for equal query #17446

This shouldn't affect existing queries. (In the first round of benchmarks the results looked mixed for some reason, maybe because I left so many running processes open.)
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?