
[Enhancement] Rank window function optimization #5885

Closed
@liuyehcf


Background

Consider the following SQL:

select * from (
    select *, rank() over (partition by v2 order by v3) as rk from t0
) sub_t0
where rk < 5;

The corresponding logical plan is as follows:

PREDICATE ( rank() < 5 )
            ↑
WINDOW_FUNCTION ( partition by v2 order by v3 )
            ↑
SORT ( order by v2, v3 )
            ↑
EXCHANGE ( shuffle by v2 )
            ↑
SCAN

This plan has two main flaws:

  1. The EXCHANGE node has to shuffle the full data set
  2. The SORT node performs a full, machine-level sort of all the data it receives

Design

Because of the predicate rk < 5, we can add a PARTITION_TOPN node before the EXCHANGE node to filter data early. The expected optimized plan is as follows:

PREDICATE ( rank() < 5 )
            ↑
WINDOW_FUNCTION ( partition by v2 order by v3 )
            ↑
SORT ( order by v2, v3 )
            ↑
EXCHANGE ( shuffle by v2 )
            ↑
PARTITION_TOPN ( partition by v2, sort by v3, limit by rank )
            ↑
SCAN
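The effect of the extra node can be checked on a toy model. The sketch below is a single-process Python simulation, not StarRocks code: the two lists in the hypothetical FRAGMENTS stand in for scan fragments, the EXCHANGE is modelled as concatenating rows before grouping them by v2, and the pre-filter plays the role of PARTITION_TOPN by keeping only rows whose rank() inside their own fragment and v2 partition is below the bound. It runs the same `rank() < bound` query with and without the pre-filter and counts the rows that cross the exchange.

```python
from collections import defaultdict

# Rows are (v1, v2, v3); each inner list is one hypothetical scan fragment.
FRAGMENTS = [
    [(1, "a", 10), (2, "a", 30), (3, "a", 20), (4, "a", 50), (5, "b", 5)],
    [(6, "a", 10), (7, "b", 7), (8, "a", 40), (9, "b", 1), (10, "b", 9)],
]


def rank_values(keys):
    """SQL rank() over keys that are already sorted ascending."""
    ranks = []
    for i, k in enumerate(keys):
        # ties share the previous rank; otherwise rank = position + 1
        ranks.append(ranks[-1] if i > 0 and k == keys[i - 1] else i + 1)
    return ranks


def rank_filter(rows, bound):
    """Group rows by v2, sort by v3, keep (row, rank) pairs with rank < bound."""
    groups = defaultdict(list)
    for row in rows:
        groups[row[1]].append(row)
    kept = []
    for group in groups.values():
        group.sort(key=lambda r: r[2])
        ranks = rank_values([r[2] for r in group])
        kept += [(r, rk) for r, rk in zip(group, ranks) if rk < bound]
    return kept


def window_then_filter(fragments, bound, pre_filter):
    """Return (sorted result, rows moved by EXCHANGE) for `rank() < bound`."""
    exchanged = []
    for frag in fragments:
        if pre_filter:
            # PARTITION_TOPN: keep rows whose rank inside this fragment is < bound
            frag = [r for r, _ in rank_filter(frag, bound)]
        exchanged += frag  # EXCHANGE (shuffle by v2), modelled as one list
    # SORT + WINDOW_FUNCTION + PREDICATE after the exchange
    return sorted(rank_filter(exchanged, bound)), len(exchanged)


base, moved_base = window_then_filter(FRAGMENTS, 3, pre_filter=False)
opt, moved_opt = window_then_filter(FRAGMENTS, 3, pre_filter=True)
print(moved_base, moved_opt, opt == base)  # → 10 7 True
```

The pre-filter is safe for rank(): a row's rank within one fragment can never exceed its rank over all fragments, so rows satisfying the predicate globally are never dropped, and the rows ranked ahead of them survive too, so their final ranks are unchanged.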

For rank window functions, including rank, dense_rank and row_number, if there is a related predicate (such as rk < 5), the query can be optimized by inserting a PartitionTopN operator before the Sort operator of the window function.
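The three functions keep different numbers of rows for the same predicate. The sketch below is an illustrative Python model (the helpers window_values and rows_kept are hypothetical, not StarRocks functions): it computes each function over one already-sorted partition and keeps the rows satisfying `func() < bound`. Because all three values are non-decreasing along the sort order, the surviving rows always form a prefix of the sorted partition, which is what lets a per-partition TopN do the filtering.

```python
def window_values(keys, func):
    """rank / dense_rank / row_number for keys already sorted ascending."""
    values = []
    dense = 0
    for i, k in enumerate(keys):
        new_key = i == 0 or k != keys[i - 1]
        if new_key:
            dense += 1
        if func == "row_number":
            values.append(i + 1)  # always distinct
        elif func == "rank":
            values.append(i + 1 if new_key else values[-1])  # ties share, then a gap
        elif func == "dense_rank":
            values.append(dense)  # ties share, no gap
        else:
            raise ValueError(f"unsupported window function: {func}")
    return values


def rows_kept(keys, func, bound):
    """Rows of one sorted partition that satisfy `func() < bound`."""
    return [k for k, v in zip(keys, window_values(keys, func)) if v < bound]


keys = [10, 10, 20, 30, 30, 40]  # v3 values of one partition, sorted
for func in ("row_number", "rank", "dense_rank"):
    print(func, rows_kept(keys, func, 5))
# → row_number [10, 10, 20, 30]
# → rank [10, 10, 20, 30, 30]
# → dense_rank [10, 10, 20, 30, 30, 40]
```

With `< 5`, row_number keeps exactly four rows (which of two tied rows is kept is arbitrary in SQL), rank keeps ties at the boundary, and dense_rank keeps every row of the first four distinct values.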

The main purpose of PartitionTopN is to filter data; its output remains unordered. It consists of three components:

  • partitioner: divides the input chunks according to the partition exprs
  • sorter (topn): each partition has its own sorter instance and is sorted independently
  • gather: fetches chunks from all sorters into a single data stream, so the data is still unordered after gathering. Gather is only a logical concept, not an actual component


                                   ┌────► topn─────┐
                                   │               │
 (unordered)                       │               │                  (unordered)
 inputChunks ───────► partitioner ─┼────► topn ────┼─► gather ─────► outputChunks
                                   │               │
                                   │               │
                                   └────► topn ────┘
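The data flow in the diagram can be sketched as a small Python class. This is a hypothetical toy, not the LocalPartitionTopN operators: push_chunk acts as the partitioner, each per-partition buffer acts as a sorter that applies a rank() bound, and gather is just the loop in pull_chunks, mirroring the point that gather is only a logical concept. The toy buffers every row of a partition before filtering, so it illustrates the data flow rather than the memory behaviour of a real operator.

```python
from collections import defaultdict


class PartitionTopN:
    """Toy PartitionTopN: partitioner -> one topn per partition -> gather."""

    def __init__(self, partition_key, order_key, bound):
        self.partition_key = partition_key  # row -> partition value
        self.order_key = order_key          # row -> sort key within a partition
        self.bound = bound                  # keep rows with rank() < bound
        self.sorters = defaultdict(list)    # one buffer ("sorter") per partition

    def push_chunk(self, chunk):
        # partitioner: route each row of the input chunk to its partition's sorter
        for row in chunk:
            self.sorters[self.partition_key(row)].append(row)

    def _topn(self, rows):
        rows = sorted(rows, key=self.order_key)
        kept, rank = [], 0
        for i, row in enumerate(rows):
            if i == 0 or self.order_key(row) != self.order_key(rows[i - 1]):
                rank = i + 1  # rank(): ties share a rank, a gap follows
            if rank >= self.bound:
                break  # ranks never decrease, so nothing later can qualify
            kept.append(row)
        return kept

    def pull_chunks(self):
        # gather: emit each sorter's output in turn; nothing merges partitions,
        # so no order is guaranteed across the output chunks
        for rows in self.sorters.values():
            chunk = self._topn(rows)
            if chunk:
                yield chunk


op = PartitionTopN(partition_key=lambda r: r[0], order_key=lambda r: r[1], bound=3)
op.push_chunk([("a", 3), ("b", 1), ("a", 1)])
op.push_chunk([("a", 2), ("b", 1), ("a", 0)])
print(list(op.pull_chunks()))  # → [[('a', 0), ('a', 1)], [('b', 1), ('b', 1)]]
```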

The implementation differs slightly between the optimizer and the executor:

  • In the optimizer, for simplicity, we do not define a new pair of {Logical/Physical}PartitionTopNOperator; instead, we reuse the existing {Logical/Physical}TopNOperator and add a new field, partitionByExprs, to record the partition-by information. We also need to pay attention to the following:
    • Make sure that no sort property is derived from PartitionTopN, since its output is unordered
    • Make sure that the ExchangeNode does not set a limit when the TopN is a PartitionTopN, since that limit applies per partition rather than to the whole stream
  • In the executor, we define a new pair of LocalPartitionTopN{Sink/Source}Operator and choose the implementation based on the field partitionByExprs:
    • if partitionByExprs is unset or empty, the original pair PartitionSortSinkOperator/LocalMergeSortSourceOperator is used
    • if partitionByExprs is not empty, the pair LocalPartitionTopN{Sink/Source}Operator is used
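The rules above can be summarized in a short Python model. Everything here is a hypothetical stand-in: TopNNode only mimics a TopN operator carrying the new partitionByExprs field, and choose_operators, derived_sort_property and exchange_limit are illustrative helpers, not StarRocks APIs. The operator class names are the ones listed above, with LocalPartitionTopN{Sink/Source}Operator expanded.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class TopNNode:
    """Hypothetical stand-in for a TopN operator with a partitionByExprs field."""
    order_by: List[str]
    limit: int
    partition_by_exprs: Optional[List[str]] = None

    def is_partition_topn(self) -> bool:
        # unset (None) and empty are treated the same way
        return bool(self.partition_by_exprs)


def choose_operators(node: TopNNode) -> Tuple[str, str]:
    """Executor side: pick the sink/source pair from partitionByExprs."""
    if node.is_partition_topn():
        return ("LocalPartitionTopNSinkOperator", "LocalPartitionTopNSourceOperator")
    return ("PartitionSortSinkOperator", "LocalMergeSortSourceOperator")


def derived_sort_property(node: TopNNode) -> Optional[List[str]]:
    """Optimizer side: a PartitionTopN's output is unordered, so promise nothing."""
    return None if node.is_partition_topn() else node.order_by


def exchange_limit(child: TopNNode) -> Optional[int]:
    """Optimizer side: the limit of a PartitionTopN is per partition, so the
    exchange above it must not apply it; in this model a plain TopN passes it on."""
    return None if child.is_partition_topn() else child.limit


ptopn = TopNNode(order_by=["v3"], limit=5, partition_by_exprs=["v2"])
plain = TopNNode(order_by=["v3"], limit=5)
print(choose_operators(ptopn)[0], choose_operators(plain)[0])
# → LocalPartitionTopNSinkOperator PartitionSortSinkOperator
```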
