
Add a sort rule to remove unnecessary SortExecs from physical plan #4686

@mustafasrepo


Is your feature request related to a problem or challenge? Please describe what you are trying to do.
When we write complex queries, the final physical plan may include SortExecs that are unnecessary. Consider the query below:

SELECT count(*) as global_count FROM
                 (SELECT count(*), c1
                  FROM aggregate_test_100
                  WHERE c13 != 'C2GT5KVyOPZpgKVl110TyZO0NcJ434'
                  GROUP BY c1
                  ORDER BY c1 ) AS a

Its physical plan is as follows

    "ProjectionExec: expr=[COUNT(UInt8(1))@0 as global_count]",
    "  AggregateExec: mode=Final, gby=[], aggr=[COUNT(UInt8(1))]",
    "    CoalescePartitionsExec",
    "      AggregateExec: mode=Partial, gby=[], aggr=[COUNT(UInt8(1))]",
    "        RepartitionExec: partitioning=RoundRobinBatch(2)",
    "          SortExec: [c1@0 ASC NULLS LAST]",
    "            CoalescePartitionsExec",
    "              AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
    "                CoalesceBatchesExec: target_batch_size=4096",
    "                  RepartitionExec: partitioning=Hash([Column { name: \"c1\", index: 0 }], 2)",
    "                    AggregateExec: mode=Partial, gby=[c1@0 as c1], aggr=[COUNT(UInt8(1))]",
    "                      CoalesceBatchesExec: target_batch_size=4096",
    "                        FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434",
    "                          RepartitionExec: partitioning=RoundRobinBatch(2)",

The SortExec in this physical plan is unnecessary because the executor that follows it (RepartitionExec) neither requires its input to be ordered nor maintains the input ordering. Hence, even if we sort before this executor, the ordering is lost once the executor runs. We should add support for removing SortExec in these scenarios.
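The reasoning above can be sketched on a toy plan tree. This is only an illustration in Python, not DataFusion code: `PlanNode`, its two boolean flags, `chain`, and `remove_unneeded_sorts` are hypothetical names, and the flag values assigned to each operator are assumptions made for the example. The sketch walks the tree with a flag that records whether any ancestor still observes the ordering, which is one possible way to phrase the check; a real rule would also have to handle cases this model ignores, such as a sort that carries a limit.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class PlanNode:
    """Toy stand-in for a physical plan operator (hypothetical, not a DataFusion type)."""
    name: str
    children: List["PlanNode"] = field(default_factory=list)
    requires_input_order: bool = False   # operator needs its input sorted
    maintains_input_order: bool = False  # operator passes its input order through


def chain(*nodes: PlanNode) -> PlanNode:
    """Link nodes so that each one is the only child of the previous one."""
    for parent, child in zip(nodes, nodes[1:]):
        parent.children = [child]
    return nodes[0]


def remove_unneeded_sorts(node: PlanNode, order_observed: bool = True) -> PlanNode:
    """Drop every SortExec whose ordering no ancestor observes.

    `order_observed` says whether the ordering of `node`'s output is still
    visible above it; the root's ordering is assumed visible to the user.
    """
    if node.name == "SortExec" and not order_observed:
        # Nobody sees this ordering: splice the sort out and keep its input.
        return remove_unneeded_sorts(node.children[0], order_observed=False)
    # A child's ordering matters if this operator needs it, or if this
    # operator passes it through to an ancestor that observes it.
    child_observed = node.requires_input_order or (
        node.maintains_input_order and order_observed
    )
    node.children = [remove_unneeded_sorts(c, child_observed) for c in node.children]
    return node


def operator_names(node: PlanNode) -> List[str]:
    """Operator names in top-down (pre-order) order."""
    names = [node.name]
    for child in node.children:
        names.extend(operator_names(child))
    return names


def example_plan() -> PlanNode:
    """Shortened version of the plan above; flag values are assumptions."""
    return chain(
        PlanNode("ProjectionExec", maintains_input_order=True),
        PlanNode("AggregateExec"),
        PlanNode("CoalescePartitionsExec"),
        PlanNode("AggregateExec"),
        PlanNode("RepartitionExec"),
        PlanNode("SortExec"),
        PlanNode("CoalescePartitionsExec"),
        PlanNode("AggregateExec"),
    )


if __name__ == "__main__":
    print(operator_names(remove_unneeded_sorts(example_plan())))
```

In this model the sort below RepartitionExec is dropped because nothing above it requires or preserves the ordering, while a sort sitting directly under an order-preserving root would be kept.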

Describe the solution you'd like
We can traverse the physical plan bottom-up and, if the ordering introduced by a SortExec is overwritten by another executor before anything uses that ordering, remove the corresponding SortExec from the plan. Additionally, some executors can also produce their results in reverse order. Consider the query below:

SELECT count(*) OVER(ORDER BY c9 ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING) as cnt1,
                     count(*) OVER(ORDER BY c9 DESC ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING) as cnt2
                    FROM aggregate_test_100

whose physical plan is

    "ProjectionExec: expr=[COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@0 as cnt1, COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@1 as cnt2]",
    "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
    "    SortExec: [c9@1 ASC NULLS LAST]",
    "      WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
    "        SortExec: [c9@0 DESC]",

This physical plan includes a second SortExec whose only purpose is to reverse the ordering. However, the count aggregator can also produce its result in reverse order. By exploiting this property, we can turn the physical plan above into the one below:

    "ProjectionExec: expr=[COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 ASC NULLS LAST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@0 as cnt1, COUNT(UInt8(1)) ORDER BY [aggregate_test_100.c9 DESC NULLS FIRST] ROWS BETWEEN 3 PRECEDING AND 5 FOLLOWING@1 as cnt2]",
    "  WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
    "    WindowAggExec: wdw=[COUNT(UInt8(1)): Ok(Field { name: \"COUNT(UInt8(1))\", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(3)), end_bound: Following(UInt64(5)) }]",
    "      SortExec: [c9@0 DESC]",
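To illustrate why the second sort is avoidable for count, here is a small Python sketch. It assumes that "producing the result in reverse order" can be modelled by evaluating the window over input sorted the other way with the frame bounds swapped (3 PRECEDING / 5 FOLLOWING becomes 5 PRECEDING / 3 FOLLOWING); that is an assumption of this example, not something the plans above state. The function `window_count` and the sample values are hypothetical.

```python
from typing import Dict, List


def window_count(sorted_values: List[int], preceding: int, following: int) -> Dict[int, int]:
    """count(*) over a ROWS frame, keyed by each row's value.

    `sorted_values` must already be in the order the window is evaluated in,
    and values are assumed distinct so they can serve as row identifiers.
    """
    counts = {}
    for i, value in enumerate(sorted_values):
        start = max(0, i - preceding)
        # Slicing clips the upper bound at the end of the input automatically.
        frame = sorted_values[start : i + following + 1]
        counts[value] = len(frame)
    return counts


# Hypothetical stand-in for column c9 (distinct values).
c9 = [28, 3, 17, 41, 9, 55, 12, 36, 20, 47]
ascending = sorted(c9)
descending = ascending[::-1]

# cnt1 as written in the query: ORDER BY c9 ASC, 3 PRECEDING .. 5 FOLLOWING.
cnt1_direct = window_count(ascending, preceding=3, following=5)

# The same counts computed over the DESC-sorted input with swapped bounds,
# so no second sort is needed to evaluate cnt1.
cnt1_reversed = window_count(descending, preceding=5, following=3)

assert cnt1_direct == cnt1_reversed
```

Each row receives the same count either way; only the order in which rows are emitted differs, which is why one SortExec can serve both window expressions in this model.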



Labels: enhancement
