From abc0ab80a04f1bab95f93901f1b802aecaf72382 Mon Sep 17 00:00:00 2001 From: nichunen Date: Sun, 11 Aug 2019 18:56:57 +0800 Subject: [PATCH] KYLIN-2820 Query can't read window function's result from subquery --- .../kylin/metadata/model/TblColRef.java | 104 ++++++++++-------- .../resources/query/sql_window/query13.sql | 22 ++++ .../kylin/query/relnode/ColumnRowType.java | 14 ++- .../kylin/query/relnode/OLAPAggregateRel.java | 50 +++++---- .../kylin/query/relnode/OLAPProjectRel.java | 26 +++-- .../kylin/query/relnode/OLAPWindowRel.java | 11 ++ .../visitor/TupleExpressionVisitor.java | 7 +- 7 files changed, 153 insertions(+), 81 deletions(-) create mode 100644 kylin-it/src/test/resources/query/sql_window/query13.sql diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index 0dc08a98ab6..960cc192f67 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -21,12 +21,13 @@ import static com.google.common.base.Preconditions.checkArgument; import java.io.Serializable; - +import java.util.List; import java.util.Locale; import java.util.function.Function; import org.apache.commons.lang.StringUtils; import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.expression.TupleExpression; /** */ @@ -34,27 +35,36 @@ public class TblColRef implements Serializable { private static final String INNER_TABLE_NAME = "_kylin_table"; + private static final DataModelDesc UNKNOWN_MODEL = new DataModelDesc(); - // used by projection rewrite, see OLAPProjectRel - public enum InnerDataTypeEnum { - - LITERAL("_literal_type"), DERIVED("_derived_type"); - - private final String dateType; + static { + UNKNOWN_MODEL.setName("UNKNOWN_MODEL"); + } - private InnerDataTypeEnum(String name) { - this.dateType = name; - } + private TableRef table; + private TableRef backupTable;// only used in fixTableRef() + private ColumnDesc column; + private String identity; + private String parserDescription; + //used in window function + private List subTupleExps; + /** + * Function used to get quoted identitier + */ + private transient Function quotedFunc; - public String getDataType() { - return dateType; - } + TblColRef(ColumnDesc column) { + this.column = column; + } - public static boolean contains(String name) { - return LITERAL.getDataType().equals(name) || DERIVED.getDataType().equals(name); - } + TblColRef(TableRef table, ColumnDesc column) { + checkArgument(table.getTableDesc().getIdentity().equals(column.getTable().getIdentity())); + this.table = table; + this.column = column; } + // ============================================================================ + // used by projection rewrite, see OLAPProjectRel public static TblColRef newInnerColumn(String columnName, InnerDataTypeEnum dataType) { return newInnerColumn(columnName, dataType, null); @@ -72,11 +82,6 @@ public static TblColRef newInnerColumn(String columnName, InnerDataTypeEnum data return colRef; } - private static final DataModelDesc UNKNOWN_MODEL = new DataModelDesc(); - static { - UNKNOWN_MODEL.setName("UNKNOWN_MODEL"); - } - public static TableRef tableForUnknownModel(String tempTableAlias, TableDesc table) { return new TableRef(UNKNOWN_MODEL, tempTableAlias, table, false); } @@ -103,7 +108,8 @@ public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String } // for test mainly - public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, String comment) { + public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, + String comment) { ColumnDesc desc = new ColumnDesc(); String id = "" + oneBasedColumnIndex; desc.setId(id); @@ -114,33 +120,10 @@ public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String return new TblColRef(desc); } - // ============================================================================ - - private TableRef table; - private TableRef backupTable;// only used in fixTableRef() - private ColumnDesc column; - private String identity; - private String parserDescription; - - /** - * Function used to get quoted identitier - */ - private transient Function quotedFunc; - public void setQuotedFunc(Function quotedFunc) { this.quotedFunc = quotedFunc; } - TblColRef(ColumnDesc column) { - this.column = column; - } - - TblColRef(TableRef table, ColumnDesc column) { - checkArgument(table.getTableDesc().getIdentity().equals(column.getTable().getIdentity())); - this.table = table; - this.column = column; - } - public void fixTableRef(TableRef tableRef) { this.backupTable = this.table; this.table = tableRef; @@ -199,9 +182,18 @@ public DataType getType() { return column.getType(); } - public String getBackupTableAlias(){ + public List getSubTupleExps() { + return subTupleExps; + } + + public void setSubTupleExps(List subTubleExps) { + this.subTupleExps = subTubleExps; + } + + public String getBackupTableAlias() { return backupTable.getAlias(); } + private void markInnerColumn(InnerDataTypeEnum dataType) { this.column.setDatatype(dataType.getDataType()); this.column.getTable().setName(INNER_TABLE_NAME); @@ -286,4 +278,24 @@ public String getTableWithSchema() { public String getColumWithTableAndSchema() { return (getTableWithSchema() + "." + column.getName()).toUpperCase(Locale.ROOT); } + + // used by projection rewrite, see OLAPProjectRel + public enum InnerDataTypeEnum { + + LITERAL("_literal_type"), DERIVED("_derived_type"); + + private final String dateType; + + private InnerDataTypeEnum(String name) { + this.dateType = name; + } + + public static boolean contains(String name) { + return LITERAL.getDataType().equals(name) || DERIVED.getDataType().equals(name); + } + + public String getDataType() { + return dateType; + } + } } diff --git a/kylin-it/src/test/resources/query/sql_window/query13.sql b/kylin-it/src/test/resources/query/sql_window/query13.sql new file mode 100644 index 00000000000..9807fd64494 --- /dev/null +++ b/kylin-it/src/test/resources/query/sql_window/query13.sql @@ -0,0 +1,22 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one +-- or more contributor license agreements. See the NOTICE file +-- distributed with this work for additional information +-- regarding copyright ownership. The ASF licenses this file +-- to you under the Apache License, Version 2.0 (the +-- "License"); you may not use this file except in compliance +-- with the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +select t.first_seller_id as first_seller_id_test, count(*) from ( +select first_value(seller_id) over (partition by buyer_id) as first_seller_id +from test_kylin_fact inner join test_order on test_kylin_fact.order_id=test_order.order_id +) +as t group by t.first_seller_id \ No newline at end of file diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java index 65a00e66a25..941d341a7fd 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java @@ -18,6 +18,7 @@ package org.apache.kylin.query.relnode; +import java.util.ArrayList; import java.util.List; import org.apache.kylin.common.util.Pair; @@ -81,7 +82,7 @@ public Pair replaceColumnByIndex(int index, TblColRe return new Pair<>(oldCol, oldExpr); } - public TupleExpression getSourceColumnsByIndex(int i) { + public TupleExpression getTupleExpressionByIndex(int i) { TupleExpression result = null; if (sourceColumns != null) { result = sourceColumns.get(i); @@ -92,6 +93,17 @@ public TupleExpression getSourceColumnsByIndex(int i) { return result; } + public List getSourceColumns() { + if (sourceColumns == null) { + List sources = new ArrayList<>(); + for (int i = 0; i < columns.size(); i++) { + sources.add(getTupleExpressionByIndex(i)); + } + sourceColumns = sources; + } + return sourceColumns; + } + public List getAllColumns() { return columns; } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java index 9def697b853..344a34e034f 100755 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java @@ -18,6 +18,8 @@ package org.apache.kylin.query.relnode; +import static org.apache.kylin.metadata.expression.TupleExpression.ExpressionOperatorEnum.COLUMN; + import java.lang.reflect.Method; import java.util.ArrayList; import java.util.HashMap; @@ -85,7 +87,6 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import static org.apache.kylin.metadata.expression.TupleExpression.ExpressionOperatorEnum.COLUMN; /** */ public class OLAPAggregateRel extends Aggregate implements OLAPRel { @@ -111,30 +112,14 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel { for (String func : udafs.keySet()) { try { AGGR_FUNC_PARAM_AS_MEASURE_MAP.put(func, - ((ParamAsMeasureCount) (udafs.get(func).getDeclaredConstructor().newInstance())).getParamAsMeasureCount()); + ((ParamAsMeasureCount) (udafs.get(func).getDeclaredConstructor().newInstance())) + .getParamAsMeasureCount()); } catch (Exception e) { throw new RuntimeException(e); } } } - static String getSqlFuncName(AggregateCall aggCall) { - String sqlName = aggCall.getAggregation().getName(); - if (aggCall.isDistinct()) { - sqlName = sqlName + "_DISTINCT"; - } - return sqlName; - } - - public static String getAggrFuncName(AggregateCall aggCall) { - String sqlName = getSqlFuncName(aggCall); - String funcName = AGGR_FUNC_MAP.get(sqlName); - if (funcName == null) { - throw new IllegalStateException("Non-support aggregation " + sqlName); - } - return funcName; - } - OLAPContext context; ColumnRowType columnRowType; private boolean afterAggregate; @@ -143,7 +128,6 @@ public static String getAggrFuncName(AggregateCall aggCall) { private List groups; private List aggregations; private boolean rewriting; - public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, boolean indicator, ImmutableBitSet groupSet, List groupSets, List aggCalls) throws InvalidRelException { @@ -154,6 +138,23 @@ public OLAPAggregateRel(RelOptCluster cluster, RelTraitSet traits, RelNode child this.rowType = getRowType(); } + static String getSqlFuncName(AggregateCall aggCall) { + String sqlName = aggCall.getAggregation().getName(); + if (aggCall.isDistinct()) { + sqlName = sqlName + "_DISTINCT"; + } + return sqlName; + } + + public static String getAggrFuncName(AggregateCall aggCall) { + String sqlName = getSqlFuncName(aggCall); + String funcName = AGGR_FUNC_MAP.get(sqlName); + if (funcName == null) { + throw new IllegalStateException("Non-support aggregation " + sqlName); + } + return funcName; + } + @Override public Aggregate copy(RelTraitSet traitSet, RelNode input, boolean indicator, ImmutableBitSet groupSet, List groupSets, List aggCalls) { @@ -268,10 +269,11 @@ void buildGroups() { ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType(); this.groups = Lists.newArrayList(); for (int i = getGroupSet().nextSetBit(0); i >= 0; i = getGroupSet().nextSetBit(i + 1)) { - TupleExpression tupleExpression = inputColumnRowType.getSourceColumnsByIndex(i); + TupleExpression tupleExpression = inputColumnRowType.getTupleExpressionByIndex(i); // group by column with operator - if (this.context.groupByExpression == false && !(COLUMN.equals(tupleExpression.getOperator()) && tupleExpression.getChildren().isEmpty())) { + if (this.context.groupByExpression == false + && !(COLUMN.equals(tupleExpression.getOperator()) && tupleExpression.getChildren().isEmpty())) { this.context.groupByExpression = true; } @@ -350,7 +352,7 @@ void buildAggregations() { // Check dynamic aggregation if (this.context.isDynamicColumnEnabled() && !afterAggregate && !rewriting && argList.size() == 1) { int iRowIdx = argList.get(0); - TupleExpression tupleExpr = inputColumnRowType.getSourceColumnsByIndex(iRowIdx); + TupleExpression tupleExpr = inputColumnRowType.getTupleExpressionByIndex(iRowIdx); if (aggCall.getAggregation() instanceof SqlSumAggFunction || aggCall.getAggregation() instanceof SqlSumEmptyIsZeroAggFunction) { // sum (expression) @@ -571,7 +573,7 @@ private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc f String callName = getSqlFuncName(aggCall); RelDataType fieldType = aggCall.getType(); SqlAggFunction newAgg = aggCall.getAggregation(); - + Map> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions(); if (func.isCount()) { newAgg = SqlStdOperatorTable.SUM0; diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java index 316337675fa..59d2c385a15 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java @@ -63,6 +63,7 @@ import org.apache.kylin.metadata.expression.ExpressionColCollector; import org.apache.kylin.metadata.expression.NoneTupleExpression; import org.apache.kylin.metadata.expression.NumberTupleExpression; +import org.apache.kylin.metadata.expression.RexCallTupleExpression; import org.apache.kylin.metadata.expression.StringTupleExpression; import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.model.MeasureDesc; @@ -80,8 +81,14 @@ */ public class OLAPProjectRel extends Project implements OLAPRel { - OLAPContext context; + private final BasicSqlType dateType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), + SqlTypeName.DATE); + private final BasicSqlType timestampType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), + SqlTypeName.TIMESTAMP); + private final ArraySqlType dateArrayType = new ArraySqlType(dateType, true); + private final ArraySqlType timestampArrayType = new ArraySqlType(timestampType, true); public List rewriteProjects; + OLAPContext context; boolean rewriting; ColumnRowType columnRowType; boolean hasJoin; @@ -89,12 +96,6 @@ public class OLAPProjectRel extends Project implements OLAPRel { boolean afterAggregate; boolean isMerelyPermutation = false;//project additionally added by OLAPJoinPushThroughJoinRule private int caseCount = 0; - - private final BasicSqlType dateType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), SqlTypeName.DATE); - private final BasicSqlType timestampType = new BasicSqlType(getCluster().getTypeFactory().getTypeSystem(), SqlTypeName.TIMESTAMP); - private final ArraySqlType dateArrayType = new ArraySqlType(dateType, true); - private final ArraySqlType timestampArrayType = new ArraySqlType(timestampType, true); - /** * A flag indicate whether has intersect_count in query */ @@ -187,7 +188,7 @@ ColumnRowType buildColumnRowType() { String fieldName = columnField.getName(); TupleExpression tupleExpr = rex.accept(visitor); - TblColRef column = translateRexNode(tupleExpr, fieldName); + TblColRef column = translateRexNode(rex, inputColumnRowType, tupleExpr, fieldName); if (!this.rewriting && !this.afterAggregate && !isMerelyPermutation) { Set srcCols = ExpressionColCollector.collectColumns(tupleExpr); // remove cols not belonging to context tables @@ -219,7 +220,8 @@ ColumnRowType buildColumnRowType() { return new ColumnRowType(columns, sourceColumns); } - private TblColRef translateRexNode(TupleExpression tupleExpr, String fieldName) { + private TblColRef translateRexNode(RexNode rexNode, ColumnRowType inputColumnRowType, TupleExpression tupleExpr, + String fieldName) { if (tupleExpr instanceof ColumnTupleExpression) { return ((ColumnTupleExpression) tupleExpr).getColumn(); } else if (tupleExpr instanceof NumberTupleExpression) { @@ -228,6 +230,12 @@ private TblColRef translateRexNode(TupleExpression tupleExpr, String fieldName) } else if (tupleExpr instanceof StringTupleExpression) { Object value = ((StringTupleExpression) tupleExpr).getValue(); return TblColRef.newInnerColumn(value == null ? "null" : value.toString(), InnerDataTypeEnum.LITERAL); + } else if (tupleExpr instanceof RexCallTupleExpression && rexNode instanceof RexInputRef) { + RexInputRef inputRef = (RexInputRef) rexNode; + int index = inputRef.getIndex(); + if (index < inputColumnRowType.size()) { + return inputColumnRowType.getColumnByIndex(index); + } } return TblColRef.newInnerColumn(fieldName, InnerDataTypeEnum.LITERAL, tupleExpr.getDigest()); } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java index a4ca1b72c13..7c9721af057 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPWindowRel.java @@ -29,6 +29,7 @@ import org.apache.calcite.plan.RelOptPlanner; import org.apache.calcite.plan.RelTrait; import org.apache.calcite.plan.RelTraitSet; +import org.apache.calcite.rel.RelFieldCollation; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.RelWriter; import org.apache.calcite.rel.core.AggregateCall; @@ -36,6 +37,8 @@ import org.apache.calcite.rel.metadata.RelMetadataQuery; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rex.RexLiteral; +import org.apache.commons.compress.utils.Lists; +import org.apache.kylin.metadata.expression.TupleExpression; import org.apache.kylin.metadata.model.TblColRef; import com.google.common.base.Preconditions; @@ -93,9 +96,17 @@ ColumnRowType buildColumnRowType() { // add window aggregate calls column for (Group group : groups) { + List sourceColOuter = Lists.newArrayList(); + group.keys.asSet().stream().map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColOuter::add); + group.orderKeys.getFieldCollations().stream().map(RelFieldCollation::getFieldIndex) + .map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColOuter::add); for (AggregateCall aggrCall : group.getAggregateCalls(this)) { TblColRef aggrCallCol = TblColRef.newInnerColumn(aggrCall.getName(), TblColRef.InnerDataTypeEnum.LITERAL); + List sourceColInner = Lists.newArrayList(sourceColOuter.iterator()); + aggrCall.getArgList().stream().filter(i -> i < inputColumnRowType.size()) + .map(inputColumnRowType::getTupleExpressionByIndex).forEach(sourceColInner::add); + aggrCallCol.setSubTupleExps(sourceColInner); columns.add(aggrCallCol); } } diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java index 03b58dc9b1f..027939743d4 100644 --- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java +++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleExpressionVisitor.java @@ -158,7 +158,12 @@ public TupleExpression visitInputRef(RexInputRef inputRef) { // check it for rewrite count if (index < inputRowType.size()) { TblColRef column = inputRowType.getColumnByIndex(index); - TupleExpression tuple = new ColumnTupleExpression(column); + TupleExpression tuple; + if (column.getSubTupleExps() != null) { + tuple = new RexCallTupleExpression(column.getSubTupleExps()); + } else { + tuple = new ColumnTupleExpression(column); + } tuple.setDigest(inputRef.toString()); return tuple; } else {