Skip to content

Commit

Permalink
KYLIN-4905 Support limit .. offset ... in spark query engine
Browse files Browse the repository at this point in the history
  • Loading branch information
zhengshengjun authored and hit-lacus committed Mar 1, 2021
1 parent 88ad6ec commit 01036af
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 2 deletions.
24 changes: 24 additions & 0 deletions kylin-it/src/test/resources/query/sql_limit/query00.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

select slr_segment_cd, sum(price)
from test_kylin_fact
group by slr_segment_cd
order by sum(price) desc
limit 4 offset 2
;{"scanRowCount":300,"scanBytes":0,"scanFiles":1,"cuboidId":14336}
Binary file not shown.
Binary file not shown.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
13,597780.2600
11,592133.2300
12,570202.3700
14,570158.2000
5,567907.4600
16,556006.0200
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ object LimitPlan {
val offset = BigDecimal(rel.localOffset.accept(visitor).toString).toInt
inputs
.get(0)
//TODO KYLIN-4905 currently spark doesn't support limit...offset, support this in kylin server side
.limit(offset + limit)
//.limitRange(offset, offset + limit)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ private List<QueryCallable> prepareAndGenQueryTasks() throws Exception {
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_unionall"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_values"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_window"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_limit"));
}
logger.info("Total {} tasks.", tasks.size());
return tasks;
Expand Down
18 changes: 16 additions & 2 deletions query/src/main/java/org/apache/kylin/query/exec/SparkExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import org.apache.calcite.linq4j.Enumerable;
import org.apache.calcite.linq4j.Linq4j;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rex.RexLiteral;
import org.apache.kylin.common.QueryContextFacade;
import org.apache.kylin.common.debug.BackdoorToggles;
import org.apache.kylin.query.relnode.OLAPLimitRel;
import org.apache.kylin.query.relnode.OLAPRel;

public class SparkExec {
Expand All @@ -37,7 +39,13 @@ public static Enumerable<Object[]> collectToEnumerable(DataContext dataContext)
RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
try {
Enumerable<Object[]> computer = QueryEngineFactory.compute(dataContext, olapRel, rowType);
return computer;
//TODO KYLIN-4905 currently spark doesn't support limit...offset.., support this in kylin server side
if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
return computer.skip(Integer.valueOf(literal.getValue().toString()));
} else {
return computer;
}
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -52,7 +60,13 @@ public static Enumerable<Object> collectToScalarEnumerable(DataContext dataConte
RelDataType rowType = (RelDataType) QueryContextFacade.current().getResultType();
try {
Enumerable<Object> objects = QueryEngineFactory.computeSCALA(dataContext, olapRel, rowType);
return objects;
//TODO KYLIN-4905 currently spark doesn't support limit...offset.., support this in kylin server side
if (olapRel instanceof OLAPLimitRel && ((OLAPLimitRel) olapRel).localOffset != null) {
RexLiteral literal = (RexLiteral) ((OLAPLimitRel) olapRel).localOffset;
return objects.skip(Integer.valueOf(literal.getValue().toString()));
} else {
return objects;
}

} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.calcite.rel.rules.SemiJoinRule;
import org.apache.calcite.rel.rules.SortJoinTransposeRule;
import org.apache.calcite.rel.rules.SortUnionTransposeRule;
import org.apache.calcite.rel.rules.SortProjectTransposeRule;
import org.apache.calcite.rel.type.RelDataType;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rel.type.RelDataTypeField;
Expand Down Expand Up @@ -211,6 +212,30 @@ public void register(RelOptPlanner planner) {

// see Dec 26th email @ http://mail-archives.apache.org/mod_mbox/calcite-dev/201412.mbox/browser
planner.removeRule(ExpandConversionRule.INSTANCE);

/*** TODO KYLIN-4905
* Spark doesn't support limit...offset.., we implement this in KYLIN query server.
* The key is to keep OLAPLimitRel always the root RelNode, then take result indexed from (offset) to (offset + limit).
* But SortProjectTransposeRule will break the key, which transpose sort and project.
* eg: select sum(price), seller_id from kylin_sales group by seller_id order by sum(price) limit 10 offset 3
1. Calcite optimized plan with SortProjectTransposeRule enabled:
OLAPProjectRel
|_OLAPLimitRel (offset=3,fetch=10)
|_OLAPSortRel
|_OLAPAggregateRel
|_OLAPProjectRel
|_OLAPTableScan
2. Calcite optimized plan with SortProjectTransposeRule removed:
OLAPLimitRel (offset=3,fetch=10)
|_OLAPSortRel
|_ OLAPAggregateRel
|_ OLAPProjectRel
|_OLAPTableScan
* ***/
planner.removeRule(SortProjectTransposeRule.INSTANCE);
}

protected void addRules(final RelOptPlanner planner, List<String> rules) {
Expand Down

0 comments on commit 01036af

Please sign in to comment.