-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[C++][Parquet] Using BMI to implement filter pushdown #37559
Comments
When it goes to rep-def levels, I believe we already have some BMI2 impls: arrow/cpp/src/parquet/level_conversion.cc Line 128 in a526ba6
However, other decode part missing this. Besides, currently, the Arrow Parquet implemention even don't implement the Predicate pushdown in C++. Acero has some Maybe we can evaluate and implement that step by step, like Velox 🤔? |
I am currently working on gluten and clickhouse backend. Our team try to implement a filter push down like velox but using clickhouse expression evaluate framework which is not good since we have to build full The paper gives right direction, and I think we should implement filter push donw like this in apache arrow. Would we have a meeting on this? |
What kind of interface does ClickHouse using around Parquet? I guess there are lots of work todo. first the expression should be cast to Technically, the way like arrow-rs might be followed. As for BMI itself, I guess it's better an optional optimization rather than a neccessary-opt (like decode rep-deps and unpack in current parquet implemention) |
Agreed!
see ArrowBlockInputFormat , clickhouse use
We lack knowledge of how to build an expression evaluate framework based on arrow. It would be great that the arrow community could do this. By the way, we also find that memmory copy hurt performanace, is it possible to directly decode into Clickhouse's data structure? The current code path looks like this :
|
Hmmm If ClickHouse support non-owned Buffer, or other, it could be simpler 🤔 Otherwise you need to implement the logic under |
Yes! that's why filter push down is important, which colud reduce memory copy significantly. |
Another thing is that have you do the field pruning and RowGroup statistics pruning? This could be done easily |
Maybe you can email me and @wgtmac . And when we have some RFC maybe we can deliver it in arrow weekly meeting? |
We're at UTC-8 now. We can chat these in mail since github issue is public, and it might not directly related to the issue? |
I'd suggest that we focus on filter pushdown first for the low-level parquet bindings and be able to plumb that through Arrow. For the lower level bindings we might want to consider something generic like substrait expressions? Also, not everyone goes to the Arrow sync, sharing a google doc on the mailing list would be appreciated in conjunction to any conversation in Arrow. CC @fatemehp |
+1 for @emkornfield 's suggestion. It would be good to have a draft design in google doc and share to the public. I have just read the paper and it simply focuses on row-level filtering optimization. However, without predicate pushdown (which is a prerequisite in this case), it still involves a lot of unnecessary I/O and filter evaluation (even using BMI on encoded values) on pages that can be filtered by page index. But these features (i.e. predicate pushdown and selection pushdown) are orthogonal, therefore I am not objecting to implement selection pushdown using BMI. We need to choose expression and selection vector which can be used in the low-level parquet-cpp library and then integrate into the arrow layer. |
I Propose that:
Status StatisticsAsScalars(const Statistics& statistics,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max);
Status ColumnIndexAsScalars(...,
std::shared_ptr<::arrow::Scalar>* min,
std::shared_ptr<::arrow::Scalar>* max); The result might be a
|
I would really like to try to decouple Arrow types from the core parquet implementation if possible, but this might be too large of a hurdle. There is already a callback at the page level that doesn't Arrow can make use of for page page level statistics. Figuring out a similar mechanism for the index structure makes sense to me. This only gets to statistics pruning, as noted by the original request we likely need a more advanced API to cover lower level filtering. |
@emkornfield Do you have any proposed API or reference on this? |
Yes, field pruning and row group statistics are already implemented in Clickhouse. I think that the first thing is to implement filter pushdown in cpp parquet, this is a very basic optimization. Then we can try to implement selection push down implies by the paper maybe using BMI to accelerate. |
There are several cpp parquet implementations in the c++ community, like starrocks, doris, duckdb, velox to avoid internal layer data structure transformation leading to about 2x performance boost. But the arrow parquet is the most mature implementation, clickhouse use it for long time used by gluten project. It's time to add some new things to arrows cpp-parquet. |
+1 to implementing the most basic building blocks first. The building blocks here seem to be: We need to also have the option of returning the matching row ranges. |
I have thought the same thing. But it requires a brand-new expression interface that deals with parquet physical and logical types. I have implemented a native expression interface to the Apache ORC C++ library, so I know how much effort it takes. In our in-house platform, we have ended up with a solution pretty much similar to the implementation in the parquet-mr:
So my proposal is to use a list of selected row ranges to be the adapter between parquet layer and arrow layer to implement the predicate push down:
The proposal above aligns with the idea presented by the paper. We can use a selection bitmap to represent the list of selected row ranges. Then the idea to apply BMI to push down selection can be integrated seamlessly. |
@wgtmac Another problem is that, for RowSelector, we need to implement a "lazy" logic. i.e. use different filter in |
What does lazy mean? Does it mean late materilization? If so, that would be the required feature. From experience of implementing filter push down in clickhouse (not in arrow), we have found decoding unnecessary filter columns hurt performance for tpch q6. |
Yeah. I think during read. The original logic is:
When it turns to filter pushdown, we might need some Late materialization techniques. This might change the procedure to:
[1] https://issues.apache.org/jira/browse/SPARK-36527 The link above uses this technique. Note that I guess it not always improve the CPU performance. e.g: For filter output like. (And I know ClickHouse user would like to remove un-selected row when Build arrow |
I think this is where BMI goes into. Without BMI, I think such case may hurt performance. |
This is not only the BMI Part. The underlying data decoding might origin works like |
From the paper, IIUC, they copy the selected encode value out, and then decode them as if all records are decoded. The following cited from section 4.1
|
Ooops, sorry for misunderstand the logic here. After take a glance at the paper, seems it combine
This also means putting |
Agreed, I think this should be achievable with the callback that was added, or at least we can maybe extend it?
Agreed this is a first good step and I think yields the decoupling I was looking for. As long as we are reading arrow data, it makes sense to use the arrow expression library, but my aim was to not intermingle arrow types more directly with the core parquet reader.
I think the proposal this provides a good first pass. IIUC correctly we probably want to take this further at some point to have a callback that can be presented with encoded (or at least an intermediate form) of data where it makes sense to apply the filter (some forms might be more useful then others) to get back the row set. @mapleFU haven't given thoughts to specifics beyond this. I think the net new item is an interface for getting presented with encoded parquet data. As noted above at a pseudo-code level, we'd probably needs to be something that can be integrated into the decoders directly that will continue to emit values, but also record filtered ranges. |
🤔So the user-callback should directly handle the Parquet itself only decode data into compact raw-data array, maybe we need a interface for this. |
Also cc @pitrou for some advice |
From an architectural POV, I think @westonpace and @lidavidm would be the most apt at giving advice. I just have a couple questions:
Also cc @cyb70289 for more expert SIMD advice. |
Are we considering row-level filter evaluation or just pruning based on page stats? |
Two answer some questions:
BMI2,
Per above, I think we already have the necessary instructions modelled with dynamic dispatch in parquet (we might want o revisit settings for AMD CPUs, as I think later versions now support the instructions efficiently
Maybe, this would be an area to look into.
It sounds like both. |
Could we support a simple approach to filter push down first, leaving the door open for more complicated optimizations later? |
Agreed. We had an offline discussion with @zhanglistar and @baibaichen about possible solutions and collaborations. @mapleFU will sort out things in detail and post it here for review. |
I'm to busy these days and only may have time this weekend, I just list some skeleton and might talk about it in arrow meeting tonight https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/edit |
Thanks a lot @mapleFU for putting together the doc. It is very helpful. A few comments:
|
I'm a graduate student in Harbin Institute of Technology, I'm very interested in this issue and I want to jointly reproduce the operation of pushdown using BMI. |
Thanks for the summary document @mapleFU . This all sounds pretty cool to me.
I agree, there is a lot to figure out here. However, I do think arrow-cpp's compute module has a lot of the pieces that are needed already, and I think parquet-cpp already depends on the compute module. If you want to keep depending on arrow-cpp for compute then I think page filtering with statistics should be straightforward (can mostly copy what is in datasets). The rest would be more effort. It sounds like there is some plan to use selection vectors (which makes a lot of sense) but arrow compute doesn't use selection vectors today. However, there are a lot of really good bitmap utilities in arrow-cpp. I think anyone wanting to implement selection vectors should probably review what's available there first. Also, https://github.com/RoaringBitmap/CRoaring might be an interesting concept to read up on when it comes to selection vectors. Maybe a good way to get started on this work would be to create some microbenchmarks that do not currently perform well in parquet-cpp? |
@SZn5489 Thanks, the BMI part is the final part of filter pushdown, it just works on You can first try to take a look at int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
int64_t valid_bits_offset) override {
num_values = std::min(num_values, num_values_);
if (num_values != idx_decoder_.GetBatchWithDictSpaced(
reinterpret_cast<const T*>(dictionary_->data()),
dictionary_length_, buffer, num_values, null_count, valid_bits,
valid_bits_offset)) {
ParquetException::EofException();
}
num_values_ -= num_values;
return num_values;
} And try to using BMI2 to optimize it. Welcome adding benchmark and optimize this. You can also try to use BMI to implement |
🤔I think the most tricky part is that:
Maybe I should implement the filter kernel in parquet/arrow module, and parquet-cpp module only handled the selector. Otherwise dispatching the filter on physical type would be a disaster. |
@mapleFU Thank you for your guidance. I'm going to try to understand the code and try to implement Selection vector. |
Finally I have got some time to complete the design doc drafted by @mapleFU: https://docs.google.com/document/d/1SeVcYudu6uD9rb9zRAnlLGgdauutaNZlAaS0gVzjkgM/. The proposal mainly is based on the discussion with @baibaichen @binmahone. I think it would be good to get a consensus from the community before getting hands dirty on the implementation. Please take a look and any feedback are welcome! @emkornfield @fatemehp @pitrou @westonpace |
This proposes a number of reader APIs based on row ranges, but never says how row ranges are computed in the first place? |
@pitrou I think the intent is that it is specifically abstract. I think there are a few different methods to produce the ranges:
@wgtmac I think this is partially covered in the design but it would be good to maybe make this more explicit |
I agree with @emkornfield that there are many approaches to produce row ranges. AFAIK, many downstream projects have different expression APIs and only use the arrow layer of parquet-cpp (not the dataset layer). It is difficult to determine a single approach of producing row ranges for all downstream projects, but it is easy to make the agreement to push down row ranges to the parquet reader to achieve filtering. Therefore I leave the freedom of different engines to design their own logic to produce row ranges. At the moment, I have marked filtering support of parquet dataset reader as a non-goal in the doc. My idea is to use expressions from Arrow compute to do something similar to what parquet-mr does: https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java#L65-L104. |
Why is it necessary to have a RowRanges API? The ::parquet::internal::RecordReader has ReadRecords and SkipRecords APIs, this should be sufficient to read/skip ranges of rows. This will reduce the burden of converting whatever upstream format of row ranges to the one compatible with what we define here. P.S. We have considered removing the RecordReader from the internal scope: |
IMO, it would be too late to optimize I/O and decoding when SkipRecords is called. Pushing down row ranges has good separation and makes it easy to do the planning before reading. |
I added a comment to the doc, I think we should think more about the format of the ranges both in terms of memory and performance. I would consider analyzing the following options before making a decision:
|
@fatemehp I think we are all ok with this, it is just waiting a PR: #37003 (comment) |
do we have a PR for BMI now |
@wordhardqi Not yet, we should first planning a interface for high level pruning, and the work is on-going now However, if you're interesting in BMI implemention, implement better |
Please, stop obsessing over BMI or other "magical" optimization methods (which SIMD instructions are often viewed as). The correct way to reason about this should be 1) find bottlenecks 2) find portable methods to optimize them 3) only as a last resort, consider SIMD optimizations. |
There is a lot in the |
Describe the enhancement requested
From Selection Pushdown in Column Stores using Bit Manipulation Instructions, they said
And
BMI was instroduced 10 years ago, so implementing this feature should have big benefits.
Component(s)
C++, Parquet
The text was updated successfully, but these errors were encountered: