[multistage] Add support for the ranking ROW_NUMBER() window function #10587
| @@ -1107,6 +1323,22 @@ | |||
| "\n" | |||
| ] | |||
Can we also cover the subquery style in the plans?
Something like:
SELECT * FROM (
SELECT
ROW_NUMBER() OVER (.....) rn,
c1,
c2,
c3
FROM
foo
) temp
WHERE
rn > 10 AND rn <= 20;
For the sake of completeness in the planning tests, can we also add a WITH / CTE variant if possible?
WITH cte AS(
SELECT
ROW_NUMBER() OVER (.......) rn,
c1,
c2,
c3
FROM
foo
)
SELECT
*
FROM
cte
WHERE
rn > 10 AND
rn <= 20;
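For reference, the behavior that both the subquery form and the CTE form should preserve can be sketched in plain Java. This is not Pinot code: the class and method names are hypothetical, and because the OVER clause in the queries above is elided, the sketch assumes ORDER BY c1 with no PARTITION BY.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Pinot.
final class RowNumberPageSketch {
  private RowNumberPageSketch() {
  }

  /** Inner query: ROW_NUMBER() OVER (ORDER BY c1) AS rn, c1. Each output row is {rn, c1}. */
  static List<Object[]> rowNumberOrderedByC1(List<String> c1Values) {
    List<String> sorted = new ArrayList<>(c1Values);
    Collections.sort(sorted);
    List<Object[]> numbered = new ArrayList<>();
    for (int i = 0; i < sorted.size(); i++) {
      // Row numbers start at 1 and increase by 1 for every row, ties included.
      numbered.add(new Object[] {Long.valueOf(i + 1L), sorted.get(i)});
    }
    return numbered;
  }

  /** Outer query: WHERE rn > lowExclusive AND rn <= highInclusive. */
  static List<Object[]> page(List<Object[]> numberedRows, long lowExclusive, long highInclusive) {
    List<Object[]> result = new ArrayList<>();
    for (Object[] row : numberedRows) {
      long rn = (Long) row[0];
      if (rn > lowExclusive && rn <= highInclusive) {
        result.add(row);
      }
    }
    return result;
  }
}
```

The point for the plan tests is that the filter on rn has to sit above the window, since rn does not exist until the window has run.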
@siddharthteotia I actually have a couple of plans of these types at the end of this file. Are these sufficient?
{
"description": "Window function CTE: row_number WITH statement having OVER with PARTITION BY ORDER BY",
"sql": "EXPLAIN PLAN FOR WITH windowfunc AS (SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3) as rownum from a) SELECT a.col1, a.rownum FROM windowfunc AS a where a.rownum < 5",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], $1=[$3])",
"\n LogicalFilter(condition=[<($3, 5)])",
"\n LogicalWindow(window#0=[window(partition {1} order by [2] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[1]], collation=[[2]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col1=[$0], col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
},
{
"description": "Window function subquery: row_number having OVER with PARTITION BY ORDER BY",
"sql": "EXPLAIN PLAN FOR SELECT row_number, col2, col3 FROM (SELECT ROW_NUMBER() OVER(PARTITION BY a.col2 ORDER BY a.col3 DESC) as row_number, a.col2, a.col3 FROM a) WHERE row_number <= 10",
"output": [
"Execution Plan",
"\nLogicalProject(row_number=[$2], col2=[$0], col3=[$1])",
"\n LogicalFilter(condition=[<=($2, 10)])",
"\n LogicalWindow(window#0=[window(partition {0} order by [1 DESC] rows between UNBOUNDED PRECEDING and CURRENT ROW aggs [ROW_NUMBER()])])",
"\n PinotLogicalSortExchange(distribution=[hash[0]], collation=[[1 DESC]], isSortOnSender=[false], isSortOnReceiver=[true])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalTableScan(table=[[a]])",
"\n"
]
}
Looks good. Thanks for clarifying.
Can we add an example of a JOIN query with ROW_NUMBER(), where the subquery does a JOIN and the outer query picks a particular row number from each partition?
SELECT
<columnList>
FROM
(
SELECT
<columnList>,
ROW_NUMBER() OVER(PARTITION BY .. ORDER BY ... DESC) AS rn
FROM
T1 INNER JOIN T2
ON T1.c1 = T2.c2
WHERE .......
)
WHERE rn = 1
So the outer query picks the top row from each partition (the highest value, since each partition is ordered descending).
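To make the expected result concrete, here is a minimal sketch in plain Java of the "top row per partition" pattern. It is not Pinot code and the names are hypothetical. It assumes the join has already produced rows of the form {partitionKey, orderValue}, and that the partition key is a String and the order value an Integer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Pinot.
final class TopRowPerPartitionSketch {
  private TopRowPerPartitionSketch() {
  }

  /** Appends ROW_NUMBER() OVER (PARTITION BY row[0] ORDER BY row[1] DESC) as a third column. */
  static List<Object[]> withRowNumber(List<Object[]> joinedRows) {
    // Group the rows by partition key, keeping first-seen partition order.
    Map<Object, List<Object[]>> partitions = new LinkedHashMap<>();
    for (Object[] row : joinedRows) {
      partitions.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
    }
    List<Object[]> numbered = new ArrayList<>();
    for (List<Object[]> partition : partitions.values()) {
      // ORDER BY ... DESC within the partition.
      partition.sort(Comparator.comparing((Object[] r) -> (Integer) r[1]).reversed());
      long rn = 0;
      for (Object[] row : partition) {
        // The numbering restarts at 1 for every partition.
        numbered.add(new Object[] {row[0], row[1], Long.valueOf(++rn)});
      }
    }
    return numbered;
  }

  /** Outer query: WHERE rn = 1. */
  static List<Object[]> keepFirst(List<Object[]> numberedRows) {
    List<Object[]> result = new ArrayList<>();
    for (Object[] row : numberedRows) {
      if (((Long) row[2]).longValue() == 1L) {
        result.add(row);
      }
    }
    return result;
  }
}
```

When two rows tie on the order value, which one gets rn = 1 is arbitrary, so a test query of this shape is easiest to assert on if the ORDER BY column is unique within each partition.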
| { | ||
| "description": "single OVER(ORDER BY) row_number and select col with select alias", | ||
| "sql": "EXPLAIN PLAN FOR SELECT a.col1 AS value1, ROW_NUMBER() OVER(ORDER BY a.col2) AS row_number FROM a", | ||
| "notes": "TODO: Look into why aliases are getting ignored in the final plan", |
Can we create an issue for this? Otherwise the TODO might just get buried in the code.
| { | ||
| "description": "single OVER(ORDER BY) row_number and select col", | ||
| "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2) FROM a", | ||
| "output": [ |
It's hard to review tests when I have to constantly scroll up and down to check whether a variant that comes to mind has already been added.
So please feel free to ignore / resolve this if the following are already covered. I am just trying to make sure we have proper coverage.
SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col1
SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2)
FROM a ORDER BY a.col2
SELECT a.col1, ROW_NUMBER() OVER(ORDER BY a.col2) AS rn
FROM a ORDER BY rn DESC
| "Execution Plan", | ||
| "\nLogicalSort(sort0=[$0], dir0=[ASC], offset=[0])", | ||
| "\n PinotLogicalSortExchange(distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])", | ||
| "\n LogicalSort(sort0=[$0], dir0=[ASC])", |
I probably have this question because I missed the generic sort discussion while I was out.
Can you please clarify why we have a LogicalSort before (below) the LogicalSortExchange? The goal is to do the exchange without sorting on the sender side and to sort on the receiver side instead, and the LogicalSort above the LogicalSortExchange should ideally take care of that.
The LogicalSort -> PinotLogicalSortExchange -> LogicalSort pattern is used for global ORDER BY queries, both for the ordering itself and to apply LIMITs (e.g. select foo from table a order by bar). It still makes sense to sort on the sender side so that limits can be applied there (if at the leaf stage). Also note that the data may be distributed below the PinotLogicalSortExchange, which is why sorting is needed after the exchange as well.
Our eventual goal is to detect in the LogicalSortExchange that the incoming data is already sorted, and then perform a merge sort instead of the full priority-queue-based sort. This will greatly speed up ordering throughout the nodes and even allow sending partial results rather than waiting for all the rows at each ordering step.
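As a rough illustration of that merge idea, here is a minimal sketch in plain Java. It is not the Pinot implementation and the names are hypothetical. It assumes each sender's rows arrive already sorted ascending, and it models a row as a single Integer sort key. The small heap holds at most one entry per sender, unlike a sort that pushes every row through a priority queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; none of these names exist in Pinot.
final class SortedMergeSketch {
  private SortedMergeSketch() {
  }

  /** Merges inputs that are each already sorted ascending into one ascending list. */
  static List<Integer> mergeSorted(List<List<Integer>> sortedInputs) {
    // Each heap entry is {inputIndex, positionInInput}, ordered by the value it points at.
    PriorityQueue<int[]> heads = new PriorityQueue<>(
        (a, b) -> Integer.compare(sortedInputs.get(a[0]).get(a[1]), sortedInputs.get(b[0]).get(b[1])));
    for (int i = 0; i < sortedInputs.size(); i++) {
      if (!sortedInputs.get(i).isEmpty()) {
        heads.add(new int[] {i, 0});
      }
    }
    List<Integer> merged = new ArrayList<>();
    while (!heads.isEmpty()) {
      int[] head = heads.poll();
      List<Integer> input = sortedInputs.get(head[0]);
      // The smallest remaining value overall is always at the head of some input.
      merged.add(input.get(head[1]));
      if (head[1] + 1 < input.size()) {
        heads.add(new int[] {head[0], head[1] + 1});
      }
    }
    return merged;
  }
}
```

Because each value is emitted as soon as it is known to be the smallest remaining one, a merge of this shape can hand rows downstream incrementally, which is the partial-results benefit mentioned above.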
Let's discuss this in more detail; hopefully that will help clarify.
There are optimization opportunities in the plans, btw. Today we don't check whether the window has already sorted the data; if the ordering required by the ORDER BY uses the same column + collation, we could skip the sort above the window, etc.
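A check of that kind could look roughly like the sketch below. This is plain Java with hypothetical names, not a Calcite or Pinot rule. It assumes both orderings refer to the same field indexes, and it only treats the sort as redundant when the window has no PARTITION BY keys, because per-partition ordering does not imply a global ordering. It also ignores any reordering introduced by an exchange between the window and the sort.

```java
import java.util.List;

// Hypothetical illustration only; none of these names exist in Pinot or Calcite.
final class SortElisionSketch {
  private SortElisionSketch() {
  }

  /** One ORDER BY key: a field index plus its direction. */
  static final class SortKey {
    final int fieldIndex;
    final boolean descending;

    SortKey(int fieldIndex, boolean descending) {
      this.fieldIndex = fieldIndex;
      this.descending = descending;
    }

    boolean sameAs(SortKey other) {
      return fieldIndex == other.fieldIndex && descending == other.descending;
    }
  }

  /** Returns true when rows ordered by windowOrder are necessarily ordered by requiredOrder too. */
  static boolean windowOrderSatisfies(boolean windowHasPartitionKeys, List<SortKey> windowOrder,
      List<SortKey> requiredOrder) {
    if (windowHasPartitionKeys || requiredOrder.size() > windowOrder.size()) {
      return false;
    }
    // The required ordering must be a prefix of the window ordering, key by key.
    for (int i = 0; i < requiredOrder.size(); i++) {
      if (!windowOrder.get(i).sameAs(requiredOrder.get(i))) {
        return false;
      }
    }
    return true;
  }
}
```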
Sounds good. Let's discuss this separately.
| }, | ||
| { | ||
| "description": "single OVER(PARTITION BY) row_number with select col and group by", | ||
| "sql": "EXPLAIN PLAN FOR SELECT a.col1, ROW_NUMBER() OVER(PARTITION BY a.col1) FROM a GROUP BY a.col1, a.col3", |
Maybe I missed it, but I don't see any tests for the OVER(PARTITION BY .. ORDER BY ..) variant. Is that not supported in this PR?
nvm... I just saw them. Ignore
| System.arraycopy(existingRow, 0, row, 0, existingRow.length); | ||
| for (int i = 0; i < _windowAccumulators.length; i++) { | ||
| row[i + existingRow.length] = _windowAccumulators[i].getResultForKeys(partitionKey, orderKey); | ||
| if (_windowFrame.getWindowFrameType() == WindowNode.WindowFrameType.RANGE) { |
(nit) Can you add a short comment indicating which window functions enter this block?
| * | ||
| * The window functions supported today are SUM/COUNT/MIN/MAX/AVG/BOOL_OR/BOOL_AND aggregations. Window functions also | ||
| * include other types of functions such as rank and value functions. | ||
| * The window functions supported today are: |
Suggest adding a short note on RANGE vs ROW for future readers, covering which window functions fall into which category and the corresponding TODOs.
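As a starting point for such a note, the difference can be illustrated with a small sketch in plain Java. It follows standard SQL semantics rather than Pinot's code, and the names are hypothetical. With a RANGE frame ending at CURRENT ROW, all rows that share an ORDER BY key are peers and get the same aggregate value, so a result can be looked up per order key. ROW_NUMBER() must give a distinct value to every row, peers included, which is why it needs ROW-type handling.

```java
// Hypothetical illustration only; none of these names exist in Pinot.
final class FrameTypeSketch {
  private FrameTypeSketch() {
  }

  /**
   * COUNT(*) OVER (ORDER BY key RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
   * The input keys must already be sorted ascending.
   */
  static long[] rangeRunningCount(int[] sortedKeys) {
    long[] counts = new long[sortedKeys.length];
    int start = 0;
    while (start < sortedKeys.length) {
      int end = start;
      while (end < sortedKeys.length && sortedKeys[end] == sortedKeys[start]) {
        end++;
      }
      // Rows start..end-1 are peers: the frame of each one extends to the last peer.
      for (int i = start; i < end; i++) {
        counts[i] = end;
      }
      start = end;
    }
    return counts;
  }

  /** ROW_NUMBER() OVER (ORDER BY key): one distinct number per row, even among peers. */
  static long[] rowNumber(int[] sortedKeys) {
    long[] numbers = new long[sortedKeys.length];
    for (int i = 0; i < sortedKeys.length; i++) {
      numbers[i] = i + 1L;
    }
    return numbers;
  }
}
```

For the keys 10, 10, 20, the RANGE count gives the two peer rows the same value while ROW_NUMBER() does not, so a per-order-key result is not enough for the latter.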
@siddharthteotia thanks for the review! I'll address these comments in a separate PR (I don't want to mix this into the other PR I have open to fix the empty LogicalProject issue, as that would be confusing).
@siddharthteotia opened a PR to address comments: #10684
The previous PR for this, #10527, was accidentally merged and had to be reverted. This PR contains the same changes.
This PR adds support for the ranking ROW_NUMBER() window function in Apache Pinot. ROW_NUMBER() requires ROW type window function support, rather than the RANGE type for which we added support in Phase 1. This PR sets up a potential framework for ROW type window functions but implements it only for ROW_NUMBER(). ROW_NUMBER() can be used in the following types of queries:
There are some limitations with ROW_NUMBER() which are:
The design document and issue for window functions support can be found below:
Prior Phase 1 PRs related to window functions:
cc @siddharthteotia @walterddr @vvivekiyer @ankitsultana