Skip to content

Commit

Permalink
[BugFix] Fix wrong plan on limit with subquery TOP-N (backport #54507) (
Browse files Browse the repository at this point in the history
#54567)

Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
  • Loading branch information
mergify[bot] and stdpain authored Jan 3, 2025
1 parent f216698 commit 89786f4
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ public boolean isMerge() {
return mergeInfo != null;
}

public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}

public void setReceiveColumns(List<Integer> receiveColumns) {
this.receiveColumns = receiveColumns;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3252,10 +3252,13 @@ public PlanFragment visitPhysicalLimit(OptExpression optExpression, ExecPlan con
// now sr only support offset on merge-exchange node
// 1. if child is exchange node, meanings child was enforced gather property
// a. only limit and no offset, only need set limit on child
// b. has offset, should trans Exchange to Merge-Exchange node
// b. has offset
// b.1 not merge exchange should trans Exchange to Merge-Exchange node
// b.2 is merge exchange update offset
// 2. if child isn't exchange node, meanings child satisfy gather property
// a. only limit and no offset, no need add exchange node, only need set limit on child
// b. has offset, need add exchange node, sr doesn't support a special node to handle offset

if (limit.hasOffset()) {
if (!(child.getPlanRoot() instanceof ExchangeNode)) {
// use merge-exchange
Expand All @@ -3275,11 +3278,17 @@ public PlanFragment visitPhysicalLimit(OptExpression optExpression, ExecPlan con
context.getFragments().add(fragment);
child = fragment;
}

ExchangeNode exchangeNode = (ExchangeNode) child.getPlanRoot();
SortInfo sortInfo = new SortInfo(Lists.newArrayList(), Operator.DEFAULT_LIMIT,
Lists.newArrayList(new IntLiteral(1)), Lists.newArrayList(true), Lists.newArrayList(false));
exchangeNode.setMergeInfo(sortInfo, limit.getOffset());
if (!exchangeNode.isMerge()) {
SortInfo sortInfo = new SortInfo(Lists.newArrayList(), Operator.DEFAULT_LIMIT,
Lists.newArrayList(new IntLiteral(1)), Lists.newArrayList(true), Lists.newArrayList(false));
exchangeNode.setMergeInfo(sortInfo, limit.getOffset());
} else if (exchangeNode.getOffset() <= 0) {
exchangeNode.setOffset(limit.getOffset());
} else if (exchangeNode.getOffset() > 0) {
exchangeNode.setOffset(limit.getOffset() + exchangeNode.getOffset());
}

exchangeNode.computeStatistics(optExpression.getStatistics());
}

Expand Down
22 changes: 22 additions & 0 deletions fe/fe-core/src/test/java/com/starrocks/sql/plan/LimitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public void testIntersectLimit() throws Exception {
+ " 1:OlapScanNode"));
}


@Test
public void testCountStarWithLimitForOneAggStage() throws Exception {
connectContext.getSessionVariable().setNewPlanerAggStage(2);
Expand Down Expand Up @@ -633,6 +634,27 @@ public void testLimitPushDownJoin() throws Exception {
" avgRowSize=3.0\n"));
}

@Test
public void testOffsetWithSubTopN() throws Exception {
String sql;
String plan;
sql = "select v1 from (\n" +
" select * from (select v1, v2 from t0 order by v1 asc limit 1000, 600) l\n" +
" left join (select null as cx, '1' as c1) r\n" +
" on l.v1 =r.cx\n" +
") b limit 600;";
plan = getThriftPlan(sql);
assertContains(plan, "TExchangeNode(input_row_tuples:[1], sort_info:" +
"TSortInfo(ordering_exprs:[TExpr(nodes:[TExprNode(node_type:SLOT_REF");

sql = "select * from (select v1, v2 from t0 order by v1 asc limit 1000, 600) l limit 200, 600";
plan = getFragmentPlan(sql);
assertContains(plan, "\n" +
" 2:MERGING-EXCHANGE\n" +
" offset: 1200\n" +
" limit: 400");
}

@Test
public void testUnionLimit() throws Exception {
String queryStr = "select 1 from (select 4, 3 from t0 union all select 2, 3 ) as a limit 3";
Expand Down
29 changes: 29 additions & 0 deletions test/sql/test_sort/R/test_offset_with_sub_query_topn
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
-- name: test_offset_with_sub_query_topn
CREATE TABLE `tx3` (
`c0` int(11) NULL COMMENT "",
`c1` varchar(20) NULL COMMENT "",
`c2` varchar(200) NULL COMMENT "",
`c3` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`c0`, `c1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1",
"compression" = "LZ4"
);
-- result:
-- !result
insert into tx3 SELECT generate_series, generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 1400));
-- result:
-- !result
select sum(c0) from (
select c0 from (
select * from (select c0, c1 from tx3 order by c0 asc limit 1000, 600) l
left join (select null as cx, '1' as c1) r
on l.c0 =r.cx
) b limit 600
) x ;
-- result:
480200
-- !result
28 changes: 28 additions & 0 deletions test/sql/test_sort/T/test_offset_with_sub_query_topn
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- name: test_offset_with_sub_query_topn

CREATE TABLE `tx3` (
`c0` int(11) NULL COMMENT "",
`c1` varchar(20) NULL COMMENT "",
`c2` varchar(200) NULL COMMENT "",
`c3` int(11) NULL COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`c0`, `c1`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`c0`, `c1`) BUCKETS 2
PROPERTIES (
"replication_num" = "1",
"compression" = "LZ4"
);

insert into tx3 SELECT generate_series, generate_series, generate_series, generate_series FROM TABLE(generate_series(1, 1400));

select sum(c0) from (
select c0 from (
select * from (select c0, c1 from tx3 order by c0 asc limit 1000, 600) l
left join (select null as cx, '1' as c1) r
on l.c0 =r.cx
) b limit 600
) x ;



0 comments on commit 89786f4

Please sign in to comment.