Experimental parquet decoder with first-class selection pushdown support #6921
Conversation
Implemented some more optimizations and tuning; here are the ClickBench numbers on my machine. TLDR: about 15% total time reduction. We first compare no-pushdown vs. our new pushdown implementation. Only Q27 has a meaningful slowdown; other queries are either similar or much faster. The fix for Q27 requires us to actually switch to a boolean-mask-based selector implementation, like the one in #6624.
Now we compare our new implementation with the old pushdown implementation -- only Q23 is a bit slower, the others are either faster or similar. We do need some extra work to get optimal performance on Q23; nonetheless, we are still faster than no-pushdown. I believe a fix for Q23 does not require fundamental changes to the existing decoding pipeline.
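For context on the Q27 note above, a rough sketch (not the #6624 implementation itself) contrasting the run-length `RowSelection` used today with a per-row boolean mask; the alternating one-row pattern is a made-up worst case of the kind that hurts Q27:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // Run-length representation used by the current reader: alternating
    // skip/select runs. Efficient for long runs, but a highly fragmented
    // filter produces many tiny selectors and the per-run bookkeeping
    // starts to dominate.
    let _runs: RowSelection = RowSelection::from(vec![
        RowSelector::select(1),
        RowSelector::skip(1),
        RowSelector::select(1),
        RowSelector::skip(1),
    ]);

    // Boolean-mask representation: one entry per row, so the cost stays
    // uniform no matter how fragmented the selection is.
    let _mask: Vec<bool> = vec![true, false, true, false];
}
```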
The implementation of course lacks tons of tests (I tried to manually verify the ClickBench results). If the high-level stuff looks good, I'll try to send broken-down PRs with tests and documentation. Like most performance-related PRs, some of the code changes can be very non-intuitive; please let me know and I'll try my best to explain why some code has to be implemented that way.
Starting to check it out
Thank you @XiangpengHao -- TLDR I think this POC looks really nice and the overall structure makes sense to me. I am willing to help review this PR as it moves closer to reality
There are obvious ways to break this PR up into pieces, which is a nice bonus -- the core caching logic is fairly localized
cc @thinkharderdev @tustvold @Dandandan @etseidl for your comments / reviews as well
I also think the description on the PR is quite good and easy to follow -- thank you for that
(todo: cite myself)
😆 my favorite part of the description
if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough.
We can also consider caching arrow as a follow-on PR / project. If this initial PR effectively avoids decompressing each page twice (though it still decodes each page to arrow twice) that still seems better than the current main branch, which decompresses and decodes twice.
very nice @XiangpengHao. I think this makes a lot of sense.
The test failure is a new test added in modular encryption (doesn't error when it should). I am looking at it
Here is another blog post about this PR:
@@ -464,6 +465,8 @@ impl ByteViewArrayDecoderDictionary {
        }
    }

    output.views.reserve(len);
I broke this change out into a new PR
Update:
This is done via two blog posts
Done
I am in the process of doing this. If the benchmarks look good, I'll help polish this PR up, get it ready for merge, and file next steps
Tracking my plan with
Let's continue work in #7850
…sync_reader) (#7850)

This is my latest attempt to make pushdown faster. Prior art: #6921

cc @alamb @zhuqi-lucas

- Part of #8000
- Related to apache/datafusion#3463
- Related to #7456
- Closes #7363
- Closes #8003

## Problems of #6921

1. It proactively loads the entire row group into memory (rather than only loading the pages that pass the filter predicate).
2. It only caches decompressed pages, so it still pays the decoding cost twice.

This PR takes a different approach: it does not change the decoding pipeline, which avoids problem 1, and it caches the arrow record batches, which avoids problem 2. The trade-off is that we need more memory to cache data.

## How it works?

1. It instruments the `array_reader`s with a transparent `cached_array_reader` (a rough sketch of this idea follows after this message).
2. The cache layer first consults the `RowGroupCache` to look for a batch, and only reads from the underlying reader on a cache miss.
3. There is a cache producer and a cache consumer: while building filters we insert arrow arrays into the cache, and while building outputs we remove them from the cache. So the memory usage should look like this:

```
 ▲
 │     ╭─╮
 │    ╱   ╲
 │   ╱     ╲
 │  ╱       ╲
 │ ╱         ╲
 │╱           ╲
 └─────────────╲──────► Time
     │      │       │
  Filter   Peak  Consume
  Phase  (Built) (Decrease)
```

In a concurrent setup, not all readers may reach the peak at the same time, so the peak system memory usage might be lower.

4. It has a `max_cache_size` knob, which is a per-row-group setting. If the row group has used up the budget, the cache stops taking new data and the `cached_array_reader` falls back to reading and decoding from Parquet.

## Other benefits

1. This architecture allows nested columns (not implemented in this PR), i.e., it is future proof.
2. There are many performance optimizations left to further squeeze the performance, but even in its current state it has no regressions.

## How does it perform?

My criterion run somehow won't produce a result from `--save-baseline`, so I asked an LLM to generate a table from this benchmark:

```
cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
```

`Baseline` is the implementation on the current main branch. `New Unlimited` is the new pushdown with an unlimited memory budget. `New 100MB` is the new pushdown with a 100MB per-row-group cache budget.

```
Query  | Baseline (ms) | New Unlimited (ms) | Diff (ms) | New 100MB (ms) | Diff (ms)
-------+---------------+--------------------+-----------+----------------+-----------
Q1     |         0.847 |              0.803 |    -0.044 |          0.812 |    -0.035
Q10    |         4.060 |              6.273 |    +2.213 |          6.216 |    +2.156
Q11    |         5.088 |              7.152 |    +2.064 |          7.193 |    +2.105
Q12    |        18.485 |             14.937 |    -3.548 |         14.904 |    -3.581
Q13    |        24.859 |             21.908 |    -2.951 |         21.705 |    -3.154
Q14    |        23.994 |             20.691 |    -3.303 |         20.467 |    -3.527
Q19    |         1.894 |              1.980 |    +0.086 |          1.996 |    +0.102
Q20    |        90.325 |             64.689 |   -25.636 |         74.478 |   -15.847
Q21    |       106.610 |             74.766 |   -31.844 |         99.557 |    -7.053
Q22    |       232.730 |            101.660 |  -131.070 |        204.800 |   -27.930
Q23    |       222.800 |            186.320 |   -36.480 |        186.590 |   -36.210
Q24    |        24.840 |             19.762 |    -5.078 |         19.908 |    -4.932
Q27    |        80.463 |             47.118 |   -33.345 |         49.597 |   -30.866
Q28    |        78.999 |             47.583 |   -31.416 |         51.432 |   -27.567
Q30    |        28.587 |             28.710 |    +0.123 |         28.926 |    +0.339
Q36    |        80.157 |             57.954 |   -22.203 |         58.012 |   -22.145
Q37    |        46.962 |             45.901 |    -1.061 |         45.386 |    -1.576
Q38    |        16.324 |             16.492 |    +0.168 |         16.522 |    +0.198
Q39    |        20.754 |             20.734 |    -0.020 |         20.648 |    -0.106
Q40    |        22.554 |             21.707 |    -0.847 |         21.995 |    -0.559
Q41    |        16.430 |             16.391 |    -0.039 |         16.581 |    +0.151
Q42    |         6.045 |              6.157 |    +0.112 |          6.120 |    +0.075
```

1. If we consider diffs within 5ms to be noise, then we are never worse than the current implementation.
2. We see significant improvements for string-heavy queries, because string columns are large and take time to decompress and decode.
3. The 100MB cache budget seems to have only a small performance impact.

## Limitations

1. It only works for async readers, because the sync reader does not follow the same row-group-by-row-group structure.
2. It is memory hungry compared to #6921. But changing the decoding pipeline without eagerly loading the entire row group would require significant changes to the current decoding infrastructure, e.g., we would need to make the page iterator an async function.
3. It currently doesn't support nested columns; more specifically, it doesn't support nested columns with nullable parents, but supporting them is straightforward, no big changes.
4. The current memory accounting is not accurate: it overestimates memory usage, especially when reading string view arrays, where multiple string views may share the same underlying buffer and that buffer size is counted twice. In any case, we never exceed the user-configured memory limit.
5. If one row passes the filter, the entire batch will be cached. We can probably optimize this, though.

## Next steps?

This PR is largely a proof of concept; I want to collect some feedback before sending a multi-thousand-line PR :) Some items I can think of:

1. Design an interface for the user to specify the cache size limit; currently it's hard-coded.
2. Don't instrument nested array readers if the parquet file has a nullable parent; currently it will panic.
3. More testing, and integration tests/benchmarks with DataFusion.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
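The `cached_array_reader` / `RowGroupCache` names above come from the PR description, but the code below is only an illustrative sketch, not the actual #7850 implementation: the (column index, batch index) key, the constructor, and the byte accounting via `get_array_memory_size` are all my assumptions about how a producer/consumer cache with a per-row-group budget could look.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

use arrow::array::{Array, ArrayRef};

/// Simplified stand-in for the per-row-group cache described above: arrays
/// decoded while evaluating filter predicates are stored under a
/// (column index, batch index) key and handed back (and freed) when the
/// output batches are built.
struct RowGroupCache {
    entries: Mutex<HashMap<(usize, usize), ArrayRef>>,
    used_bytes: Mutex<usize>,
    max_cache_size: usize, // per-row-group budget in bytes (assumed unit)
}

impl RowGroupCache {
    fn new(max_cache_size: usize) -> Self {
        Self {
            entries: Mutex::new(HashMap::new()),
            used_bytes: Mutex::new(0),
            max_cache_size,
        }
    }

    /// Producer side (filter phase). Returns false when the budget is
    /// exhausted; the caller then falls back to re-decoding from parquet.
    fn insert(&self, key: (usize, usize), array: ArrayRef) -> bool {
        let size = array.get_array_memory_size();
        let mut used = self.used_bytes.lock().unwrap();
        if *used + size > self.max_cache_size {
            return false;
        }
        *used += size;
        self.entries.lock().unwrap().insert(key, array);
        true
    }

    /// Consumer side (output phase): taking the entry releases its budget,
    /// producing the rise-then-fall memory curve sketched above.
    fn take(&self, key: &(usize, usize)) -> Option<ArrayRef> {
        let array = self.entries.lock().unwrap().remove(key)?;
        *self.used_bytes.lock().unwrap() -= array.get_array_memory_size();
        Some(array)
    }
}

fn main() {
    use arrow::array::Int32Array;
    use std::sync::Arc;

    let cache = RowGroupCache::new(1024 * 1024);
    let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    assert!(cache.insert((0, 0), array)); // produced while evaluating a filter
    assert!(cache.take(&(0, 0)).is_some()); // consumed while emitting output
}
```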
Which issue does this PR close?
Many long-lasting issues in DataFusion and Parquet. Note that this PR may or may not close these issues, but (imo) it will be the foundation for further optimizations (e.g., more aggressive selection pushdown as described in this paper).
- `parquet::column::reader::GenericColumnReader::skip_records` still decompresses most data #6454
- `filter_pushdown` enabled filter takes too much time to process datafusion#13298
- Enable parquet filter pushdown (`filter_pushdown`) by default datafusion#3463

Why selection pushdown?
Selection pushdown (also called late materialization, row filtering, or filter pushdown) is great in concept, but can be tricky to implement efficiently. For example, the current straightforward implementation actually slows down many queries, which prevents query engines like DataFusion from enabling filter pushdown by default. The goal of a super fast row-filter pushdown parquet reader is described by @alamb in #5523 (comment):
Previous discussions have listed many potential optimizations to the current selection pushdown, like the ones in #5523 (comment).
However, it's not clear how we can incorporate those optimizations into the current implementation. After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline while reusing as much of the existing implementation as possible.
Current implementation and the problems
We currently implement two-phase decoding:
Phase 1: Build selectors on each predicate
Phase 2: Decode parquet data using the selector
The problem is that we have to decode each predicate column twice. For example, if column 1 is being filtered on, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.
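For context, this is roughly how a caller drives the two phases today through the existing `RowFilter` API. A minimal sketch assuming the parquet crate's `arrow` feature; the `data.parquet` file name, the int64 leaf column at index 0, and the `> 10` predicate are made up for illustration.

```rust
use std::fs::File;

use arrow::array::{Array, BooleanArray, Int64Array};
use parquet::arrow::arrow_reader::{ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter};
use parquet::arrow::ProjectionMask;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet")?; // hypothetical input file

    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

    // Phase 1: the predicate decodes column 0 while being evaluated,
    // producing the row selection.
    let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
    let predicate = ArrowPredicateFn::new(mask, |batch| {
        let col = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("expected an int64 column");
        Ok(col.iter().map(|v| v.map(|x| x > 10)).collect::<BooleanArray>())
    });

    // Phase 2: the reader decodes the projected columns again according to
    // the selection -- including column 0 if it is part of the output, which
    // is the double decoding described above.
    let reader = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;

    for batch in reader {
        println!("{} rows passed the filter", batch?.num_rows());
    }
    Ok(())
}
```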
Caching is the solution but not that simple
The high-level intuition is that, if the problem is decoding twice, we can simply cache the results of the first decode and reuse them later.
Here are the nuances:
Proposed solutions
The solution consists of two parts:
The pipeline looks like this:
Once we have this pipeline, we can cache the `predicate columns for batch N` and reuse them when we `load & emit batch N`; this avoids double decoding.

Due to the difficulties mentioned above, this PR caches the decompressed pages rather than decoded arrow arrays. Some research suggests that decompressing pages costs up to twice as much as decoding arrow, so if we can cache the decompressed pages, then we only need to decode to arrow twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new PageReader.
What changes are included in this PR?
This PR only implements a reader for the async record batch stream. The sync version is left as future work and should be straightforward based on the async version.
Are there any user-facing changes?
No. The same `ParquetRecordBatchStream` will automatically benefit from the changes.
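For reference, consuming the stream stays exactly as it is today. A minimal sketch assuming the parquet crate's `async` feature, a tokio runtime, and a hypothetical `data.parquet` file; the batch size is arbitrary.

```rust
use futures::TryStreamExt;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let file = File::open("data.parquet").await?;

    // A RowFilter can be attached here just like in the sync reader; the
    // page caching in this PR applies underneath without any API change.
    let mut stream = ParquetRecordBatchStreamBuilder::new(file)
        .await?
        .with_batch_size(8192)
        .build()?;

    while let Some(batch) = stream.try_next().await? {
        println!("read a batch of {} rows", batch.num_rows());
    }
    Ok(())
}
```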