
Implement parquet page-level skipping with column index, using min/ma… #3780

Merged
merged 5 commits into apache:master
Oct 15, 2022

Conversation

Ted-Jiang (Member)

…x stats

Signed-off-by: yangjiang yangjiang@ebay.com

Which issue does this PR close?

Related #847.

Rationale for this change

What changes are included in this PR?

Are there any user-facing changes?

@Ted-Jiang Ted-Jiang marked this pull request as draft October 10, 2022 12:30
@github-actions github-actions bot added the core Core DataFusion crate label Oct 10, 2022
@Ted-Jiang (Member Author)

I ran TPC-H q1, modifying the filter from ">=" to "<=" so that more pages can be skipped:

yangjiang@LM-SHC-15009782 datafusion-cli % cargo run --release --bin datafusion-cli
    Finished release [optimized] target(s) in 1.14s
     Running `target/release/datafusion-cli`
DataFusion CLI v13.0.0
❯ create external table lineitem stored as parquet location '/Users/yangjiang/test-data/1g_tpch_pageIndex/lineitem_order';
0 rows in set. Query took 0.004 seconds.
❯ select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1996-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+-------------+
| l_returnflag | l_linestatus | sum_qty  | sum_base_price     | sum_disc_price     | sum_charge         | avg_qty            | avg_price          | avg_disc            | count_order |
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+-------------+
| A            | F            | 37734107 | 56586554400.729996 | 53758257134.87001  | 55909065222.8277   | 25.522005853257337 | 38273.12973462167  | 0.04998529583839799 | 1478493     |
| N            | F            | 991417   | 1487504710.379999  | 1413082168.0540993 | 1469649223.194375  | 25.516471920522985 | 38284.467760848274 | 0.05009342667421619 | 38854       |
| N            | O            | 28196030 | 42296870106.47     | 40182807989.9758   | 41790214897.953186 | 25.51279800791189  | 38271.75327140934  | 0.050005284245347   | 1105172     |
| R            | F            | 37719753 | 56568041380.90001  | 53741292684.604    | 55889619119.831924 | 25.50579361269077  | 38250.854626099666 | 0.05000940583012741 | 1478870     |
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+---------------------+-------------+
4 rows in set. Query took 0.245 seconds.

Then, with with_page_index set:

select
    l_returnflag,
    l_linestatus,
    sum(l_quantity) as sum_qty,
    sum(l_extendedprice) as sum_base_price,
    sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
    sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
    avg(l_quantity) as avg_qty,
    avg(l_extendedprice) as avg_price,
    avg(l_discount) as avg_disc,
    count(*) as count_order
from
    lineitem
where
        l_shipdate <= date '1996-09-02'
group by
    l_returnflag,
    l_linestatus
order by
    l_returnflag,
    l_linestatus;
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
| l_returnflag | l_linestatus | sum_qty  | sum_base_price     | sum_disc_price     | sum_charge         | avg_qty            | avg_price          | avg_disc             | count_order |
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
| A            | F            | 37734107 | 56586554400.73     | 53758257134.869995 | 55909065222.82769  | 25.522005853257337 | 38273.129734621674 | 0.04998529583839799  | 1478493     |
| N            | F            | 991417   | 1487504710.3799992 | 1413082168.0540993 | 1469649223.1943748 | 25.516471920522985 | 38284.46776084828  | 0.05009342667421619  | 38854       |
| N            | O            | 28196030 | 42296870106.47002  | 40182807989.97579  | 41790214897.95318  | 25.51279800791189  | 38271.753271409354 | 0.050005284245347004 | 1105172     |
| R            | F            | 37719753 | 56568041380.9      | 53741292684.604004 | 55889619119.831924 | 25.50579361269077  | 38250.85462609966  | 0.05000940583012743  | 1478870     |
+--------------+--------------+----------+--------------------+--------------------+--------------------+--------------------+--------------------+----------------------+-------------+
4 rows in set. Query took 0.174 seconds.

And with log=debug I got:

[2022-10-11T11:35:38Z DEBUG datafusion::physical_plan::file_format::parquet] 
Use filter and page index create RowSelection
 [[RowSelector { row_count: 3820000, skip: false }], 
[RowSelector { row_count: 300000, skip: false }, 
RowSelector { row_count: 1900000, skip: true }]]

That seems to be a ~1.4x performance gain.
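To make the logged selection concrete, here is a small sketch (using a simplified stand-in for the parquet crate's RowSelector, not the real type) that tallies how many rows the selection above decodes versus skips:

```rust
// Simplified stand-in for parquet's `RowSelector`, used only to tally
// the selection printed in the debug log above.
#[derive(Debug, Clone, Copy)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Sum rows decoded vs. skipped across all row groups' selections.
fn tally(selections: &[Vec<RowSelector>]) -> (usize, usize) {
    let mut read = 0;
    let mut skipped = 0;
    for group in selections {
        for s in group {
            if s.skip {
                skipped += s.row_count;
            } else {
                read += s.row_count;
            }
        }
    }
    (read, skipped)
}

fn main() {
    // The selection from the log: row group 0 is read entirely,
    // row group 1 reads 300k rows and skips the remaining 1.9M.
    let selections = vec![
        vec![RowSelector { row_count: 3_820_000, skip: false }],
        vec![
            RowSelector { row_count: 300_000, skip: false },
            RowSelector { row_count: 1_900_000, skip: true },
        ],
    ];
    let (read, skipped) = tally(&selections);
    println!("read {read} rows, skipped {skipped} rows");
}
```

So roughly a third of the second row group's rows are never decoded, which lines up with the observed speedup.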

}

fn num_containers(&self) -> usize {
self.offset_indexes.get(self.col_id).unwrap().len()
Member Author

@alamb PTAL 🤔 for now, num_containers only returns one value.
I think we should modify it to

fn num_containers(&self, column: &Column) -> usize {

because each column chunk in one row group has different page numbers

Contributor

I think the idea of containers was that it was the unit of thing that has statistics

So since each RowGroup (may) have statistics, when pruning entire RowGroups the num_containers is the same as the number of RowGroups.

When pruning pages, I would expect the number of containers to be the same as the number of pages that have statistics.
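A hypothetical sketch (not the PR's exact code) of why the container count depends on the column: with one offset index per column chunk, each inner list's length is that column's page count, i.e. its number of containers.

```rust
// Hypothetical sketch: `offset_indexes` holds one Vec of page offsets per
// column chunk, so the number of "containers" (pages with statistics)
// differs per column.
fn num_containers(offset_indexes: &[Vec<u64>], col_id: usize) -> usize {
    // Each inner Vec holds one column chunk's page offsets; its length
    // is the number of pages, i.e. containers, for that column.
    offset_indexes[col_id].len()
}

fn main() {
    // Column 0 stored in 2 pages, column 1 stored in 3 pages.
    let offset_indexes = vec![vec![4, 1000], vec![4, 600, 1200]];
    assert_eq!(num_containers(&offset_indexes, 0), 2);
    assert_eq!(num_containers(&offset_indexes, 1), 3);
}
```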

Contributor

I tried to explain this more clearly in the diagram

Member Author
@Ted-Jiang commented Oct 14, 2022

Agree! As you mention:

It also means that the "number of containers" in Column A will be "2" (as it has pages 1 and 2) and the number of containers for Column B will be "3" (as it has pages 3, 4, 5).

we should return the number of containers based on the column identification, but that is hard to do with this method:
~~ https://github.com/apache/arrow-datafusion/blob/e7edac5b66f8b929382a582021165f97392bd50c/datafusion/core/src/physical_optimizer/pruning.rs#L81-L83 ~~

I will modify this in a following PR and try to support multi-column predicates.
--edit
Yes, after splitting the filter expr, each sub-expr relates to only one column, so there is no need to change it and no need to pass all column indexes!
👍 Thanks

// For now we only support pushdown on one column, because each column may have
// different page numbers, so it's hard to get `num_containers` from `PruningStatistics`.
let cols = predicate.need_input_columns_ids();
// TODO: more specific rules
Member Author

Now, because of num_containers, we only support one column. In the future, we could add more specific rules.

Contributor

I don't think there is a guarantee that the data pages for each column break at the same row index -- since the statistics are stored per page (and thus for a single column) I am not sure when/if we'll be able to support multi column predicates (aka predicates that refer to different columns).

I tried to draw a diagram to illustrate what I think is going on:

┏━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ 
   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─   ┃
┃     ┌──────────────┐  │     ┌──────────────┐  │  ┃
┃  │  │              │     │  │              │     ┃
┃     │              │  │     │     Page     │  │   
   │  │              │     │  │      3       │     ┃
┃     │              │  │     │   min: "A"   │  │  ┃
┃  │  │              │     │  │   max: "C"   │     ┃
┃     │     Page     │  │     │ first_row: 0 │  │   
   │  │      1       │     │  │              │     ┃
┃     │   min: 10    │  │     └──────────────┘  │  ┃
┃  │  │   max: 20    │     │  ┌──────────────┐     ┃
┃     │ first_row: 0 │  │     │              │  │   
   │  │              │     │  │     Page     │     ┃
┃     │              │  │     │      4       │  │  ┃
┃  │  │              │     │  │   min: "D"   │     ┃
┃     │              │  │     │   max: "G"   │  │   
   │  │              │     │  │first_row: 100│     ┃
┃     └──────────────┘  │     │              │  │  ┃
┃  │  ┌──────────────┐     │  │              │     ┃
┃     │              │  │     └──────────────┘  │   
   │  │     Page     │     │  ┌──────────────┐     ┃
┃     │      2       │  │     │              │  │  ┃
┃  │  │   min: 30    │     │  │     Page     │     ┃
┃     │   max: 40    │  │     │      5       │  │   
   │  │first_row: 200│     │  │   min: "H"   │     ┃
┃     │              │  │     │   max: "Z"   │  │  ┃
┃  │  │              │     │  │first_row: 250│     ┃
┃     └──────────────┘  │     │              │  │   
   │                       │  └──────────────┘     ┃
┃   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘   ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘  ┃
┃       ColumnChunk            ColumnChunk         ┃
┃            A                      B               
 ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━━ ━━┛
                                                    
                                                    
   Parquet RowGroup with 2 columns, A and B.        
                                                    
 Column A is stored in 2 pages and Column B is      
               stored in 3 pages                    
                                                    
                                                    
                                                   

Given the predicate A > 35, we could rule out page 1 using its statistics (max is 20)

This means we can avoid decoding row_index 0->199 for ColumnChunk B (and skip page 3 entirely)

Multi Column Predicates

Given the predicate A > 35 AND B = "F":

Using A > 35, we can rule out page 1 using statistics as before

Using the B = "F" part, we could rule out pages 3 and 5 (row_index 0->99 and row_index 250->onward)

Thus only rows at indexes 200->249 need to be decoded (only those ranges are needed in the RowSelector)
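The intersection step described above can be sketched as plain range arithmetic (this is an illustration under the diagram's numbers, not DataFusion's implementation): each column contributes the half-open row ranges its surviving pages cover, and only their intersection must be decoded.

```rust
// Sketch: intersect per-column "rows that might match" ranges from the
// page-index example above. Ranges are half-open (start..end) over row
// indexes within the row group.

/// Intersect two lists of half-open row ranges.
fn intersect(a: &[(usize, usize)], b: &[(usize, usize)]) -> Vec<(usize, usize)> {
    let mut out = Vec::new();
    for &(a0, a1) in a {
        for &(b0, b1) in b {
            let (lo, hi) = (a0.max(b0), a1.min(b1));
            if lo < hi {
                out.push((lo, hi));
            }
        }
    }
    out
}

fn main() {
    let total_rows = 300; // hypothetical row-group size for the diagram
    // A > 35: page 1 (rows 0..200, max 20) is pruned; page 2 survives.
    let a_survives = vec![(200, total_rows)];
    // B = "F": pages 3 (rows 0..100) and 5 (rows 250..) are pruned; page 4 survives.
    let b_survives = vec![(100, 250)];
    // Only rows 200..250 must actually be decoded.
    println!("{:?}", intersect(&a_survives, &b_survives));
}
```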

Contributor

So among other things, this implies we probably want to break the pruning predicate down into conjuncts (aka split on AND) to try and isolate predicates that refer only to a single column

Contributor

It also means that the "number of containers" in Column A will be "2" (as it has pages 1 and 2) and the number of containers for Column B will be "3" (as it has pages 3, 4, 5).

Member Author
@Ted-Jiang commented Oct 14, 2022

So among other things, this implies we probably want to break the pruning predicate down into conjuncts (aka split on AND) to try and isolate predicates that refer only to a single column

Yes, I used to think this had already been done in FilterPushDownRule, i.e. rewriting the filter expr to CNF (I mean the top-level operators are all AND).

But after checking the code, I was wrong: there isn't any split logic in the logical plan. I think we should do this in the logical plan optimizer, like filter pushdown, as early as possible and do it once 🤔

Member Author

Forget the above 😂. Yes, we need to break the pruning predicate down into conjuncts combined by AND; each sub-expr should refer to only one column, and if not, we will not push it down to parquet.
After evaluating each sub-expr one by one, we take the intersection to get the final result.

The reason we cannot evaluate multi-column predicates is build_statistics_record_batch: it returns a RecordBatch in which every column must have the same length (not suitable for the page index).
https://github.com/apache/arrow-datafusion/blob/096627a2340aa33526c46eb551283eb88daabbea/datafusion/core/src/physical_optimizer/pruning.rs#L176-L177
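The "split on AND, keep single-column conjuncts" idea can be sketched over a toy expression tree (hypothetical; DataFusion's real Expr type is much richer):

```rust
use std::collections::HashSet;

// Toy expression tree, just enough to sketch splitting a predicate on AND
// and keeping only the conjuncts that reference a single column.
#[derive(Debug, Clone)]
enum Expr {
    Col(&'static str),
    Lit(i64),
    And(Box<Expr>, Box<Expr>),
    Gt(Box<Expr>, Box<Expr>),
}

/// Flatten nested ANDs into a list of conjuncts.
fn split_conjunction(e: &Expr, out: &mut Vec<Expr>) {
    match e {
        Expr::And(l, r) => {
            split_conjunction(l, out);
            split_conjunction(r, out);
        }
        other => out.push(other.clone()),
    }
}

/// Collect the column names referenced by an expression.
fn columns(e: &Expr, out: &mut HashSet<&'static str>) {
    match e {
        Expr::Col(name) => {
            out.insert(name);
        }
        Expr::And(l, r) | Expr::Gt(l, r) => {
            columns(l, out);
            columns(r, out);
        }
        Expr::Lit(_) => {}
    }
}

fn main() {
    // (a > 35) AND (b > a): only the first conjunct is single-column,
    // so only it would be pushed down to the page index.
    let pred = Expr::And(
        Box::new(Expr::Gt(Box::new(Expr::Col("a")), Box::new(Expr::Lit(35)))),
        Box::new(Expr::Gt(Box::new(Expr::Col("b")), Box::new(Expr::Col("a")))),
    );
    let mut conjuncts = Vec::new();
    split_conjunction(&pred, &mut conjuncts);
    let pushable = conjuncts
        .iter()
        .filter(|c| {
            let mut cols = HashSet::new();
            columns(c, &mut cols);
            cols.len() == 1
        })
        .count();
    println!("{pushable} pushable conjunct(s) out of {}", conjuncts.len());
}
```

Each surviving conjunct can then be evaluated against one column's page statistics, and the per-conjunct results intersected.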

Contributor

Yeah, I think this is somewhat more specialized than just breaking the expr down into CNF

The difference is that we can only evaluate conjuncts that have a single column reference

@Ted-Jiang Ted-Jiang marked this pull request as ready for review October 11, 2022 11:53
} else {
RowSelector::skip(sum_row)
};
vec.push(selector);
Member Author

The selectors must alternate between skip and select, but this is not stated in the docs. I think we should add an API for this in arrow-rs.
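The alternation invariant can be maintained by merging adjacent selectors with the same flag; a minimal sketch of such a helper (using a simplified RowSelector stand-in, not the arrow-rs API):

```rust
// Simplified stand-in for parquet's `RowSelector`.
#[derive(Debug, Clone, Copy, PartialEq)]
struct RowSelector {
    row_count: usize,
    skip: bool,
}

/// Merge adjacent selectors with the same flag so the result strictly
/// alternates between select and skip.
fn normalize(selectors: &[RowSelector]) -> Vec<RowSelector> {
    let mut out: Vec<RowSelector> = Vec::new();
    for &s in selectors {
        match out.last_mut() {
            // Same kind as the previous selector: fold into it.
            Some(last) if last.skip == s.skip => last.row_count += s.row_count,
            // Different kind (or first selector): start a new run.
            _ => out.push(s),
        }
    }
    out
}

fn main() {
    let merged = normalize(&[
        RowSelector { row_count: 100, skip: false },
        RowSelector { row_count: 50, skip: false },
        RowSelector { row_count: 25, skip: true },
    ]);
    println!("{merged:?}");
}
```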

Member Author

@alamb
Contributor

alamb commented Oct 11, 2022

That seems to be a ~1.4x performance gain.

Nice!

Contributor
@alamb left a comment

I will try and review this in more detail later today

@alamb
Contributor

alamb commented Oct 12, 2022

I ran out of time today -- will do it tomorrow

Contributor
@alamb left a comment

Thanks @Ted-Jiang -- this is looking quite good.

I left some comments about how to handle multiple columns

I would like to see some more tests. Specifically:

  1. Test that combines column metadata pruning with row filtering
  2. More targeted tests of prune_pages_in_one_row_group (specifically, setting up various scenarios (including multiple Indexes) and validating that the output RowSelectors were good)

Given this feature is turned off by default, I also think we could merge this PR as is and iterate on master if you prefer and thus I am approving it.

datafusion/core/src/physical_plan/file_format/parquet.rs
offset_indexes: Option<&Vec<Vec<PageLocation>>>,
page_indexes: Option<&Vec<Index>>,
metrics: &ParquetFileMetrics,
) -> Result<Vec<RowSelector>> {
Contributor

I wonder what you think about using RowSelection https://docs.rs/parquet/24.0.0/parquet/arrow/arrow_reader/struct.RowSelection.html rather than Vec<RowSelector>?

It might be easier to manipulate / update (especially when this is combined with predicate evaluation)

Member Author

I used to use RowSelection but when facing multi-rowgroups it's hard to combine these RowSelections for each groups, so i changed.

.clone()
.with_scan_options(ParquetScanOptions::default().with_page_index(true));

let mut results = parquet_exec_page_index.execute(0, task_ctx)?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@Ted-Jiang
Member Author

Ted-Jiang commented Oct 14, 2022

Thanks @Ted-Jiang -- this is looking quite good.

I left some comments about how to handle multiple columns

I would like to see some more tests. Specifically:

  1. Test that combines column metadata pruning with row filtering
  2. More targeted tests of prune_pages_in_one_row_group (specifically, setting up various scenarios (including multiple Indexes) and validating that the output RowSelectors were good)

Given this feature is turned off by default, I also think we could merge this PR as is and iterate on master if you prefer and thus I am approving it.

@alamb Thanks for your kind review! 👍👍
I prefer to iterate on it on master. I have filed the following issues:
#3834 and #3833, and will add more tests 💪

@alamb
Contributor

alamb commented Oct 15, 2022

I prefer to iterate on it on master. I have filed the following issues:

Sounds good -- thanks @Ted-Jiang

@alamb alamb merged commit 011bcf4 into apache:master Oct 15, 2022
@alamb
Contributor

alamb commented Oct 15, 2022

Thanks again @Ted-Jiang

@ursabot

ursabot commented Oct 15, 2022

Benchmark runs are scheduled for baseline = aa0ded7 and contender = 011bcf4. 011bcf4 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
