Reuse Rows allocation in RowCursorStream #16647
Conversation
zhuqi-lucas
left a comment
LGTM, thank you @Dandandan for the great work!
```diff
 #[derive(Debug)]
 pub struct RowValues {
-    rows: Rows,
+    rows: Arc<Rows>,
```
It makes sense, thank you @Dandandan !
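The switch from an owned `rows: Rows` to `rows: Arc<Rows>` is what makes reclaiming the allocation possible: the stream can hand out cheap clones and, once it holds the only reference again, take the buffer back. A minimal sketch of that pattern, using `Vec<u8>` as a stand-in for arrow's `Rows` (not the actual DataFusion code):

```rust
use std::sync::Arc;

fn main() {
    // `Vec<u8>` stands in for arrow's `Rows` buffer here.
    let rows: Arc<Vec<u8>> = Arc::new(vec![1, 2, 3]);
    let handed_out = Arc::clone(&rows); // cheap: bumps a refcount, no copy

    // While another clone is alive, the allocation cannot be reclaimed.
    assert!(Arc::try_unwrap(rows).is_err());

    // Once ours is the last reference, `try_unwrap` returns the owned buffer,
    // which can be cleared (keeping its capacity) and refilled for the next batch.
    let mut reclaimed = Arc::try_unwrap(handed_out).expect("sole owner now");
    reclaimed.clear();
    assert!(reclaimed.capacity() >= 3);
}
```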
```rust
self.rows[stream_idx][1] = Some(Arc::clone(&rows));
```
```rust
// swap the current with the previous one, so that the next poll can reuse the Rows from the previous poll
```
Thank you @Dandandan , this implementation is really clever:
Double-buffer swap for smooth handoff
After appending the new data into the “cur” slot, swapping the two slots with std::mem::swap transparently rotates which buffer will be reused next. This means you always have one slot holding the “previous” data for downstream consumers and an idle slot ready for your next try_unwrap.
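The rotation described above can be sketched as follows. This is a hedged illustration with made-up names (`TwoSlots`, `next`) and `Vec<u8>` standing in for `Rows`, not the DataFusion implementation:

```rust
use std::sync::Arc;

/// Illustrative stand-in for arrow's `Rows` buffer.
type Rows = Vec<u8>;

/// Two slots: `cur` receives this poll's data, `prev` holds the buffer
/// handed out on the previous poll, which becomes the reuse candidate.
struct TwoSlots {
    cur: Arc<Rows>,
    prev: Arc<Rows>,
}

impl TwoSlots {
    fn new() -> Self {
        Self { cur: Arc::new(Vec::new()), prev: Arc::new(Vec::new()) }
    }

    /// Fill `cur` (reusing its allocation when no consumer still holds it),
    /// hand out a clone, then swap so the other slot is tried next time.
    fn next(&mut self, data: &[u8]) -> Arc<Rows> {
        if let Some(buf) = Arc::get_mut(&mut self.cur) {
            buf.clear(); // keep the allocation
            buf.extend_from_slice(data);
        } else {
            // A consumer still holds this slot: fall back to a fresh buffer
            // (the backwards-compatible behaviour).
            self.cur = Arc::new(data.to_vec());
        }
        let out = Arc::clone(&self.cur);
        // Rotate so the next poll reuses the Rows from the previous poll.
        std::mem::swap(&mut self.cur, &mut self.prev);
        out
    }
}

fn main() {
    let mut slots = TwoSlots::new();
    let a = slots.next(b"batch-1");
    drop(a); // downstream consumed batch 1, freeing its slot for reuse
    let b = slots.next(b"batch-2");
    assert_eq!(b.to_vec(), b"batch-2".to_vec());
}
```

The key property is that `Arc::get_mut` only succeeds when the stream holds the sole reference, so reuse is safe by construction.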
LGTM, I will try running the benchmark for
This is the benchmark scenario where the test data has not been modified from the default (multiple large string columns):

```text
Benchmarking bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 50.2s.
bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows
                        time:   [5.0435 s 5.0615 s 5.0813 s]
Found 3 outliers among 10 measurements (30.00%)
  1 (10.00%) low mild
  2 (20.00%) high severe
Benchmarking bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 8.6s or enable flat sampling.
bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows
                        time:   [157.82 ms 160.78 ms 163.05 ms]
```
```text
➜ arrow-datafusion git:(main) git checkout reuse_rows
branch 'reuse_rows' set up to track 'origin/reuse_rows'.
Switched to a new branch 'reuse_rows'
➜ arrow-datafusion git:(reuse_rows) cargo bench --bench sort_preserving_merge -- --sample-size=10
Benchmarking bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 51.2s.
bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows
                        time:   [5.0404 s 5.0613 s 5.0831 s]
                        change: [-0.5635% -0.0039% +0.5493%] (p = 0.99 > 0.05)
                        No change in performance detected.
Benchmarking bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows: Warming up for 3.0000 s
Warning: Unable to complete 10 samples in 5.0s. You may wish to increase target time to 8.6s or enable flat sampling.
bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows
                        time:   [155.99 ms 157.30 ms 159.18 ms]
                        change: [-3.1635% -1.4444% +0.3068%] (p = 0.15 > 0.05)
                        No change in performance detected.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
```

The performance improvement in the test data above appears to be minimal. I suspect this might be because the strings used for testing are too long, making the memory-allocation overhead negligible in comparison. So I tried making the strings smaller; the test results are as follows:

```text
bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows
                        time:   [757.06 ms 760.87 ms 764.68 ms]
bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows
                        time:   [209.89 ms 210.70 ms 211.52 ms]
➜ arrow-datafusion git:(main) git checkout reuse_rows
bench_merge_sorted_preserving/multiple_large_string_columns_with_1m_rows
                        time:   [755.94 ms 758.84 ms 762.58 ms]
                        change: [-0.9202% -0.2676% +0.4455%] (p = 0.47 > 0.05)
                        No change in performance detected.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high mild
bench_merge_sorted_preserving/multiple_u64_columns_with_1m_rows
                        time:   [209.22 ms 210.43 ms 212.07 ms]
                        change: [-0.8397% -0.1278% +0.7042%] (p = 0.78 > 0.05)
                        No change in performance detected.
Found 1 outliers among 10 measurements (10.00%)
  1 (10.00%) high severe
```

The performance improvement compared to before is indeed more noticeable.
I had a look at this benchmark. It seems it is only testing a single 1M-row batch per partition/column?
You mean streaming back RecordBatches (e.g., batches of 8192 rows) instead of 1M rows all at once, right? I'll try to tweak this aspect to make it meet expectations.
Yes, otherwise the "optimization" will have no effect, as it can only re-use allocations across batches.
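The point about cross-batch reuse can be seen with plain `Vec` capacity: streaming many small batches through one recycled buffer allocates once, whereas a single 1M-row batch allocates once anyway, so the optimization cannot show up in that benchmark. A small sketch (illustrative only, not the benchmark code):

```rust
fn main() {
    let mut buf: Vec<u8> = Vec::new();
    let mut capacity_changes = 0;
    let mut last_cap = buf.capacity();
    // Stream 128 "batches" of 8192 rows through the same buffer.
    for batch in 0..128u8 {
        buf.clear(); // drops the contents but keeps the allocation
        buf.extend(std::iter::repeat(batch).take(8192));
        if buf.capacity() != last_cap {
            capacity_changes += 1;
            last_cap = buf.capacity();
        }
    }
    // Only the first batch had to allocate; the other 127 reused the buffer.
    assert_eq!(capacity_changes, 1);
}
```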
Ah ok, I see you changed
I believe we can increase the in-place memory for the sorting benchmark here; the default is 1MB. From previous experience, the result will be largely affected by the in-place sort memory buffer. Details:
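For reference, the 1MB in-place sort buffer mentioned above corresponds, if I recall the option name correctly (please verify against the current DataFusion configuration docs), to a session setting that can be raised like so:

```sql
-- Assumed option name and default (1048576 bytes = 1MB); verify before use
SET datafusion.execution.sort_in_place_threshold_bytes = 10485760;
```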
I would prefer to keep config default changes in a separate PR as they can be a mix in terms of performance improvements / regressions. |
@alamb may I request some benchmark run?

🤖

I really need to figure out how to script this automatically. I will see if I can get claude to do something for me.

🤖: Benchmark completed

🤖

🤖: Benchmark completed

Hm interesting, less pronounced than in my case 🤔

🤖

🤖

🤖: Benchmark completed
alamb
left a comment
This is quite cool @Dandandan
My only real concern is that this code will be tricky to maintain and could easily get reverted / regressed as part of a follow-on change.
I suggest we try to encapsulate it into a struct (I left a suggestion).
Thank you (as always) for the reviews @zhuqi-lucas
cc @tustvold and @crepererum for your amusement
Yeah - I agree. Another option might be to panic or return an error when the condition of keeping at most 1 (extra) batch buffered in the consumer is not maintained. This way the tests would fail, but it would be a small breaking change for
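One way the "error on a non-unique reference" idea could look, sketched with purely illustrative names and `Vec<u8>` standing in for `Rows` (the merged PR keeps a backwards-compatible fallback rather than always erroring):

```rust
use std::sync::Arc;

type Rows = Vec<u8>;

/// Illustrative double-buffer that surfaces a violation of the
/// "consumer keeps at most one extra batch" contract as an error
/// instead of silently allocating a fresh buffer.
struct StrictReusableRows {
    cur: Arc<Rows>,
    prev: Arc<Rows>,
}

impl StrictReusableRows {
    fn new() -> Self {
        Self { cur: Arc::new(Vec::new()), prev: Arc::new(Vec::new()) }
    }

    fn next(&mut self, data: &[u8]) -> Result<Arc<Rows>, String> {
        // If a consumer still holds this slot, the contract was broken.
        let buf = Arc::get_mut(&mut self.cur)
            .ok_or_else(|| "more than one batch buffered by consumer".to_string())?;
        buf.clear();
        buf.extend_from_slice(data);
        let out = Arc::clone(&self.cur);
        std::mem::swap(&mut self.cur, &mut self.prev);
        Ok(out)
    }
}

fn main() {
    let mut rows = StrictReusableRows::new();
    let a = rows.next(b"a").unwrap();
    let _b = rows.next(b"b").unwrap(); // one extra batch buffered: still fine
    // Holding both batches while asking for a third violates the contract.
    assert!(rows.next(b"c").is_err());
    drop(a);
    assert!(rows.next(b"c").is_ok()); // slot freed, reuse succeeds again
}
```

Failing loudly like this would make the buffering invariant visible in tests, at the cost of being a (small) breaking change for consumers that buffer more.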
Sometimes I've found that sort_tpch10 gives a more accurate result when we optimize the merge part, because our in-memory sort buffer is 1MB, so sort_tpch has a lower merge-comparison count. I added sort_tpch10 to bench.sh; I hope it will be helpful:
Ah, that makes sense, maybe that explains the difference in results. Let's compare sort_tpch10 as well.
@alamb could you maybe run it on
I have queued this up and it should run in a few minutes

🤖

🤖: Benchmark completed

🤖

🤖: Benchmark completed

🤖

🤖: Benchmark completed
LOL
😺 Is it because the benchmark script hasn't been updated for sort_tpch10 yet?
Co-authored-by: Oleks V <comphead@users.noreply.github.com>

* Reuse Rows in RowCursorStream
* WIP
* Fmt
* Add comment, make it backwards compatible
* Add comment, make it backwards compatible
* Add comment, make it backwards compatible
* Clippy
* Clippy
* Return error on non-unique reference
* Comment
* Update datafusion/physical-plan/src/sorts/stream.rs (Co-authored-by: Oleks V <comphead@users.noreply.github.com>)
* Fix
* Extract logic
* Doc fix
```rust
/// A pair of `Arc<Rows>` that can be reused
#[derive(Debug)]
struct ReusableRows {
```
❤️
As a random aside, this is the kind of micro-optimization that I would have been terrified of making in C/C++ land without extreme care to avoid concurrent access. With Rust I trust the compiler 🦾


Which issue does this PR close?
- Closes #15720 (`RowCursorStream`).

Rationale for this change
Speedup / reduce allocations for multi-column sorting.
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?