Generate GroupByHash output in multiple RecordBatches #11758
Conversation
Thank you @JasonLi-cn. I wonder if we have tested the performance of this branch? I worry that the incremental output generation will result in copying the values multiple times (as each
If this turns out to be a large performance overhead, then I think we could look into updating the accumulators to remember where they have emitted to or something (or maybe add a
Thank you @alamb. I'll run the aggregate benchmark.
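To make the copying concern above concrete, here is a toy, self-contained Rust sketch (not the actual DataFusion GroupsAccumulator / EmitTo API; the `ToyAccumulator` type and `emit_first` helper are made up for illustration) of emitting output from a contiguous buffer in fixed-size chunks. Each partial emit drains the front of the buffer, which shifts all remaining values, so values near the end can be moved once per emitted block:

```rust
/// Toy stand-in for per-group accumulator state (one running sum per group).
struct ToyAccumulator {
    sums: Vec<i64>,
}

impl ToyAccumulator {
    /// Emit the first `n` group values and keep the rest (hypothetical helper).
    fn emit_first(&mut self, n: usize) -> Vec<i64> {
        let n = n.min(self.sums.len());
        // `drain(..n)` removes the front and shifts every remaining element
        // toward index 0, so this O(len) move is repeated for each block.
        self.sums.drain(..n).collect()
    }
}

fn main() {
    let mut acc = ToyAccumulator { sums: (0..10).collect() };
    let batch_size = 4;
    while !acc.sums.is_empty() {
        let block = acc.emit_first(batch_size);
        println!("emitted block: {block:?}");
    }
}
```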
Benchmark (main vs this branch)
Gnuplot not found, using plotters backend
aggregate_query_no_group_by 15 12
time: [2.1563 ms 2.1686 ms 2.1824 ms]
change: [-0.1065% +1.0107% +2.0667%] (p = 0.07 > 0.05)
No change in performance detected.
Found 9 outliers among 100 measurements (9.00%)
7 (7.00%) high mild
2 (2.00%) high severe
aggregate_query_no_group_by_min_max_f64
time: [2.0533 ms 2.0655 ms 2.0782 ms]
change: [-0.2686% +0.5618% +1.4604%] (p = 0.21 > 0.05)
No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
3 (3.00%) high mild
aggregate_query_no_group_by_count_distinct_wide
time: [2.8570 ms 2.8720 ms 2.8884 ms]
change: [-1.1717% -0.2697% +0.6437%] (p = 0.57 > 0.05)
No change in performance detected.
Found 11 outliers among 100 measurements (11.00%)
6 (6.00%) high mild
5 (5.00%) high severe
aggregate_query_no_group_by_count_distinct_narrow
time: [2.6355 ms 2.6427 ms 2.6502 ms]
change: [-0.3158% +0.1024% +0.5289%] (p = 0.64 > 0.05)
No change in performance detected.
Found 4 outliers among 100 measurements (4.00%)
1 (1.00%) low mild
3 (3.00%) high mild
aggregate_query_group_by
time: [3.1874 ms 3.2218 ms 3.2633 ms]
change: [+0.8142% +2.0424% +3.3431%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 10 outliers among 100 measurements (10.00%)
1 (1.00%) low mild
2 (2.00%) high mild
7 (7.00%) high severe
aggregate_query_group_by_with_filter
time: [3.2268 ms 3.2375 ms 3.2499 ms]
change: [-0.8765% -0.3444% +0.1488%] (p = 0.19 > 0.05)
No change in performance detected.
Found 5 outliers among 100 measurements (5.00%)
3 (3.00%) high mild
2 (2.00%) high severe
aggregate_query_group_by_u64 15 12
time: [2.9613 ms 2.9724 ms 2.9841 ms]
change: [-1.6244% -1.0331% -0.4294%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
aggregate_query_group_by_with_filter_u64 15 12
time: [3.1977 ms 3.2092 ms 3.2215 ms]
change: [-0.5511% -0.0732% +0.4691%] (p = 0.77 > 0.05)
No change in performance detected.
Found 10 outliers among 100 measurements (10.00%)
1 (1.00%) low mild
5 (5.00%) high mild
4 (4.00%) high severe
aggregate_query_group_by_u64_multiple_keys
time: [6.4762 ms 6.5397 ms 6.6054 ms]
change: [+0.9528% +2.4183% +3.9063%] (p = 0.00 < 0.05)
Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
aggregate_query_approx_percentile_cont_on_u64
time: [6.2755 ms 6.3326 ms 6.3915 ms]
change: [-4.7095% -2.6423% -0.8346%] (p = 0.01 < 0.05)
Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
1 (1.00%) high mild
aggregate_query_approx_percentile_cont_on_f32
time: [5.4463 ms 5.4856 ms 5.5275 ms]
change: [-1.1171% -0.1131% +0.9433%] (p = 0.83 > 0.05)
No change in performance detected.
Found 2 outliers among 100 measurements (2.00%)
2 (2.00%) high mild
aggregate_query_distinct_median
time: [4.1459 ms 4.1710 ms 4.1962 ms]
change: [-1.0176% -0.1285% +0.7244%] (p = 0.78 > 0.05)
No change in performance detected.
The performance of the following queries:
# aggregate_query_group_by
SELECT utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY utf8
# aggregate_query_group_by_u64_multiple_keys
SELECT u64_wide, utf8, MIN(f64), AVG(f64), COUNT(f64) FROM t GROUP BY u64_wide, utf8
This test may be incomplete; do you @alamb have any better test suggestions? 🤔
Hi @JasonLi-cn -- yes, I think we should run the ClickBench and TPCH benchmarks using the script here: https://github.com/apache/datafusion/tree/main/benchmarks
I am doing so now and will report results here.
While trying to run the benchmarks on this branch, I hit a bug, #11833, that has been fixed on main.
Thus I am going to merge up from main and try again.
🤔
It appears that on this branch the kernel killed the process due to running out of memory (which does not happen on main).
Thank you @alamb 🙏. Let me analyze it further 🤔
In order to actually generate the output in multiple batches and gain performance, I think we would need to change:
This would likely require some sort of API change to the accumulators, etc. I wonder if we could find some way to do the implementation incrementally.
I agree; ultimately it should be a big change that switches the group values and related states to be managed by block, like DuckDB, and I am working on this (#11931). But maybe just splitting the emit result still has benefits? It seems that it can avoid calling the
Personally, I suggest sketching out what this would look like in a first PR without worrying about getting all the tests passing / compiling, etc. If we try to port all the code at once to being managed in blocks, it is going to be a very large change. I am thinking maybe we can take an incremental approach (like, for example, separately adding the ability to do blocked emission for https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.GroupsAccumulator.html and
If we can set this up so that we get the pattern and a few common implementations set up in the first PR, then we can make subsequent PRs to port over the other parts of the aggregation 🤔
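To sketch what the incremental, blocked-emission idea could look like, here is a hypothetical Rust example; the trait `BlockedGroupsAccumulator`, its methods, and the `BlockedSum` type are all illustrative names, not DataFusion's actual API. The point is that if per-group state is stored in fixed-size blocks, emitting one block never moves the others:

```rust
use std::collections::VecDeque;

/// Stand-in for one emitted block of values (in DataFusion this would be Arrow arrays).
type Block = Vec<i64>;

/// Hypothetical opt-in extension for block-based emission.
trait BlockedGroupsAccumulator {
    /// Whether this accumulator keeps its state in fixed-size blocks.
    fn supports_blocked_emission(&self) -> bool;
    /// Pop the next ready block without touching the remaining blocks.
    fn emit_next_block(&mut self) -> Option<Block>;
}

/// A sum accumulator that stores per-group sums in blocks.
struct BlockedSum {
    blocks: VecDeque<Block>,
}

impl BlockedGroupsAccumulator for BlockedSum {
    fn supports_blocked_emission(&self) -> bool {
        true
    }
    fn emit_next_block(&mut self) -> Option<Block> {
        // O(1): hand back the front block; later blocks stay where they are.
        self.blocks.pop_front()
    }
}

fn main() {
    let mut acc = BlockedSum {
        blocks: VecDeque::from(vec![vec![1, 2, 3], vec![4, 5, 6]]),
    };
    assert!(acc.supports_blocked_emission());
    while let Some(block) = acc.emit_next_block() {
        println!("emitted block: {block:?}");
    }
}
```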
Yes, I want to do similar things for
Yes... I tried to do something like it, but it is still not thorough enough, and I found it is actually hard to support the exact
I am planning to switch to the special blocked emission impl now...
What do you think of this way to support the special blocked emission?
Do we need to put
@JasonLi-cn I think maybe we should implement the special block-based
I am making an attempt at it in #11943, and have done some related code changes.
OK. How do we determine the value of block size? |
I think maybe we can make it equal to
Yes, I think this is a good strategy.
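As a minimal sketch of the "block size equals batch_size" strategy discussed above (assuming arrow-rs; the `split_into_blocks` helper is illustrative and not code from this PR), an emitted RecordBatch can be handed downstream as batch_size-row chunks using zero-copy slices:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Split `batch` into chunks of at most `batch_size` rows; slices share buffers.
fn split_into_blocks(batch: &RecordBatch, batch_size: usize) -> Vec<RecordBatch> {
    let mut blocks = Vec::new();
    let mut offset = 0;
    while offset < batch.num_rows() {
        let len = batch_size.min(batch.num_rows() - offset);
        blocks.push(batch.slice(offset, len));
        offset += len;
    }
    blocks
}

fn main() -> Result<(), ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int64, false)]));
    let column: ArrayRef = Arc::new(Int64Array::from_iter_values(0..10));
    let batch = RecordBatch::try_new(schema, vec![column])?;
    for block in split_into_blocks(&batch, 4) {
        println!("block with {} rows", block.num_rows());
    }
    Ok(())
}
```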
Thank you for your contribution. Unfortunately, this pull request is stale because it has been open 60 days with no activity. Please remove the stale label or comment, or this will be closed in 7 days.
I believe the plan here is that we will work to improve the coverage of aggregates and then revisit / revive this design |
Which issue does this PR close?
Closes #9562
If the community thinks this PR is reasonable, I will continue the work:
In addition, we need to discuss whether we need to emit by batch_size when spill is true. During cargo test, I found that if we emit by batch_size when spill is true, some test cases such as aggregate_source_not_yielding_with_spill could not pass, because the number of RecordBatches increases, so the BatchBuilder push_batch calls consume more memory in update_merged_stream.
datafusion/physical-plan/src/sorts/builder.rs, lines 71 to 80 in 0f554fa
Personally, I prefer to emit by batch_size when spill is true. Otherwise, the panic (apache/arrow-rs#6112 (comment)) is easily triggered by spill.
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?