Description
Background
Consider the following SQL:
select * from (
select *, rank() over (partition by v2 order by v3) as rk from t0
) sub_t0
where rk < 5;
The corresponding logical plan is as follows:
PREDICATE ( rank() < 5 )
↑
WINDOW_FUNCTION ( partition by v2 order by v3 )
↑
SORT ( order by v2, v3 )
↑
EXCHANGE ( shuffle by v2 )
↑
SCAN
There are two main flaws in this plan:
- The EXCHANGE node needs to shuffle the full data set.
- The SORT node performs a full sort on each machine.
Design
Notice that there is a predicate rk < 5, so we can add a PARTITION_TOPN node before the EXCHANGE node to filter data early. The expected optimized plan is as follows:
PREDICATE ( rank() < 5 )
↑
WINDOW_FUNCTION ( partition by v2 order by v3 )
↑
SORT ( order by v2, v3 )
↑
EXCHANGE ( shuffle by v2 )
↑
PARTITION_TOPN ( partition by v2, sort by v3, limit by rank )
↑
SCAN
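Pushing PARTITION_TOPN below the EXCHANGE is safe because each scan instance only discards rows that could not be among the first rows of their partition anyway. The sketch below checks this on made-up data for two hypothetical scan instances, using row_number semantics and a limit of 4 (the rows that satisfy rk < 5). The function names are illustrative only and do not correspond to StarRocks code.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

Row = Tuple[int, int]  # (v2, v3): partition-by key and order-by key

def partition_topn(rows: List[Row], limit: int) -> List[Row]:
    """Keep, for every v2, the `limit` rows with the smallest v3
    (row_number semantics: rows tied at the boundary are cut)."""
    groups: Dict[int, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[0]].append(row)
    kept: List[Row] = []
    for group in groups.values():
        kept.extend(sorted(group, key=lambda r: r[1])[:limit])
    return kept

def without_pushdown(instances: List[List[Row]], limit: int) -> List[Row]:
    # EXCHANGE ships every scanned row; window + predicate are modeled afterwards.
    shuffled = [row for inst in instances for row in inst]
    return sorted(partition_topn(shuffled, limit))

def with_pushdown(instances: List[List[Row]], limit: int) -> List[Row]:
    # Each scan instance runs PARTITION_TOPN locally before the EXCHANGE.
    shuffled = [row for inst in instances for row in partition_topn(inst, limit)]
    return sorted(partition_topn(shuffled, limit))

instances = [
    [(1, 10), (1, 3), (2, 4), (1, 7), (2, 9), (1, 1), (2, 2), (1, 8), (2, 6), (1, 5), (2, 11)],
    [(2, 1), (1, 2), (2, 3), (1, 12), (2, 8), (1, 6), (2, 7), (1, 4), (2, 5), (1, 9), (2, 10)],
]
limit = 4  # rk < 5 with row_number keeps at most 4 rows per partition
assert with_pushdown(instances, limit) == without_pushdown(instances, limit)
sent = sum(len(partition_topn(inst, limit)) for inst in instances)
print(sent, "of", sum(len(inst) for inst in instances), "rows cross the exchange")  # 16 of 22
```

The same argument carries over to rank and dense_rank, because a row's rank within one instance can never exceed its rank over all instances, provided the local operator keeps the rows tied at the boundary.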
For the rank window functions, namely rank, dense_rank and row_number, if there is a related predicate (such as rk < 5), the query can be optimized by inserting a PartitionTopN operator before the Sort operator of the window function.
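Before a PartitionTopN can be planned, the predicate has to be turned into a per-partition limit. The helper below is hypothetical (`limit_from_predicate` is not an actual StarRocks function); it assumes an integer bound and handles only `<`, `<=` and `=`, and which comparison operators the optimizer really recognizes is not specified here. It also assumes the original predicate stays above the window function, so the derived limit only needs to be an upper bound on the ranks that can pass.

```python
from typing import Optional

def limit_from_predicate(op: str, bound: int) -> Optional[int]:
    """Return the smallest n such that every row passing `rk <op> bound`
    has rk <= n, or None when no such n exists."""
    if op == "<":
        n = bound - 1
    elif op in ("<=", "="):
        n = bound
    else:
        # e.g. `rk > 5` keeps an unbounded tail, so nothing can be pruned early
        return None
    return max(n, 0)

print(limit_from_predicate("<", 5))  # 4, the limit used for `rk < 5`
```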
The main purpose of PartitionTopN is to filter data; its output remains unordered. It consists of three components:
- partitioner: divides each input chunk according to the partition-by exprs.
- sorter (topn): each partition has its own sorter instance and is sorted independently.
- gather: fetches chunks from all sorters into one data stream, so the data is still unordered after gathering. Note that gather is only a logical concept, not an actual component.
                                  ┌────► topn ────┐
                                  │               │
          (unordered)             │               │     (unordered)
inputChunks ───────► partitioner ─┼────► topn ────┼─► gather ─────► outputChunks
                                  │               │
                                  │               │
                                  └────► topn ────┘
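The three components in the diagram can be modeled in a few lines. This is a toy sketch, not the executor's implementation: `PartitionTopNSketch` is a hypothetical name, rows are (v2, v3) tuples instead of columnar chunks, the partitioner is a plain dict, and each per-partition topn is a bounded max-heap with row_number semantics (ties for rank/dense_rank are not handled).

```python
import heapq
import itertools
from typing import Dict, Iterable, List, Tuple

Row = Tuple[int, int]  # (partition key v2, order key v3)

class PartitionTopNSketch:
    """Toy model of PartitionTopN: partitioner + one top-N sorter per
    partition + a logical gather that does not restore any order."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        # partition key -> max-heap (on v3) holding at most `limit` rows
        self.sorters: Dict[int, List[Tuple[int, int, Row]]] = {}
        self._seq = itertools.count()  # tie-breaker so rows are never compared

    def push_chunk(self, chunk: Iterable[Row]) -> None:
        for row in chunk:
            # partitioner: route the row by its partition-by expression (v2)
            heap = self.sorters.setdefault(row[0], [])
            entry = (-row[1], next(self._seq), row)
            if len(heap) < self.limit:
                heapq.heappush(heap, entry)
            elif entry[0] > heap[0][0]:
                # topn: the new row beats the worst row kept so far
                heapq.heapreplace(heap, entry)

    def gather(self) -> List[Row]:
        # gather: concatenate every sorter's rows; output stays unordered
        return [entry[2] for heap in self.sorters.values() for entry in heap]

op = PartitionTopNSketch(limit=2)
op.push_chunk([(1, 5), (2, 3), (1, 1)])
op.push_chunk([(1, 4), (2, 8), (2, 1), (1, 9)])
out = op.gather()
print(sorted(out))  # [(1, 1), (1, 4), (2, 1), (2, 3)]
```

A bounded heap per partition keeps memory proportional to the number of partitions times the limit, which is why the number of partitions matters for this operator.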
The implementation differs slightly between the optimizer and the executor:
- In the optimizer, for simplicity, we do not define a new pair of {Logical/Physical}PartitionTopNOperator; instead we reuse the existing {Logical/Physical}TopNOperator and add a new field partitionByExprs to record the partition-by information. In addition, we need to pay attention to the following:
  - Make sure that no sort property is derived from a PartitionTopN, since its output is unordered.
  - Make sure that the ExchangeNode does not set a limit when its input is a PartitionTopN, since that limit applies per partition rather than to the whole result.
- In the executor, we define a new pair of LocalPartitionTopN{Sink/Source}Operator and choose the implementation based on the field partitionByExprs:
  - If partitionByExprs is unset or empty, the original pair PartitionSortSinkOperator/LocalMergeSortSourceOperator is used.
  - If partitionByExprs is not empty, the pair LocalPartitionTopN{Sink/Source}Operator is used.
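The operator choice and the two optimizer constraints above can be summarized in a small sketch. `TopNNodeSketch` and the three functions are hypothetical; only the field partitionByExprs and the operator names come from the text. The assumption that an ordinary TopN may still provide a sort property is ours and is only there to contrast with PartitionTopN.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class TopNNodeSketch:
    """Hypothetical stand-in for a TopN plan node. None models an unset
    partitionByExprs; an empty list models an empty one."""
    partition_by_exprs: Optional[List[str]] = None

    @property
    def is_partition_topn(self) -> bool:
        return bool(self.partition_by_exprs)

def pick_executor_operators(node: TopNNodeSketch) -> Tuple[str, str]:
    """Return the (sink, source) operator pair for this node."""
    if node.is_partition_topn:
        return ("LocalPartitionTopNSinkOperator", "LocalPartitionTopNSourceOperator")
    return ("PartitionSortSinkOperator", "LocalMergeSortSourceOperator")

def provides_sort_property(node: TopNNodeSketch) -> bool:
    # PartitionTopN output is unordered; an ordinary TopN is assumed sorted.
    return not node.is_partition_topn

def exchange_may_take_limit(child: TopNNodeSketch) -> bool:
    # A PartitionTopN limit is per partition, so it must not become a global limit.
    return not child.is_partition_topn

print(pick_executor_operators(TopNNodeSketch(["v2"])))
```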
Tasks
- Support the partitioner component, [Enhancement] rank window function optimization, add partitioner (1) #5886
  - only one partition expr is supported right now
- Support PartitionTopN, [Enhancement] rank window function optimization, add partition topn (2) #6118
- Support row_number, [Enhancement] rank window function optimization, support row_number (3) #6119
- Support partitioner degradation when the number of partitions reaches a threshold, [Enhancement] rank window function optimization, add threshold of partition number (4) #6411
- Support rank, [Enhancement] window function optimization, support rank (5) #6120
  - by supporting TopN limit by rank
- Adjust the downgrade strategy of chunks_partitioner, [Enhancement] Adjust downgrade strategy of chunks_partitioner #8696
- Support dense_rank
  - by supporting TopN limit by dense_rank
- Support multiple partition exprs
- Optimize PartitionTopN by making the partitioner work in a streaming style
  - Currently, data is not sent to topn_sorter until all of it has been partitioned and saved in the hash table. This is unnecessary because topn_sorter can filter data, so we don't need to hold all of it.
- Merge the normal and nullable versions of PartitionHashMap, such as PartitionHashMapWithOneNumberKey and PartitionHashMapWithOneNullableNumberKey
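The tasks on rank and dense_rank depend on what "limit" means for each function. The hypothetical helper `topn_limit` below keeps the rows of one partition whose row_number, rank or dense_rank (ordered by the key) is at most n. For clarity it sorts the whole partition; a real TopN operator would be expected to maintain the cutoff incrementally instead. The data is made up.

```python
from typing import List

def topn_limit(keys: List[int], n: int, mode: str) -> List[int]:
    """Keep the rows of one partition with fn() <= n, where fn is row_number,
    rank or dense_rank over the key. Output is sorted only for readability."""
    ordered = sorted(keys)
    if n <= 0 or not ordered:
        return []
    if mode == "row_number":
        # Exactly n rows; ties at the boundary are cut.
        return ordered[:n]
    if mode == "rank":
        # rank <= n  <=>  key <= the n-th smallest key (counting duplicates)
        cutoff = ordered[min(n, len(ordered)) - 1]
    elif mode == "dense_rank":
        # dense_rank <= n  <=>  key <= the n-th smallest distinct key
        distinct = sorted(set(ordered))
        cutoff = distinct[min(n, len(distinct)) - 1]
    else:
        raise ValueError(f"unknown mode: {mode}")
    return [k for k in ordered if k <= cutoff]

keys = [3, 5, 2, 3, 1, 4, 2, 3]  # v3 values of one v2 partition, unordered
for mode in ("row_number", "rank", "dense_rank"):
    print(mode, topn_limit(keys, 4, mode))
# row_number [1, 2, 2, 3]
# rank [1, 2, 2, 3, 3, 3]
# dense_rank [1, 2, 2, 3, 3, 3, 4]
```

With n = 4 (the rows passing rk < 5), row_number keeps exactly 4 rows, while rank and dense_rank keep 6 and 7 rows here because of ties. So a TopN limited by rank or dense_rank cannot simply stop after n rows.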