Conversation
9789b9f to
d5c4c5f
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
It's happening! |
This comment has been minimized.
This comment has been minimized.
Yes! Finally -- and no regressions this time! I have a pile of stacked PRs:
Now that I have some evidence that this PR make them faster, I'll go back and get them ready to review |
…er` (#21327) ~(Draft until I am sure I can use this API to make FileStream behave better)~ ## Which issue does this PR close? - part of #20529 - Needed for #21351 - Broken out of #20820 - Closes #21427 ## Rationale for this change I can get 10% faster on many ClickBench queries by reordeirng files at runtime. You can see it all working together here: #21351 To do do, I need to rework the FileStream so that it can reorder operations at runtime. Eventually that will include both CPU and IO. This PR is a step in the direction by introducing the main Morsel API and implementing it for Parquet. The next PR (#21342) rewrites FileStream in terms of the Morsel API ## What changes are included in this PR? 1. Add proposed `Morsel` API 2. Rewrite Parquet opener in terms of that API 3. Add an adapter layer (back to FileOpener, so I don't have to rewrite FileStream in the same PR) My next PR will rewrite the FileStream to use the Morsel API ## Are these changes tested? Yes by existing CI. I will work on adding additional tests for just Parquet opener in a follow on PR ## Are there any user-facing changes? No
acde88a to
6b79a6f
Compare
This comment has been minimized.
This comment has been minimized.
f1f0079 to
19f5120
Compare
Yes |
There was a problem hiding this comment.
This PR has almost 500 lines of tests / test infrastructure. The actual code changes are pretty small
I recommend reviewing this PR:
- Start with the API changes to the
DataSourcetrait - Look at the changes to ScanState to see how the shared state is used
- Review the tests in file_stream/mod.rs
- Most of the rest of the code changes are plumbing to get the shared state into ScanState
| self.open_with_args(OpenArgs::new(partition, context)) | ||
| } | ||
|
|
||
| fn open_with_args(&self, args: OpenArgs) -> Result<SendableRecordBatchStream> { |
There was a problem hiding this comment.
Added an open_with_args API to mirror other with_args APIs such as TableSource::scan_with_args:
datafusion/datafusion/catalog/src/table.rs
Lines 209 to 221 in bb1c8e6
The new API was required to pass in the shared state (aka to connect sibling streams so they can share / reorder work)
| ----- Partition 0 ----- | ||
| Done | ||
| ----- Partition 1 ----- | ||
| Batch: 101 |
There was a problem hiding this comment.
note that partition 1 has run all the files
| mod builder; | ||
| mod metrics; | ||
| mod scan_state; | ||
| pub(crate) mod work_source; |
There was a problem hiding this comment.
Note that there are no changes to the FIleStream -- this is only test changes
| /// Files that still need to be planned. | ||
| file_iter: VecDeque<PartitionedFile>, | ||
| /// Unopened files that still need to be planned for this stream. | ||
| work_source: WorkSource, |
There was a problem hiding this comment.
Here is the key difference -- instead of a local queue there is now a (potentially) shared work source
| } | ||
| } | ||
|
|
||
| /// Shared source of work for sibling `FileStream`s |
There was a problem hiding this comment.
At the moment, the work source only supports entire files, but I can imagine it getting more sophisticated and supporting morsels and morsel planners too (to do work stealing, etc)
| None | ||
| } | ||
|
|
||
| /// Create per execution state to share across sibling instances of this |
There was a problem hiding this comment.
These are they key new APIs
2354cf6 to
1b1b586
Compare
|
run benchmarks |
|
run benchmark clickbench_partitioned |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (1b1b586) to 961c5fc (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (1b1b586) to 961c5fc (merge-base) diff using: clickbench_partitioned File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (1b1b586) to 961c5fc (merge-base) diff using: tpch File an issue against this benchmark runner |
|
🤖 Benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (1b1b586) to 961c5fc (merge-base) diff using: tpcds File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usageclickbench_partitioned — base (merge-base)
clickbench_partitioned — branch
File an issue against this benchmark runner |
|
🤖 Benchmark completed (GKE) | trigger Instance: CPU Details (lscpu)Details
Resource Usagetpcds — base (merge-base)
tpcds — branch
File an issue against this benchmark runner |
Mind taking it out of draft if it is ready for review? |
Yes I will do so for sure. I wanted to make sure it was ready (passing tests, comments all polished) before doing so. I expect to mark it ready very soon |
|
Ok, this is ready for review. I am so sorry for the size of the PR but it is mostly comments and tests |
|
Awesome, so the PR changes who reads which file at runtime using morselizer, would be extremely interesting to try this on many small files environments. Do we expect improvements for even partitions(partition have the similar number of files with similar sizes)? Is it planned to morselize deeper to process row groups in parallel? This activity actually reminds me of #19815 benchmark. |
In my experience, there is always a some partition skew even for very balanced scans on local FS.
Yes - it is the plan to split morsels into sub-row-group morsels, so smaller datasets (e.g. TPC-DS at SF=1 which has single-row group files) or high-cpu machines (due to too little parallelism) will benefit more as well. Currently parallelism is limited in datasets with few row groups as we can't go beyond row groups. |
| /// | ||
| /// Streams that may share work across siblings use [`WorkSource::Shared`], | ||
| /// while streams that can not share work (e.g. because they must preserve file | ||
| /// order) use [`WorkSource::Local`]. |
There was a problem hiding this comment.
| /// order) use [`WorkSource::Local`]. | |
| /// order) use [`WorkSource::Local`]. |
|
run benchmark tpch_csv tpch_csv10 h2o_medium |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (5287210) to 961c5fc (merge-base) diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (5287210) to 961c5fc (merge-base) diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |
|
🤖 Criterion benchmark running (GKE) | trigger CPU Details (lscpu)Comparing alamb/reschedule_io (5287210) to 961c5fc (merge-base) diff File an issue against this benchmark runner |
|
Benchmark for this request failed. Last 20 lines of output: Click to expandFile an issue against this benchmark runner |

Which issue does this PR close?
Rationale for this change
This PR finally enables dynamic work scheduling in the FileStream (so that if a task is done it can look at any remaining work)
This improves performance on queries that scan multiple files and the work is not balanced evenly across partitions in the plan (e.g. we have dynamic filtering that reduces work significantly)
It is the last of a sequence of several PRs:
ParquetOpenertoParquetMorselizer#21327What changes are included in this PR?
Note there are a bunch of other things that are NOT included in this PR, including
As @Dandandan proposes below, I expect we can work on those changes as follow on PRs.
Are these changes tested?
Yes by existing functional and benchmark tests, as well as new functional tests
Are there any user-facing changes?
Yes, faster performance (see benchmarks): #21351 (comment)