Experimental parquet decoder with first-class selection pushdown support #6921


Closed
wants to merge 32 commits into from

Conversation

XiangpengHao
Contributor

@XiangpengHao XiangpengHao commented Dec 30, 2024

Which issue does this PR close?

Many long-standing issues in DataFusion and Parquet. Note that this PR may or may not close these issues directly, but (imo) it lays the foundation for further optimizations (e.g., more aggressive selection pushdown as described in this paper).

Why selection pushdown?

Selection pushdown (also known as late materialization, row filtering, or filter pushdown) is great in concept, but can be tricky to implement efficiently. For example, the current straightforward implementation actually slows down many queries, which prevents query engines like DataFusion from enabling filter pushdown by default. The goal of a super-fast row-filter pushdown parquet reader is described by @alamb in #5523 (comment):

is that evaluating predicates in ArrowFilter (aka pushed down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel

Previous discussions have listed many potential optimizations to the current selection pushdown, like the ones in #5523 (comment).
However, it's not clear how we can incorporate those optimizations into the current implementation. After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline, while reusing as much of the existing implementation as possible.

Current implementation and the problems

We currently implement two-phase decoding:
Phase 1: Build selectors on each predicate

Empty selector -> decode column 1 -> selection 1
Selection 1 -> decode column 2 -> selection 2
…
Selection K

Phase 2: Decode parquet data using the selector

Selection K -> decode column 1
Selection K -> decode column 2
…
Selection K -> decode column N

The problem is that we have to decode each predicate column twice. For example, if column 1 is being filtered, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.
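The phase 1 step "decode column K -> selection K" can be sketched as collapsing a predicate's boolean mask into run-length selectors. This is a simplified stand-in for the crate's `RowSelection` machinery, not the actual implementation; note that producing the mask requires decoding the column, which is exactly the decoding that phase 2 repeats.

```rust
/// Simplified stand-in for a run-length row selector: a run of consecutive
/// rows that are either all kept or all skipped.
#[derive(Debug, Clone, PartialEq)]
pub struct RowSelector {
    pub row_count: usize,
    pub skip: bool,
}

/// Phase 1 sketch: collapse a predicate's boolean mask into selectors.
pub fn mask_to_selectors(mask: &[bool]) -> Vec<RowSelector> {
    let mut out: Vec<RowSelector> = Vec::new();
    for &keep in mask {
        match out.last_mut() {
            // Extend the current run if it has the same keep/skip polarity.
            Some(run) if run.skip == !keep => run.row_count += 1,
            _ => out.push(RowSelector { row_count: 1, skip: !keep }),
        }
    }
    out
}
```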

Caching is the solution but not that simple

The high-level intuition is: if the problem is decoding twice, we simply cache the first decoding result and reuse it later.

Here are the nuances:

  1. In the first phase of decoding, we build the filter over the entire column. To cache the decoded array, we would need to keep the entire decoded column in memory, which can be prohibitively expensive.
  2. Each predicate inherits the selection from the previous evaluation, meaning each predicate sees a different selection, all of which differ from the final selection. It's very non-trivial to convert a cached intermediate record batch to the final record batch when the selections are completely different.
  3. The columns being filtered may not appear in the final output, which is especially challenging with nested columns, where we would need to do sub-tree matching.
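To make nuance 2 concrete, here is a tiny illustration (assumed and simplified, using plain booleans instead of real selections) of why each predicate's selection lives in the row space of the previous one, and must be composed back before a cached intermediate batch could be matched against the final selection:

```rust
/// Compose `inner` (a mask defined only over the rows `outer` keeps) back
/// into `outer`'s row space. Predicate K's selection is expressed this way
/// relative to predicate K-1, so cached data filtered by an intermediate
/// selection cannot be filtered by the final selection directly.
pub fn compose(outer: &[bool], inner: &[bool]) -> Vec<bool> {
    let mut inner_iter = inner.iter();
    outer
        .iter()
        .map(|&kept| {
            if kept {
                // This row survived `outer`, so `inner` has an opinion on it.
                *inner_iter.next().expect("inner mask shorter than outer's kept rows")
            } else {
                // Rows dropped by `outer` stay dropped.
                false
            }
        })
        .collect()
}
```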

Proposed solutions

The solution consists of two parts:

  1. A new decoding pipeline that pushes predicate evaluation down into record batch decoding.
  2. A carefully designed cache whose memory consumption is constant (up to 2 pages per column) regardless of the column size, so the extra memory overhead is negligible.

The pipeline looks like this:

Load predicate columns for batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
Load predicate columns for batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
...
Load predicate columns for batch N -> evaluate predicates -> filter N -> load & emit batch N

Once we have this pipeline, we can cache the predicate columns for batch N and reuse them in the load & emit step for batch N, which avoids the double decoding.
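A minimal sketch of the per-batch pipeline above, under stated assumptions: the `BatchSource` trait, `VecSource` type, and the fixed 4-row batches are all hypothetical stand-ins; the real reader operates on Arrow arrays and parquet pages.

```rust
pub trait BatchSource {
    /// Decode the predicate columns for batch `i`.
    fn load_predicate_columns(&mut self, i: usize) -> Vec<i64>;
    /// Decode the remaining columns for batch `i`, applying `filter`.
    fn load_batch(&mut self, i: usize, filter: &[bool]) -> Vec<i64>;
}

/// Run the per-batch pipeline: load predicate columns -> evaluate the
/// predicate -> load & emit the filtered batch, one batch at a time.
pub fn run_pipeline<S: BatchSource>(
    src: &mut S,
    num_batches: usize,
    pred: impl Fn(i64) -> bool,
) -> Vec<i64> {
    let mut out = Vec::new();
    for i in 0..num_batches {
        let cols = src.load_predicate_columns(i); // decode predicate columns
        let filter: Vec<bool> = cols.iter().map(|&v| pred(v)).collect();
        out.extend(src.load_batch(i, &filter)); // load & emit batch i
    }
    out
}

/// Toy source backed by one in-memory column, 4 rows per batch.
pub struct VecSource(pub Vec<i64>);

impl BatchSource for VecSource {
    fn load_predicate_columns(&mut self, i: usize) -> Vec<i64> {
        self.0[i * 4..(i + 1) * 4].to_vec()
    }
    fn load_batch(&mut self, i: usize, filter: &[bool]) -> Vec<i64> {
        self.0[i * 4..(i + 1) * 4]
            .iter()
            .zip(filter.iter())
            .filter(|(_, keep)| **keep)
            .map(|(v, _)| *v)
            .collect()
    }
}
```

In this toy version the predicate column is decoded twice per batch (once in `load_predicate_columns`, once in `load_batch`); the cache described next is what removes the second decode.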

Due to the difficulties mentioned above, this PR caches the decompressed pages rather than decoded arrow arrays. Some research suggests that decompressing pages costs up to twice as much as decoding to arrow, so if we cache the decompressed pages, we only pay the arrow decoding cost twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new PageReader.
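The page-caching idea could be sketched as a memoizing wrapper, assuming an illustrative `PageSource` trait rather than the crate's actual `PageReader` API: the first access pays the decompression cost, repeated accesses (the second decode pass) do not.

```rust
use std::collections::HashMap;

/// Hypothetical source of decompressed pages (stand-in for a real reader).
pub trait PageSource {
    /// Decompress and return page `idx` (the expensive step).
    fn decompress_page(&mut self, idx: usize) -> Vec<u8>;
}

/// Wrapper that caches decompressed pages so the second decoding pass
/// skips decompression. Only decoding-to-arrow is then paid twice.
pub struct CachingPageReader<S: PageSource> {
    inner: S,
    cache: HashMap<usize, Vec<u8>>,
    pub decompress_calls: usize,
}

impl<S: PageSource> CachingPageReader<S> {
    pub fn new(inner: S) -> Self {
        Self { inner, cache: HashMap::new(), decompress_calls: 0 }
    }

    pub fn get_page(&mut self, idx: usize) -> Vec<u8> {
        if let Some(page) = self.cache.get(&idx) {
            return page.clone(); // cache hit: no decompression
        }
        self.decompress_calls += 1;
        let page = self.inner.decompress_page(idx);
        self.cache.insert(idx, page.clone());
        page
    }
}

/// Toy source holding already-materialized page bytes.
pub struct RawPages(pub Vec<Vec<u8>>);

impl PageSource for RawPages {
    fn decompress_page(&mut self, idx: usize) -> Vec<u8> {
        self.0[idx].clone()
    }
}
```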

What changes are included in this PR?

This PR only implements a reader for the async record batch stream. The sync version is left as future work and should be straightforward to build on the async version.

Are there any user-facing changes?

No. The same ParquetRecordBatchStream will automatically benefit from the changes.

@github-actions github-actions bot added the parquet (Changes to the parquet crate) and arrow (Changes to the arrow crate) labels Dec 30, 2024
@XiangpengHao
Contributor Author

Would be great if @alamb @tustvold can take a look and let me know your thoughts!

@alamb
Contributor

alamb commented Dec 30, 2024

Would be great if @alamb @tustvold can take a look and let me know your thoughts!

😍 -- looks amazing. I plan to work on arrow-rs reviews tomorrow. I'll put this one on the top of the list

@XiangpengHao
Contributor Author

Implemented some more optimizations and tuning; here are the ClickBench numbers on my machine. TL;DR: about 15% total time reduction.

We first compare no-pushdown vs. our new pushdown implementation. Only Q27 shows a meaningful slowdown; other queries are either similar or much faster.

The fix for Q27 requires switching to a boolean-mask-based selector implementation, like the one in #6624

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃ no-pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │      0.47ms │       0.43ms │ +1.10x faster │
│ QQuery 1     │     51.10ms │      50.10ms │     no change │
│ QQuery 2     │     68.23ms │      64.49ms │ +1.06x faster │
│ QQuery 3     │     90.68ms │      86.73ms │     no change │
│ QQuery 4     │    458.93ms │     458.59ms │     no change │
│ QQuery 5     │    522.06ms │     478.50ms │ +1.09x faster │
│ QQuery 6     │     49.84ms │      49.94ms │     no change │
│ QQuery 7     │     55.09ms │      55.77ms │     no change │
│ QQuery 8     │    565.26ms │     556.95ms │     no change │
│ QQuery 9     │    575.83ms │     575.05ms │     no change │
│ QQuery 10    │    164.56ms │     178.23ms │  1.08x slower │
│ QQuery 11    │    177.20ms │     191.32ms │  1.08x slower │
│ QQuery 12    │    591.05ms │     569.92ms │     no change │
│ QQuery 13    │    861.06ms │     848.59ms │     no change │
│ QQuery 14    │    596.20ms │     580.73ms │     no change │
│ QQuery 15    │    554.96ms │     548.77ms │     no change │
│ QQuery 16    │   1175.08ms │    1146.07ms │     no change │
│ QQuery 17    │   1150.45ms │    1121.49ms │     no change │
│ QQuery 18    │   2634.75ms │    2494.07ms │ +1.06x faster │
│ QQuery 19    │     90.15ms │      89.24ms │     no change │
│ QQuery 20    │    620.15ms │     591.67ms │     no change │
│ QQuery 21    │    782.38ms │     703.15ms │ +1.11x faster │
│ QQuery 22    │   1927.94ms │    1404.35ms │ +1.37x faster │
│ QQuery 23    │   8104.11ms │    3610.76ms │ +2.24x faster │
│ QQuery 24    │    360.79ms │     330.55ms │ +1.09x faster │
│ QQuery 25    │    290.61ms │     252.54ms │ +1.15x faster │
│ QQuery 26    │    395.18ms │     362.72ms │ +1.09x faster │
│ QQuery 27    │    891.76ms │     959.39ms │  1.08x slower │
│ QQuery 28    │   4059.54ms │    4137.37ms │     no change │
│ QQuery 29    │    235.88ms │     228.99ms │     no change │
│ QQuery 30    │    564.22ms │     584.65ms │     no change │
│ QQuery 31    │    741.20ms │     757.87ms │     no change │
│ QQuery 32    │   2652.48ms │    2574.19ms │     no change │
│ QQuery 33    │   2373.71ms │    2327.10ms │     no change │
│ QQuery 34    │   2391.00ms │    2342.15ms │     no change │
│ QQuery 35    │    700.79ms │     694.51ms │     no change │
│ QQuery 36    │    151.51ms │     152.93ms │     no change │
│ QQuery 37    │    108.18ms │      86.03ms │ +1.26x faster │
│ QQuery 38    │    114.64ms │     106.22ms │ +1.08x faster │
│ QQuery 39    │    260.80ms │     239.13ms │ +1.09x faster │
│ QQuery 40    │     60.74ms │      73.29ms │  1.21x slower │
│ QQuery 41    │     58.75ms │      67.85ms │  1.15x slower │
│ QQuery 42    │     65.49ms │      68.11ms │     no change │
└──────────────┴─────────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (no-pushdown)    │ 38344.79ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (no-pushdown)  │   891.74ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          5 │
│ Queries with No Change      │         25 │
└─────────────────────────────┴────────────┘

Now we compare our new implementation with the old pushdown implementation -- only Q23 is a bit slower; the others are either faster or similar.

We do need some extra work to get optimal performance on Q23. Nonetheless, we are still faster than no-pushdown. I believe fixing Q23 does not require fundamental changes to the existing decoding pipeline.

┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃  pushdown ┃ new-pushdown ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ QQuery 0     │    0.48ms │       0.43ms │ +1.12x faster │
│ QQuery 1     │   51.49ms │      50.10ms │     no change │
│ QQuery 2     │   67.83ms │      64.49ms │     no change │
│ QQuery 3     │   89.68ms │      86.73ms │     no change │
│ QQuery 4     │  469.88ms │     458.59ms │     no change │
│ QQuery 5     │  523.97ms │     478.50ms │ +1.10x faster │
│ QQuery 6     │   50.37ms │      49.94ms │     no change │
│ QQuery 7     │   56.89ms │      55.77ms │     no change │
│ QQuery 8     │  560.69ms │     556.95ms │     no change │
│ QQuery 9     │  583.14ms │     575.05ms │     no change │
│ QQuery 10    │  155.75ms │     178.23ms │  1.14x slower │
│ QQuery 11    │  170.31ms │     191.32ms │  1.12x slower │
│ QQuery 12    │  723.13ms │     569.92ms │ +1.27x faster │
│ QQuery 13    │ 1181.34ms │     848.59ms │ +1.39x faster │
│ QQuery 14    │  736.95ms │     580.73ms │ +1.27x faster │
│ QQuery 15    │  551.74ms │     548.77ms │     no change │
│ QQuery 16    │ 1171.99ms │    1146.07ms │     no change │
│ QQuery 17    │ 1152.34ms │    1121.49ms │     no change │
│ QQuery 18    │ 2555.82ms │    2494.07ms │     no change │
│ QQuery 19    │   84.20ms │      89.24ms │  1.06x slower │
│ QQuery 20    │  606.77ms │     591.67ms │     no change │
│ QQuery 21    │  704.86ms │     703.15ms │     no change │
│ QQuery 22    │ 1633.53ms │    1404.35ms │ +1.16x faster │
│ QQuery 23    │ 2691.84ms │    3610.76ms │  1.34x slower │
│ QQuery 24    │  528.09ms │     330.55ms │ +1.60x faster │
│ QQuery 25    │  465.38ms │     252.54ms │ +1.84x faster │
│ QQuery 26    │  562.40ms │     362.72ms │ +1.55x faster │
│ QQuery 27    │ 1121.76ms │     959.39ms │ +1.17x faster │
│ QQuery 28    │ 4455.16ms │    4137.37ms │ +1.08x faster │
│ QQuery 29    │  234.18ms │     228.99ms │     no change │
│ QQuery 30    │  596.22ms │     584.65ms │     no change │
│ QQuery 31    │  754.21ms │     757.87ms │     no change │
│ QQuery 32    │ 2570.52ms │    2574.19ms │     no change │
│ QQuery 33    │ 2357.37ms │    2327.10ms │     no change │
│ QQuery 34    │ 2377.89ms │    2342.15ms │     no change │
│ QQuery 35    │  703.78ms │     694.51ms │     no change │
│ QQuery 36    │  162.29ms │     152.93ms │ +1.06x faster │
│ QQuery 37    │  129.96ms │      86.03ms │ +1.51x faster │
│ QQuery 38    │   90.79ms │     106.22ms │  1.17x slower │
│ QQuery 39    │  220.71ms │     239.13ms │  1.08x slower │
│ QQuery 40    │   72.87ms │      73.29ms │     no change │
│ QQuery 41    │   70.04ms │      67.85ms │     no change │
│ QQuery 42    │   68.17ms │      68.11ms │     no change │
└──────────────┴───────────┴──────────────┴───────────────┘
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
┃ Benchmark Summary           ┃            ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
│ Total Time (pushdown)       │ 34116.80ms │
│ Total Time (new-pushdown)   │ 32800.50ms │
│ Average Time (pushdown)     │   793.41ms │
│ Average Time (new-pushdown) │   762.80ms │
│ Queries Faster              │         13 │
│ Queries Slower              │          6 │
│ Queries with No Change      │         24 │
└─────────────────────────────┴────────────┘

@XiangpengHao
Contributor Author

XiangpengHao commented Jan 1, 2025

The implementation of course still lacks tons of tests (I manually verified the ClickBench results). If the high-level approach looks good, I'll break it into smaller PRs with tests and documentation.

Like most performance-related PRs, some of the code changes can be very non-intuitive; please let me know and I'll do my best to explain why some code has to be implemented that way.

@alamb
Contributor

alamb commented Jan 1, 2025

Starting to check it out

Contributor

@alamb alamb left a comment


Thank you @XiangpengHao -- TLDR I think this POC looks really nice and the overall structure makes sense to me. I am willing to help review this PR as it moves closer to reality

There are obvious ways to break this PR up into pieces, which is a nice bonus -- the core caching logic is fairly localized

cc @thinkharderdev @tustvold @Dandandan @etseidl for your comments / reviews as well

I also think the description on the PR is quite good and easy to follow -- thank you for that

(todo: cite myself)

😆 my favorite part of the description

if we can cache the decompressed pages, then we only need to decode arrow twice, which might be good enough.

We can also consider caching arrow as a follow-on PR / project. If this initial PR effectively avoids decompressing each page twice (though it still decodes each page to arrow twice), that still seems better than the current main branch, which decompresses and decodes twice.

Contributor

@thinkharderdev thinkharderdev left a comment


very nice @XiangpengHao. I think this makes a lot of sense.

@alamb
Contributor

alamb commented Mar 22, 2025

The test failure is in a new test added for modular encryption (it doesn't error when it should). I am looking at it

@alamb
Contributor

alamb commented Mar 25, 2025

Here is another blog post about this PR:

@@ -464,6 +465,8 @@ impl ByteViewArrayDecoderDictionary {
}
}

output.views.reserve(len);

@alamb
Contributor

alamb commented Mar 31, 2025

Update:

Next steps:

  1. @XiangpengHao will write up the current state of affairs / document the existing code better

     This is done via two blogs

  2. We then rebase the PR against main

     Done

  3. Rerun the clickbench / tpch DataFusion benchmarks again

     I am in the process of doing this.

If the benchmarks look good, I'll help polish this PR up, get it ready for merge, and file next steps

@alamb
Contributor

alamb commented Mar 31, 2025

@alamb
Contributor

alamb commented Jul 25, 2025

Let's continue work in #7850

@alamb alamb closed this Jul 25, 2025
alamb added a commit that referenced this pull request Aug 8, 2025
…sync_reader) (#7850)

This is my latest attempt to make pushdown faster. Prior art: #6921

cc @alamb @zhuqi-lucas 

- Part of #8000
- Related to apache/datafusion#3463
- Related to #7456
- Closes #7363
- Closes #8003

## Problems of #6921

1. It proactively loads the entire row group into memory (rather than only
loading pages that pass the filter predicate).
2. It only caches decompressed pages, still paying the decoding cost
twice.

This PR takes a different approach: it does not change the decoding
pipeline, which avoids problem 1, and it caches arrow record batches,
which avoids problem 2.

But this means we need more memory to cache data.

## How does it work?

1. It instruments the `array_readers` with a transparent
`cached_array_reader`.
2. The cache layer first consults the `RowGroupCache` to look for a
batch, and only reads from the underlying reader on a cache miss.
3. There are a cache producer and a cache consumer: as we build filters,
we insert arrow arrays into the cache (producer); as we build outputs,
we remove them (consumer). So the memory usage should look like this:
```
    ▲
    │     ╭─╮
    │    ╱   ╲
    │   ╱     ╲
    │  ╱       ╲
    │ ╱         ╲
    │╱           ╲
    └─────────────╲──────► Time
    │      │      │
    Filter  Peak  Consume
    Phase (Built) (Decrease)
```
In a concurrent setup, not all readers may reach the peak at the
same time, so the peak system memory usage might be lower.

4. It has a `max_cache_size` knob, a per-row-group setting. If the
row group has used up its budget, the cache stops taking new data, and
the `cached_array_reader` falls back to reading and decoding from Parquet.
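Points 3 and 4 together could be sketched like this; every name here is an assumption for illustration, not the actual `RowGroupCache` API. The filter phase inserts under a byte budget, the output phase removes entries, giving the rise-and-fall memory profile in the diagram, and a refused insert signals the fallback path.

```rust
use std::collections::HashMap;

/// Toy model of a producer/consumer cache with a per-row-group byte budget.
pub struct RowGroupCacheSketch {
    max_cache_size: usize,            // byte budget for this row group
    used: usize,                      // bytes currently held
    batches: HashMap<usize, Vec<u8>>, // batch id -> cached bytes
}

impl RowGroupCacheSketch {
    pub fn new(max_cache_size: usize) -> Self {
        Self { max_cache_size, used: 0, batches: HashMap::new() }
    }

    /// Producer side (filter phase): returns false when the budget would
    /// be exceeded; the caller then falls back to decoding from Parquet.
    pub fn try_insert(&mut self, batch: usize, data: Vec<u8>) -> bool {
        if self.used + data.len() > self.max_cache_size {
            return false;
        }
        self.used += data.len();
        self.batches.insert(batch, data);
        true
    }

    /// Consumer side (output phase): removing the entry releases its
    /// budget, so memory falls again after the peak.
    pub fn take(&mut self, batch: usize) -> Option<Vec<u8>> {
        let data = self.batches.remove(&batch)?;
        self.used -= data.len();
        Some(data)
    }

    pub fn used_bytes(&self) -> usize {
        self.used
    }
}
```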

## Other benefits

1. This architecture supports nested columns (not implemented in this
PR), i.e., it is future-proof.
2. There are many optimizations left to further squeeze out performance,
but even in its current state it shows no regressions.

## How does it perform?

My criterion run somehow won't produce a result from `--save-baseline`, so
I asked an LLM to generate a table from this benchmark:
```
cargo bench --bench arrow_reader_clickbench --features "arrow async" "async"
```

`Baseline` is the implementation on the current main branch.
`New Unlimited` is the new pushdown with an unlimited memory budget.
`New 100MB` is the new pushdown with a 100MB per-row-group cache
budget.

```
Query  | Baseline (ms) | New Unlimited (ms) | Diff (ms)  | New 100MB (ms) | Diff (ms)
-------+--------------+--------------------+-----------+----------------+-----------
Q1     | 0.847          | 0.803               | -0.044     | 0.812          | -0.035    
Q10    | 4.060          | 6.273               | +2.213     | 6.216          | +2.156    
Q11    | 5.088          | 7.152               | +2.064     | 7.193          | +2.105    
Q12    | 18.485         | 14.937              | -3.548     | 14.904         | -3.581    
Q13    | 24.859         | 21.908              | -2.951     | 21.705         | -3.154    
Q14    | 23.994         | 20.691              | -3.303     | 20.467         | -3.527    
Q19    | 1.894          | 1.980               | +0.086     | 1.996          | +0.102    
Q20    | 90.325         | 64.689              | -25.636    | 74.478         | -15.847   
Q21    | 106.610        | 74.766              | -31.844    | 99.557         | -7.053    
Q22    | 232.730        | 101.660             | -131.070   | 204.800        | -27.930   
Q23    | 222.800        | 186.320             | -36.480    | 186.590        | -36.210   
Q24    | 24.840         | 19.762              | -5.078     | 19.908         | -4.932    
Q27    | 80.463         | 47.118              | -33.345    | 49.597         | -30.866   
Q28    | 78.999         | 47.583              | -31.416    | 51.432         | -27.567   
Q30    | 28.587         | 28.710              | +0.123     | 28.926         | +0.339    
Q36    | 80.157         | 57.954              | -22.203    | 58.012         | -22.145   
Q37    | 46.962         | 45.901              | -1.061     | 45.386         | -1.576    
Q38    | 16.324         | 16.492              | +0.168     | 16.522         | +0.198    
Q39    | 20.754         | 20.734              | -0.020     | 20.648         | -0.106    
Q40    | 22.554         | 21.707              | -0.847     | 21.995         | -0.559    
Q41    | 16.430         | 16.391              | -0.039     | 16.581         | +0.151    
Q42    | 6.045          | 6.157               | +0.112     | 6.120          | +0.075    
```

1. If we consider diffs within 5ms to be noise, we are never
worse than the current implementation.
2. We see significant improvements for string-heavy queries: string
columns are large, so they take time to decompress and decode.
3. The 100MB cache budget seems to have only a small performance impact.


## Limitations

1. It only works for async readers, because the sync reader does not
follow the same row-group-by-row-group structure.
2. It is memory hungry compared to #6921. But changing the decoding
pipeline without eagerly loading the entire row group would require
significant changes to the current decoding infrastructure, e.g., making
the page iterator an async function.
3. It currently doesn't support nested columns; more specifically, it
doesn't support nested columns with nullable parents, but supporting
them is straightforward, with no big changes.
4. The current memory accounting is not accurate: it overestimates
memory usage, especially when reading string view arrays, where multiple
string views may share the same underlying buffer and that buffer's size
is counted twice. In any case, we never exceed the user-configured
memory limit.
5. If one row passes the filter, the entire batch is cached. We can
probably optimize this, though.

## Next steps?

This PR is largely a proof of concept; I want to collect some feedback
before sending a multi-thousand-line PR :)

Some items I can think of:
1. Design an interface for users to specify the cache size limit; it is
currently hard-coded.
2. Don't instrument the nested array reader if the parquet file has a
nullable parent; currently it will panic.
3. More testing, and integration tests/benchmarks with DataFusion.

---------

Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>