Implement parquet page-level skipping with column index, using min/max stats #3780
Conversation
Signed-off-by: yangjiang <yangjiang@ebay.com>
I ran tpch-q1 with the filter modified from ">=" to "<=" so that more pages can be skipped, then set the page-index option; with log=debug the run showed roughly a 1.4x performance gain.
fn num_containers(&self) -> usize {
    self.offset_indexes.get(self.col_id).unwrap().len()
}
@alamb PTAL 🤔 For now num_containers only returns one value. I think we should change it to
fn num_containers(&self, column: &Column) -> usize {
because each column chunk in one row group has a different number of pages.
I think the idea of containers was that it was the unit of thing that has statistics.
So since each RowGroup (may) have statistics, when pruning entire RowGroups the num_containers is the same as the number of RowGroups.
When pruning pages, I would expect the number of containers to be the same as the number of pages that have statistics.
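A minimal standalone sketch of this idea (hypothetical struct and field names, not the PR's actual PruningStatistics implementation): each container is one data page of the column the predicate refers to, so num_containers is that column's page count rather than the row-group count.

// Hypothetical sketch: page-level statistics for a single predicate column.
// Each "container" is one data page of that column, so `num_containers`
// is the number of pages with statistics, not the number of row groups.
struct PagePruningStats {
    // one (min, max) pair per data page of the predicate column
    page_min_max: Vec<(i64, i64)>,
}

impl PagePruningStats {
    fn num_containers(&self) -> usize {
        self.page_min_max.len()
    }

    fn max_values(&self) -> Vec<i64> {
        self.page_min_max.iter().map(|(_, max)| *max).collect()
    }
}

fn main() {
    // e.g. a column stored in 2 pages with min/max (10, 20) and (30, 40)
    let stats = PagePruningStats {
        page_min_max: vec![(10, 20), (30, 40)],
    };
    assert_eq!(stats.num_containers(), 2);
    // for the predicate `col > 35`, only the second page can contain matches
    let keep: Vec<bool> = stats.max_values().iter().map(|max| *max > 35).collect();
    assert_eq!(keep, vec![false, true]);
}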
I tried to explain this more clearly in the diagram
Agree! As you mention:
It also means that the "number of containers" in Column A will be "2" (as it has pages 1 and 2) and the number of containers for Column B will be "3" (as it has pages 3, 4, 5).
We should return the number of containers based on which column is being pruned, but that is hard to do with this method:
~~ https://github.com/apache/arrow-datafusion/blob/e7edac5b66f8b929382a582021165f97392bd50c/datafusion/core/src/physical_optimizer/pruning.rs#L81-L83 ~~
I will modify this in a follow-up PR and try to support multi-column predicates.
--edit
Yes, after splitting the filter expression each sub-expression refers to only one column, so there is no need to change this and no need to pass all columns' indexes!
👍 Thanks
// for now we only support pushDown on one col, because each col may have different page numbers, its hard to get
// `num_containers` from `PruningStatistics`.
let cols = predicate.need_input_columns_ids();
//Todo more specific rules
For now, because of num_containers, we only support a single column. In the future, we could add more specific rules.
I don't think there is a guarantee that the data pages for each column break at the same row index -- since the statistics are stored per page (and thus for a single column) I am not sure when/if we'll be able to support multi column predicates (aka predicates that refer to different columns).
I tried to draw a diagram to illustrate what I think is going on:
┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┃
┃ ┌──────────────┐ │ ┌──────────────┐ │ ┃
┃ │ │ │ │ │ │ ┃
┃ │ │ │ │ Page │ │
│ │ │ │ │ 3 │ ┃
┃ │ │ │ │ min: "A" │ │ ┃
┃ │ │ │ │ │ max: "C" │ ┃
┃ │ Page │ │ │ first_row: 0 │ │
│ │ 1 │ │ │ │ ┃
┃ │ min: 10 │ │ └──────────────┘ │ ┃
┃ │ │ max: 20 │ │ ┌──────────────┐ ┃
┃ │ first_row: 0 │ │ │ │ │
│ │ │ │ │ Page │ ┃
┃ │ │ │ │ 4 │ │ ┃
┃ │ │ │ │ │ min: "D" │ ┃
┃ │ │ │ │ max: "G" │ │
│ │ │ │ │first_row: 100│ ┃
┃ └──────────────┘ │ │ │ │ ┃
┃ │ ┌──────────────┐ │ │ │ ┃
┃ │ │ │ └──────────────┘ │
│ │ Page │ │ ┌──────────────┐ ┃
┃ │ 2 │ │ │ │ │ ┃
┃ │ │ min: 30 │ │ │ Page │ ┃
┃ │ max: 40 │ │ │ 5 │ │
│ │first_row: 200│ │ │ min: "H" │ ┃
┃ │ │ │ │ max: "Z" │ │ ┃
┃ │ │ │ │ │first_row: 250│ ┃
┃ └──────────────┘ │ │ │ │
│ │ └──────────────┘ ┃
┃ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘ ┃
┃ ColumnChunk ColumnChunk ┃
┃ A B
━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
Parquet RowGroup with 2 columns, A and B.
Column A is stored in 2 pages and Column B is
stored in 3 pages
Given the predicate A > 35, we could rule out page 1 using statistics (max is 20). This means we can avoid decoding row_index 0->199 for ColumnChunk B (and skip page 3 entirely).
Multi Column Predicates
Given the predicate A > 35 AND B = "F":
- Using A > 35, we can rule out page 1 using statistics as before
- Using the B = "F" part, we could rule out pages 3 and 5 (row_index 0->99 and row_index 250->onward)
Thus only rows at indexes 200->249 need to be decoded (only those ranges are needed in the RowSelector).
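A hedged sketch of how such a surviving range can be expressed as the alternating skip/select list this PR builds, using the parquet crate's RowSelector::skip / RowSelector::select constructors; the 300-row row-group length is an assumption for illustration.

use parquet::arrow::arrow_reader::RowSelector;

// Turn one surviving row range [start, end) into a skip/select/skip list.
fn selection_for_range(total_rows: usize, start: usize, end: usize) -> Vec<RowSelector> {
    let mut selection = Vec::new();
    if start > 0 {
        selection.push(RowSelector::skip(start));
    }
    selection.push(RowSelector::select(end - start));
    if end < total_rows {
        selection.push(RowSelector::skip(total_rows - end));
    }
    selection
}

fn main() {
    // keep rows 200..250 of an assumed 300-row row group:
    // skip(200), select(50), skip(50)
    let selection = selection_for_range(300, 200, 250);
    assert_eq!(selection.len(), 3);
}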
So among other things, this implies we probably want to break the pruning predicate down into conjuncts (aka split on AND) to try and isolate predicates that refer only to a single column.
It also means that the "number of containers" in Column A will be "2" (as it has pages 1 and 2) and the number of containers for Column B will be "3" (as it has pages 3, 4, 5).
So among other things, this implies we probably want to break the pruning predicate down into conjuncts (aka split on AND) to try and isolate predicates that refer only to a single column
Yes, I used to think this had been done in FilterPushDownRule, i.e. rewriting the filter expression into CNF (I mean that the top-level operators are all and). But after checking the code I see I was wrong: there is no such split logic in the logical plan. I think we should do this in a logical plan optimizer, like filter pushdown, as early as possible and do it once 🤔
Forget the above 😂. Yes, we need to break the pruning predicate into sub-expressions combined by and; each sub-expression should refer to only one column, and if it does not we will not push it down to parquet. After evaluating each sub-expression one by one, we intersect the results to get the final selection.
The reason we cannot evaluate multi-column predicates is that build_statistics_record_batch returns a RecordBatch in which each column must have the same length (not suitable for the page index).
https://github.com/apache/arrow-datafusion/blob/096627a2340aa33526c46eb551283eb88daabbea/datafusion/core/src/physical_optimizer/pruning.rs#L176-L177
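A standalone sketch of the intersection step described above (hypothetical helper, not the PR's code): each single-column conjunct yields a set of surviving row ranges from its own column's page index, and the per-conjunct ranges are intersected to obtain the final selection.

use std::ops::Range;

// Intersect two sorted, non-overlapping lists of surviving row ranges.
fn intersect(a: &[Range<usize>], b: &[Range<usize>]) -> Vec<Range<usize>> {
    let (mut i, mut j) = (0, 0);
    let mut out = Vec::new();
    while i < a.len() && j < b.len() {
        let start = a[i].start.max(b[j].start);
        let end = a[i].end.min(b[j].end);
        if start < end {
            out.push(start..end);
        }
        // advance whichever range ends first
        if a[i].end < b[j].end { i += 1; } else { j += 1; }
    }
    out
}

fn main() {
    // From the diagram: `A > 35` keeps page 2 (rows 200..), `B = "F"` keeps
    // page 4 (rows 100..250); 300 is an assumed row-group length.
    let keep_a = vec![200..300];
    let keep_b = vec![100..250];
    assert_eq!(intersect(&keep_a, &keep_b), vec![200..250]);
}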
Yeah, I think this is somewhat more specialized than just breaking the expr down into CNF
The difference is that we can only evaluate conjuncts that have a single column reference
} else {
    RowSelector::skip(sum_row)
};
vec.push(selector);
The selectors must alternate between skip and select, but this is not stated in the doc. I think there should be an API for this in arrow-rs.
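A sketch of the kind of helper that comment asks for (it assumes the parquet crate's RowSelector with public row_count and skip fields; this is illustrative, not an existing arrow-rs API): merge consecutive selectors of the same kind and drop empty ones so the list strictly alternates.

use parquet::arrow::arrow_reader::RowSelector;

fn coalesce(selectors: Vec<RowSelector>) -> Vec<RowSelector> {
    let mut out: Vec<RowSelector> = Vec::new();
    for s in selectors {
        if s.row_count == 0 {
            continue; // drop empty selectors
        }
        if let Some(last) = out.last_mut() {
            if last.skip == s.skip {
                // same kind as the previous selector: extend it instead of pushing
                last.row_count += s.row_count;
                continue;
            }
        }
        out.push(s);
    }
    out
}

fn main() {
    let merged = coalesce(vec![
        RowSelector::skip(10),
        RowSelector::skip(5),
        RowSelector::select(20),
    ]);
    assert_eq!(merged.len(), 2); // skip(15), select(20)
}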
Filed apache/arrow-rs#2858.
Nice!
I will try to review this in more detail later today.
I ran out of time today -- will do tomorrow.
Thanks @Ted-Jiang -- this is looking quite good.
I left some comments about how to handle multiple columns.
I would like to see some more tests. Specifically:
- A test that combines column metadata pruning with row filtering
- More targeted tests of prune_pages_in_one_row_group (specifically setting up various scenarios, including multiple Indexes, and validating that the output RowSelectors are correct)
Given this feature is turned off by default, I also think we could merge this PR as is and iterate on master if you prefer, and thus I am approving it.
offset_indexes: Option<&Vec<Vec<PageLocation>>>,
page_indexes: Option<&Vec<Index>>,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
I wonder what you think about using RowSelection (https://docs.rs/parquet/24.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html) rather than Vec<RowSelector>? It might be easier to manipulate / update (especially when this is combined with predicate evaluation).
I used RowSelection originally, but with multiple row groups it is hard to combine the RowSelections for each group, so I changed it.
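A hedged sketch of one way to stitch per-row-group selections together (it assumes each per-row-group Vec<RowSelector> accounts for every row of its row group, and relies on the parquet crate's From<Vec<RowSelector>> impl for RowSelection; illustrative only, not the PR's code).

use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn combine_per_row_group(groups: Vec<Vec<RowSelector>>) -> RowSelection {
    // flatten in row-group order into one file-relative selection;
    // this only works if every per-group list covers its whole row group
    let flat: Vec<RowSelector> = groups.into_iter().flatten().collect();
    RowSelection::from(flat)
}

fn main() {
    let rg0 = vec![RowSelector::skip(100), RowSelector::select(50)];
    let rg1 = vec![RowSelector::select(25), RowSelector::skip(75)];
    let selection = combine_per_row_group(vec![rg0, rg1]);
    let _ = selection; // e.g. hand off to the arrow reader's row selection
}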
.clone()
.with_scan_options(ParquetScanOptions::default().with_page_index(true));

let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
👍
Signed-off-by: yangjiang <yangjiang@ebay.com>
@alamb Thanks for your kind review! 👍👍
Sounds good -- thanks @Ted-Jiang
Thanks again @Ted-Jiang
Benchmark runs are scheduled for baseline = aa0ded7 and contender = 011bcf4. 011bcf4 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Signed-off-by: yangjiang <yangjiang@ebay.com>
Which issue does this PR close?
Related #847.
Rationale for this change
What changes are included in this PR?
Are there any user-facing changes?