### Describe the bug
I'm testing the performance of querying a number of Parquet files, about which I can make some assumptions:
- Each Parquet file is already sorted on the column "timestamp".
- The files' "timestamp" ranges do not overlap. For instance, file A contains timestamps from 2022 and file B contains timestamps from 2023.
The schema of the files is:
- "timestamp": `TimestampMillisecond`
- "value": `Float64`
Consider the following query and its query plan:

```sql
SELECT timestamp, value
FROM samples
ORDER BY timestamp ASC
```
```text
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type | plan |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms] |
| | ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns] |
| | |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
```
The 572 milliseconds spent in the `SortPreservingMergeExec` appear to be the bottleneck of the query, so I would like to optimize it. Given the assumptions I can make about the Parquet files, I think the `SortPreservingMergeExec` can be replaced by what is essentially a concatenation of the Parquet files. What would be the best approach to removing the `SortPreservingMergeExec`?
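To illustrate why a plain concatenation suffices here, a minimal sketch in plain Rust (the `FileRange` type and `concat_order` function are hypothetical, standing in for file-level "timestamp" statistics): if the files' [min, max] ranges are pairwise disjoint, ordering the files by their minimum timestamp and concatenating them already yields globally sorted output, with no per-row comparisons.

```rust
/// A file's timestamp range, as could be read from Parquet column statistics.
#[derive(Debug, Clone, Copy)]
struct FileRange {
    min_ts: i64,
    max_ts: i64,
}

/// Returns the concatenation order if the ranges are pairwise disjoint,
/// or None if any two files overlap (a real merge is then required).
fn concat_order(files: &[FileRange]) -> Option<Vec<usize>> {
    let mut order: Vec<usize> = (0..files.len()).collect();
    order.sort_by_key(|&i| files[i].min_ts);
    // After sorting by min, it is enough to check adjacent pairs for overlap.
    for w in order.windows(2) {
        if files[w[0]].max_ts >= files[w[1]].min_ts {
            return None;
        }
    }
    Some(order)
}

fn main() {
    // File 0: 2023 timestamps, file 1: 2022 timestamps (millis, as in the schema).
    let files = [
        FileRange { min_ts: 1_672_531_200_000, max_ts: 1_703_980_800_000 }, // 2023
        FileRange { min_ts: 1_640_995_200_000, max_ts: 1_672_531_199_000 }, // 2022
    ];
    // Concatenating file 1 (2022) and then file 0 (2023) preserves the sort.
    assert_eq!(concat_order(&files), Some(vec![1, 0]));
}
```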
My ideas:
- Manually re-partition the Parquet files into a single Parquet file using this new API: https://docs.rs/parquet/latest/parquet/file/writer/struct.SerializedRowGroupWriter.html#method.append_column
- Implement a custom `PhysicalOptimizerRule` that looks for the `SortPreservingMergeExec` + `ParquetExec` pattern and replaces it with a concatenation instead.

But I would like to hear if there are any better ways.
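For the second idea, a rough skeleton of what such a rule could look like, assuming a recent DataFusion (module paths and the `TreeNode` API vary across versions); `ConcatExec` is hypothetical and would need to be implemented, and the statistics check is left as a comment:

```rust
// Sketch only: a PhysicalOptimizerRule that would rewrite
// SortPreservingMergeExec into an in-order concatenation when the
// input partitions' sort-key ranges are provably disjoint.
use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::config::ConfigOptions;
use datafusion::error::Result;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
use datafusion::physical_plan::ExecutionPlan;

struct MergeToConcat;

impl PhysicalOptimizerRule for MergeToConcat {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &ConfigOptions,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        plan.transform_down(|node| {
            if node
                .as_any()
                .downcast_ref::<SortPreservingMergeExec>()
                .is_some()
            {
                // Here: inspect the input's per-partition statistics; if the
                // sort-key ranges are pairwise disjoint, return
                // Transformed::yes(Arc::new(ConcatExec::new(...))) instead.
                // Left as a no-op in this sketch.
            }
            Ok(Transformed::no(node))
        })
        .map(|t| t.data)
    }

    fn name(&self) -> &str {
        "merge_to_concat"
    }

    fn schema_check(&self) -> bool {
        true
    }
}
```

The rule would then be registered on the session (e.g. via `SessionStateBuilder::with_physical_optimizer_rule`) so it runs after the default physical optimizations.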
### Related
- Blog post about this optimization: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/
### Infrastructure Tasks 🚧
- Support `merge` for `Distribution` #15290
- Support computing statistics for `FileGroup` #15432
- Refactor: add `FileGroup` structure for `Vec<PartitionedFile>` #15379
- `ListingTable` statistics improperly merges statistics when files have different schemas #15689
- Analysis to support `SortPreservingMerge` --> `ProgressiveEval` #15191
- Fix: after repartitioning, the `PartitionedFile` and `FileGroup` statistics should be inexact/recomputed #15539
### Major Tasks
- Add `statistics_by_partition` API to `ExecutionPlan` #15495
- Optimized version of `SortPreservingMerge` that doesn't actually compare sort keys if the key ranges are ordered #10316