
[EPIC] Avoid sort for already sorted Parquet files that do not overlap values on condition #6672

Open
@simonvandel

Description

Describe the bug

I'm testing the performance of querying a number of Parquet files about which I can make the following assumptions:

  • Each Parquet file is already sorted on the column "timestamp".
  • No two Parquet files overlap in their values for the column "timestamp". For instance, file A holds timestamps from 2022 and file B holds timestamps from 2023.
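
The two assumptions above can be checked mechanically before relying on them. The following is a minimal sketch, not DataFusion API: `FileRange` and `concat_order` are hypothetical names, and the per-file minimum and maximum are assumed to be available already (for example from the min/max column statistics stored in Parquet metadata). It orders the files by their smallest timestamp and reports whether plain concatenation in that order would stay sorted.

```rust
/// Hypothetical per-file summary: the smallest and largest "timestamp"
/// value in the file, in milliseconds since the epoch.
#[derive(Debug, Clone, PartialEq)]
pub struct FileRange {
    pub name: String,
    pub min_ts: i64,
    pub max_ts: i64,
}

/// Returns the files ordered by `min_ts` if no two ranges overlap,
/// or `None` if any pair overlaps (a real merge would then be needed).
pub fn concat_order(mut files: Vec<FileRange>) -> Option<Vec<FileRange>> {
    // Order files by their first timestamp.
    files.sort_by_key(|f| f.min_ts);
    // After sorting, checking adjacent pairs is sufficient: each file must
    // end strictly before the next one starts.
    let disjoint = files
        .windows(2)
        .all(|pair| pair[0].max_ts < pair[1].min_ts);
    if disjoint {
        Some(files)
    } else {
        None
    }
}
```

If `concat_order` returns `Some(order)`, reading the files one after another in that order yields rows sorted on "timestamp", given that each file is itself sorted. The strict `<` treats a shared boundary value as an overlap, which is a conservative choice for this sketch.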

The schema of the files is:

  • "timestamp": TimestampMillisecond
  • "value": Float64

Consider the following query and its query plan:

SELECT timestamp, value 
FROM samples 
ORDER BY timestamp ASC
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type         | plan                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                  |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Plan with Metrics | SortPreservingMergeExec: [timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=572.526968ms]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |
|                   |   ParquetExec: file_groups={20 groups: [[0.parquet], [1.parquet], [2.parquet], [3.parquet], [4.parquet], ...]}, projection=[timestamp, value], output_ordering=[timestamp@0 ASC], metrics=[output_rows=1000000, elapsed_compute=20ns, num_predicate_creation_errors=0, predicate_evaluation_errors=0, bytes_scanned=57972, page_index_rows_filtered=0, row_groups_pruned=0, pushdown_rows_filtered=0, time_elapsed_processing=51.918935ms, page_index_eval_time=40ns, time_elapsed_scanning_total=48.94925ms, time_elapsed_opening=2.996325ms, time_elapsed_scanning_until_data=48.311008ms, pushdown_eval_time=40ns] |
|                   |                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                       |
+-------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

The 572 ms of elapsed_compute in SortPreservingMergeExec appears to be the bottleneck in the query, so I would like to optimize it.

Given the assumptions I can make about the Parquet files, I think that the SortPreservingMergeExec can be replaced by what is essentially a concatenation of each of the Parquet files.
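
To illustrate why concatenation is enough under these assumptions, here is a small self-contained sketch in plain Rust. It does not use DataFusion: `kway_merge` is a generic heap-based merge standing in for what a sort-preserving merge has to do, and `concat_runs` is a hypothetical replacement that only orders the runs by their first value. Each inner `Vec<i64>` stands for the already sorted "timestamp" column of one file.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Generic k-way merge of sorted runs using a min-heap.
/// Every output row costs a heap pop and usually a heap push.
pub fn kway_merge(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    // Seed the heap with the first value of every non-empty run.
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum::<usize>());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        // Advance within the run that produced the smallest value.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

/// Concatenation of sorted runs, ordered by each run's first value.
/// Only correct when the runs do not overlap; no per-row comparisons.
pub fn concat_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut order: Vec<&Vec<i64>> = runs.iter().filter(|r| !r.is_empty()).collect();
    order.sort_by_key(|r| r[0]);
    order.into_iter().flatten().copied().collect()
}
```

For non-overlapping sorted runs both functions return the same sequence, but `concat_runs` does work proportional to the number of files for ordering plus a copy of the rows, while `kway_merge` compares values for every row. When runs do overlap, `concat_runs` produces unsorted output, so it is only a valid substitute when the non-overlap assumption actually holds.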

What would be the best approach to remove the SortPreservingMergeExec?
My ideas:

But I would like to hear if there are any better ways.

Related

Infrastructure Tasks 🚧

Major Tasks

Related (though not necessarily required)

Labels: enhancement, performance, question
