Skip to content

Commit

Permalink
KYLIN-4791 Fix exception 'UnsupportedOperationException: empty.reduce…
Browse files Browse the repository at this point in the history
…Left' when there are cast expressions in the filters of FilePruner

When execute function 'pruneSegments' of FilePruner, if there are some cast expressions in filter, it will throw exception 'UnsupportedOperationException: empty.reduceLeft'.

Solution:
Convert cast expressions in filter to attribute before translating filter.
  • Loading branch information
zzcclp authored and hit-lacus committed Oct 20, 2020
1 parent 3683c24 commit d4cd28d
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 29 deletions.
22 changes: 22 additions & 0 deletions kylin-it/src/test/resources/query/sql_castprunesegs/query01.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
SELECT sum(price) as sum_price
FROM TEST_KYLIN_FACT
WHERE CAL_DT > cast(TIMESTAMPADD(Day, -15000, CURRENT_DATE) as DATE)
GROUP BY CAL_DT
;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
22 changes: 22 additions & 0 deletions kylin-it/src/test/resources/query/sql_castprunesegs/query02.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
SELECT sum(price) as sum_price
FROM TEST_KYLIN_FACT
WHERE CAL_DT > '2013-06-01'
GROUP BY CAL_DT
;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
31 changes: 31 additions & 0 deletions kylin-it/src/test/resources/query/sql_castprunesegs/query03.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--

select test_cal_dt.cal_dt, sum(test_kylin_fact.price) as GMV
, count(*) as TRANS_CNT
from test_kylin_fact
inner JOIN edw.test_cal_dt as test_cal_dt
ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
inner JOIN test_category_groupings
ON test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id AND test_kylin_fact.lstg_site_id = test_category_groupings.site_id
inner JOIN edw.test_sites as test_sites
ON test_kylin_fact.lstg_site_id = test_sites.site_id
where
extract(DAY from test_cal_dt.cal_dt) = 12
group by test_cal_dt.cal_dt
;{"scanRowCount":0,"scanBytes":0,"scanFiles":0,"cuboidId":262144}
Original file line number Diff line number Diff line change
Expand Up @@ -260,47 +260,54 @@ class FilePruner(
val filteredStatuses = if (filters.isEmpty) {
segDirs
} else {
val reducedFilter = filters.flatMap(DataSourceStrategy.translateFilter).reduceLeft(And)
segDirs.filter {
e => {
val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
case Trivial(true) => true
case Trivial(false) => false
val translatedFilter = filters.map(filter => convertCastFilter(filter))
.flatMap(DataSourceStrategy.translateFilter)
if (translatedFilter.isEmpty) {
logInfo("Can not use filters to prune segments.")
segDirs
} else {
val reducedFilter = translatedFilter.reduceLeft(And)
val pruned = segDirs.filter {
e => {
val tsRange = cubeInstance.getSegment(e.segmentName, SegmentStatusEnum.READY).getTSRange
SegFilters(tsRange.startValue, tsRange.endValue, pattern).foldFilter(reducedFilter) match {
case Trivial(true) => true
case Trivial(false) => false
}
}
}
logInfo(s"Selected files after segments pruning:" + pruned.map(_.segmentName))
pruned
}
}
logInfo(s"Selected files after segments pruning:" + filteredStatuses.map(_.segmentName))
filteredStatuses
}

private def pruneShards(
filters: Seq[Expression],
segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
segDirs
} else {
val normalizedFiltersAndExpr = filters.reduce(expressions.And)
private def pruneShards(filters: Seq[Expression],
segDirs: Seq[SegmentDirectory]): Seq[SegmentDirectory] = {
val filteredStatuses = if (layoutEntity.getShardByColumns.size() != 1) {
segDirs
} else {
val normalizedFiltersAndExpr = filters.reduce(expressions.And)

val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")
val pruned = segDirs.map { case SegmentDirectory(segName, segIdentifier, files) =>
val segment = cubeInstance.getSegment(segName, SegmentStatusEnum.READY);
val partitionNumber = segment.getCuboidShardNum(layoutEntity.getId).toInt
require(partitionNumber > 0, "Shards num with shard by col should greater than 0.")

val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)
val bitSet = getExpressionShards(normalizedFiltersAndExpr, shardByColumn.name, partitionNumber)

val selected = files.filter(f => {
val partitionId = FilePruner.getPartitionId(f.getPath)
bitSet.get(partitionId)
})
SegmentDirectory(segName, segIdentifier, selected)
}
logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
pruned
val selected = files.filter(f => {
val partitionId = FilePruner.getPartitionId(f.getPath)
bitSet.get(partitionId)
})
SegmentDirectory(segName, segIdentifier, selected)
}
filteredStatuses
logInfo(s"Selected files after shards pruning:" + pruned.flatMap(_.files).map(_.getPath.toString).mkString(";"))
pruned
}
filteredStatuses
}

override lazy val inputFiles: Array[String] = Array.empty[String]

Expand Down Expand Up @@ -358,6 +365,39 @@ class FilePruner(
matchedShards
}
}

// translate for filter type match
private def convertCastFilter(filter: Expression): Expression = {
filter match {
case expressions.EqualTo(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
expressions.EqualTo(a, Literal(v, t))
case expressions.EqualTo(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
expressions.EqualTo(Literal(v, t), a)
case expressions.GreaterThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
expressions.GreaterThan(a, Literal(v, t))
case expressions.GreaterThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
expressions.GreaterThan(Literal(v, t), a)
case expressions.LessThan(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
expressions.LessThan(a, Literal(v, t))
case expressions.LessThan(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
expressions.LessThan(Literal(v, t), a)
case expressions.GreaterThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
expressions.GreaterThanOrEqual(a, Literal(v, t))
case expressions.GreaterThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
expressions.GreaterThanOrEqual(Literal(v, t), a)
case expressions.LessThanOrEqual(expressions.Cast(a: Attribute, _, _), Literal(v, t)) =>
expressions.LessThanOrEqual(a, Literal(v, t))
case expressions.LessThanOrEqual(Literal(v, t), expressions.Cast(a: Attribute, _, _)) =>
expressions.LessThanOrEqual(Literal(v, t), a)
case expressions.Or(left, right) =>
expressions.Or(convertCastFilter(left), convertCastFilter(right))
case expressions.And(left, right) =>
expressions.And(convertCastFilter(left), convertCastFilter(right))
case expressions.Not(child) =>
expressions.Not(convertCastFilter(child))
case _ => filter
}
}
}

object FilePruner {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ private List<QueryCallable> prepareAndGenQueryTasks() throws Exception {
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_lookup"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_casewhen"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_castprunesegs"));

tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_like"));
tasks.add(new QueryCallable(CompareLevel.SAME, joinType, "sql_cache"));
Expand Down

0 comments on commit d4cd28d

Please sign in to comment.