HIVE-23723: Limit operator pushdown through LOJ (Attila Magyar, reviewed by Krisztian Kasa)
zeroflag authored Aug 24, 2020
1 parent 29753d7 commit cb62e1a
Showing 168 changed files with 5,311 additions and 4,779 deletions.
2 changes: 2 additions & 0 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2535,6 +2535,8 @@ public static enum ConfVars {
"If the skew information is correctly stored in the metadata, hive.optimize.skewjoin.compiletime\n" +
"would change the query plan to take care of it, and hive.optimize.skewjoin will be a no-op."),

HIVE_OPTIMIZE_LIMIT("hive.optimize.limit", true,
"Optimize limit by pushing through Left Outer Joins and Selects"),
HIVE_OPTIMIZE_TOPNKEY("hive.optimize.topnkey", true, "Whether to enable top n key optimizer."),
HIVE_MAX_TOPN_ALLOWED("hive.optimize.topnkey.max", 128, "Maximum topN value allowed by top n key optimizer.\n" +
"If the LIMIT is greater than this value then top n key optimization won't be used."),
1 change: 1 addition & 0 deletions contrib/src/test/queries/clientpositive/dboutput.q
@@ -1,4 +1,5 @@
--! qt:dataset:src
set hive.optimize.limit=false;
set hive.mapred.mode=nonstrict;
ADD JAR ${system:maven.local.repository}/org/apache/hive/hive-contrib/${system:hive.version}/hive-contrib-${system:hive.version}.jar;

12 changes: 6 additions & 6 deletions contrib/src/test/results/clientpositive/udf_example_add.q.out
@@ -39,12 +39,12 @@ STAGE PLANS:
TableScan
alias: src
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 3 (type: int), 6 (type: int), 10 (type: int), 3.3000000000000003D (type: double), 6.6D (type: double), 11.0D (type: double), 10.4D (type: double)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 500 Data size: 22000 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 1
Limit
Number of rows: 1
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 3 (type: int), 6 (type: int), 10 (type: int), 3.3000000000000003D (type: double), 6.6D (type: double), 11.0D (type: double), 10.4D (type: double)
outputColumnNames: _col0, _col1, _col2, _col3, _col4, _col5, _col6
Statistics: Num rows: 1 Data size: 44 Basic stats: COMPLETE Column stats: COMPLETE
ListSink

12 changes: 6 additions & 6 deletions contrib/src/test/results/clientpositive/udf_example_format.q.out
@@ -33,12 +33,12 @@ STAGE PLANS:
TableScan
alias: src
Statistics: Num rows: 500 Data size: 5312 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 'abc' (type: string), '1.1' (type: string), '1.1 1.200000e+00' (type: string), 'a 12 10' (type: string)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 500 Data size: 182500 Basic stats: COMPLETE Column stats: COMPLETE
Limit
Number of rows: 1
Limit
Number of rows: 1
Statistics: Num rows: 1 Data size: 8 Basic stats: COMPLETE Column stats: COMPLETE
Select Operator
expressions: 'abc' (type: string), '1.1' (type: string), '1.1 1.200000e+00' (type: string), 'a 12 10' (type: string)
outputColumnNames: _col0, _col1, _col2, _col3
Statistics: Num rows: 1 Data size: 365 Basic stats: COMPLETE Column stats: COMPLETE
ListSink

ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
@@ -214,6 +214,9 @@ public void initialize(HiveConf hiveConf) {
if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
transformations.add(new LimitPushdownOptimizer());
}
if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_LIMIT)) {
transformations.add(new OrderlessLimitPushDownOptimizer());
}
if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
transformations.add(new StatsOptimizer());
}
ql/src/java/org/apache/hadoop/hive/ql/optimizer/OrderlessLimitPushDownOptimizer.java
@@ -0,0 +1,154 @@
/*
*
* * Licensed to the Apache Software Foundation (ASF) under one
* * or more contributor license agreements. See the NOTICE file
* * distributed with this work for additional information
* * regarding copyright ownership. The ASF licenses this file
* * to you under the Apache License, Version 2.0 (the
* * "License"); you may not use this file except in compliance
* * with the License. You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/

package org.apache.hadoop.hive.ql.optimizer;

import static org.apache.hadoop.hive.ql.optimizer.topnkey.TopNKeyProcessor.copyDown;
import static org.apache.hadoop.hive.ql.optimizer.topnkey.TopNKeyPushdownProcessor.moveDown;

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Stack;

import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
import org.apache.hadoop.hive.ql.exec.LimitOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.RuleRegExp;
import org.apache.hadoop.hive.ql.lib.SemanticGraphWalker;
import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
import org.apache.hadoop.hive.ql.lib.SemanticRule;
import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Push LIMIT without an Order By through Selects and Left Outer Joins
*/
public class OrderlessLimitPushDownOptimizer extends Transform {
private static final Logger LOG = LoggerFactory.getLogger(OrderlessLimitPushDownOptimizer.class);

@Override
public ParseContext transform(ParseContext pctx) throws SemanticException {
Map<SemanticRule, SemanticNodeProcessor> opRules = new LinkedHashMap<SemanticRule, SemanticNodeProcessor>();
opRules.put(
new RuleRegExp("LIMIT push down", LimitOperator.getOperatorName() + "%"),
new LimitPushDown());
SemanticGraphWalker walker = new DefaultGraphWalker(new DefaultRuleDispatcher(null, opRules, null));
walker.startWalking(new ArrayList<>(pctx.getTopOps().values()), null);
return pctx;
}

private static class LimitPushDown implements SemanticNodeProcessor {
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs) throws SemanticException {
ReduceSinkOperator reduceSink = findReduceSink(stack);
if (reduceSink == null || !reduceSink.getConf().hasOrderBy()) { // LIMIT + ORDER BY handled by TopNKey push down
pushDown((LimitOperator) nd);
}
return null;
}

private ReduceSinkOperator findReduceSink(Stack<Node> stack) {
for (int i = stack.size() - 2 ; i >= 0; i--) {
Operator<?> operator = (Operator<?>) stack.get(i);
if (operator instanceof ReduceSinkOperator) {
return ((ReduceSinkOperator) operator);
}
}
return null;
}

private void pushDown(LimitOperator limit) throws SemanticException {
Operator<? extends OperatorDesc> parent = limit.getParentOperators().get(0);
if (parent.getNumChild() != 1) {
return;
}
switch (parent.getType()) {
case LIMIT:
combineLimits(limit);
break;
case SELECT:
case FORWARD:
pushdownThroughParent(limit);
break;
case MERGEJOIN:
case JOIN:
case MAPJOIN:
pushThroughLeftOuterJoin(limit);
break;
default:
break;
}
}

private void combineLimits(LimitOperator childLimit) throws SemanticException {
LimitOperator parentLimit = (LimitOperator) childLimit.getParentOperators().get(0);
LimitDesc parentConf = parentLimit.getConf();
LimitDesc childConf = childLimit.getConf();
if (parentConf.getOffset() == childConf.getOffset()) {
int min = Math.min(parentConf.getLimit(), childConf.getLimit());
LOG.debug("Combining two limits child={}, parent={}, newLimit={}", childLimit, parentLimit, min);
parentConf.setLimit(min);
parentLimit.removeChildAndAdoptItsChildren(childLimit);
pushDown(parentLimit);
}
}

private void pushdownThroughParent(LimitOperator limit) throws SemanticException {
Operator<? extends OperatorDesc> parent = limit.getParentOperators().get(0);
LOG.debug("Pushing {} through {}", limit.getName(), parent.getName());
moveDown(limit);
pushDown(limit);
}

private void pushThroughLeftOuterJoin(LimitOperator limit)
throws SemanticException {
CommonJoinOperator<? extends JoinDesc> join =
(CommonJoinOperator<? extends JoinDesc>) limit.getParentOperators().get(0);
JoinCondDesc[] joinConds = join.getConf().getConds();
JoinCondDesc firstJoinCond = joinConds[0];
for (JoinCondDesc joinCond : joinConds) {
if (!firstJoinCond.equals(joinCond)) {
return;
}
}
if (firstJoinCond.getType() == JoinDesc.LEFT_OUTER_JOIN) {
List<Operator<? extends OperatorDesc>> joinInputs = join.getParentOperators();
final ReduceSinkOperator reduceSinkOperator = (ReduceSinkOperator) joinInputs.get(0);

pushDown((LimitOperator) copyDown(reduceSinkOperator, new LimitDesc(limit.getConf())));
// the copied limit takes care of the offset; reset the offset in the original so we do not lose rows
limit.getConf().setOffset(0);
}
}
}
}
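
For intuition, here is a sketch of the pushThroughLeftOuterJoin rewrite modelled on plain Java collections rather than Hive operators. It is a simplified, hypothetical model (the class and helper names are invented and nothing below is Hive API): it checks that copying the LIMIT, offset included, below the left input of a LEFT OUTER JOIN and then re-applying the LIMIT with the offset reset to 0 still returns the requested number of rows, each of which belongs to the full join result. That is all a LIMIT without an ORDER BY guarantees, and it holds because every retained left row produces at least one join output row. The combineLimits case is simpler: when two stacked limits share the same offset, the smaller row count wins (for example, LIMIT 10 over LIMIT 3 collapses to LIMIT 3).

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Toy model, not Hive code: why a LIMIT with no ORDER BY may be copied
// below the left input of a LEFT OUTER JOIN.
public class OrderlessLimitPushdownSketch {

  // Plain left outer join of two lists of (key, value) pairs.
  static List<String> leftOuterJoin(List<Map.Entry<Integer, String>> left,
                                    List<Map.Entry<Integer, String>> right) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<Integer, String> l : left) {
      boolean matched = false;
      for (Map.Entry<Integer, String> r : right) {
        if (Objects.equals(l.getKey(), r.getKey())) {
          out.add(l.getValue() + "," + r.getValue());
          matched = true;
        }
      }
      if (!matched) {
        out.add(l.getValue() + ",NULL"); // the left row survives either way
      }
    }
    return out;
  }

  static <T> List<T> limit(List<T> rows, int offset, int n) {
    int from = Math.min(offset, rows.size());
    return rows.subList(from, Math.min(from + n, rows.size()));
  }

  public static void main(String[] args) {
    List<Map.Entry<Integer, String>> left = List.of(
        Map.entry(1, "a"), Map.entry(2, "b"), Map.entry(3, "c"), Map.entry(4, "d"));
    List<Map.Entry<Integer, String>> right = List.of(
        Map.entry(1, "x"), Map.entry(1, "y"), Map.entry(3, "z"));
    int offset = 1, n = 2;

    // Original plan: join first, then LIMIT n OFFSET offset.
    List<String> original = limit(leftOuterJoin(left, right), offset, n);

    // Rewritten plan: the limit (with the offset) is copied below the left
    // input, and the limit above the join keeps n but resets the offset to 0.
    List<String> rewritten = limit(leftOuterJoin(limit(left, offset, n), right), 0, n);

    List<String> fullJoin = leftOuterJoin(left, right);
    System.out.println("original  = " + original);   // e.g. [a,y, b,NULL]
    System.out.println("rewritten = " + rewritten);  // e.g. [b,NULL, c,z]
    // The two plans may pick different rows (LIMIT without ORDER BY is not
    // deterministic), but both return n rows drawn from the full join result.
    System.out.println(fullJoin.containsAll(rewritten) && rewritten.size() == n);
  }
}
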
ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyProcessor.java
@@ -102,7 +102,7 @@ public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
return null;
}

static TopNKeyOperator copyDown(Operator<? extends OperatorDesc> child, OperatorDesc operatorDesc) {
public static Operator<? extends OperatorDesc> copyDown(Operator<? extends OperatorDesc> child, OperatorDesc operatorDesc) {
final List<Operator<? extends OperatorDesc>> parents = child.getParentOperators();

final Operator<? extends OperatorDesc> newOperator =
@@ -118,6 +118,6 @@ static TopNKeyOperator copyDown(Operator<? extends OperatorDesc> child, Operator
child.getParentOperators().clear();
child.getParentOperators().add(newOperator);

return (TopNKeyOperator) newOperator;
return newOperator;
}
}
ql/src/java/org/apache/hadoop/hive/ql/optimizer/topnkey/TopNKeyPushdownProcessor.java
@@ -160,7 +160,7 @@ private void pushdownThroughGroupBy(TopNKeyOperator topNKey) throws SemanticExce

LOG.debug("Pushing a copy of {} through {}", topNKey.getName(), groupBy.getName());
final TopNKeyDesc newTopNKeyDesc = topNKeyDesc.combine(commonKeyPrefix);
pushdown(copyDown(groupBy, newTopNKeyDesc));
pushdown((TopNKeyOperator) copyDown(groupBy, newTopNKeyDesc));

if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
LOG.debug("Removing {} above {}", topNKey.getName(), groupBy.getName());
@@ -189,7 +189,7 @@ private void pushdownThroughReduceSink(TopNKeyOperator topNKey) throws SemanticE

LOG.debug("Pushing a copy of {} through {}", topNKey.getName(), reduceSink.getName());
final TopNKeyDesc newTopNKeyDesc = topNKeyDesc.combine(commonKeyPrefix);
pushdown(copyDown(reduceSink, newTopNKeyDesc));
pushdown((TopNKeyOperator) copyDown(reduceSink, newTopNKeyDesc));

if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
LOG.debug("Removing {} above {}", topNKey.getName(), reduceSink.getName());
@@ -250,7 +250,7 @@ private void pushdownThroughLeftOuterJoin(TopNKeyOperator topNKey) throws Semant
LOG.debug("Pushing a copy of {} through {} and {}",
topNKey.getName(), join.getName(), reduceSinkOperator.getName());
final TopNKeyDesc newTopNKeyDesc = topNKeyDesc.combine(commonKeyPrefix);
pushdown(copyDown(reduceSinkOperator, newTopNKeyDesc));
pushdown((TopNKeyOperator) copyDown(reduceSinkOperator, newTopNKeyDesc));

if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
LOG.debug("Removing {} above {}", topNKey.getName(), join.getName());
@@ -290,7 +290,7 @@ private void pushdownInnerJoin(TopNKeyOperator topNKey, int fkJoinInputIndex, bo
LOG.debug("Pushing a copy of {} through {} and {}",
topNKey.getName(), join.getName(), fkJoinInput.getName());
final TopNKeyDesc newTopNKeyDesc = topNKeyDesc.combine(commonKeyPrefix);
pushdown(copyDown(fkJoinInput, newTopNKeyDesc));
pushdown((TopNKeyOperator) copyDown(fkJoinInput, newTopNKeyDesc));

if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
LOG.debug("Removing {} above {}", topNKey.getName(), join.getName());
@@ -379,22 +379,22 @@ private static boolean hasSameTopNKeyDesc(Operator<? extends OperatorDesc> opera
return opDesc.isSame(desc);
}

private static void moveDown(TopNKeyOperator topNKey) throws SemanticException {
public static void moveDown(Operator<? extends OperatorDesc> operator) throws SemanticException {

assert topNKey.getNumParent() == 1;
final Operator<? extends OperatorDesc> parent = topNKey.getParentOperators().get(0);
assert operator.getNumParent() == 1;
final Operator<? extends OperatorDesc> parent = operator.getParentOperators().get(0);
final List<Operator<? extends OperatorDesc>> grandParents = parent.getParentOperators();
parent.removeChildAndAdoptItsChildren(topNKey);
parent.removeChildAndAdoptItsChildren(operator);
for (Operator<? extends OperatorDesc> grandParent : grandParents) {
grandParent.replaceChild(parent, topNKey);
grandParent.replaceChild(parent, operator);
}
topNKey.getParentOperators().clear();
topNKey.getParentOperators().addAll(grandParents);
operator.getParentOperators().clear();
operator.getParentOperators().addAll(grandParents);

topNKey.getChildOperators().clear();
topNKey.getChildOperators().add(parent);
operator.getChildOperators().clear();
operator.getChildOperators().add(parent);

parent.getParentOperators().clear();
parent.getParentOperators().add(topNKey);
parent.getParentOperators().add(operator);
}
}
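
The moveDown helper, generalized above from TopNKeyOperator to any single-parent operator so that OrderlessLimitPushDownOptimizer can reuse it, simply swaps an operator with its parent in the operator DAG: the operator ends up one step closer to the table scan while the parent keeps the operator's former children. The toy model below illustrates that rewiring on a minimal node class; it is an illustrative sketch, not Hive's Operator API.

import java.util.ArrayList;
import java.util.List;

// Toy DAG node, not Hive's Operator class, to show what moveDown does.
class Node {
  final String name;
  final List<Node> parents = new ArrayList<>();
  final List<Node> children = new ArrayList<>();
  Node(String name) { this.name = name; }

  static void link(Node parent, Node child) {
    parent.children.add(child);
    child.parents.add(parent);
  }

  // Swap 'op' with its single parent, mirroring the rewiring in moveDown.
  static void moveDown(Node op) {
    Node parent = op.parents.get(0);
    List<Node> grandParents = new ArrayList<>(parent.parents);
    // the parent adopts op's children and drops op
    parent.children.remove(op);
    for (Node c : op.children) {
      c.parents.remove(op);
      c.parents.add(parent);
      parent.children.add(c);
    }
    // the grandparents now feed op instead of the parent
    for (Node g : grandParents) {
      g.children.remove(parent);
      g.children.add(op);
    }
    op.parents.clear();
    op.parents.addAll(grandParents);
    op.children.clear();
    op.children.add(parent);
    parent.parents.clear();
    parent.parents.add(op);
  }

  public static void main(String[] args) {
    Node ts = new Node("TS"), sel = new Node("SEL"), lim = new Node("LIM"), fs = new Node("FS");
    link(ts, sel); link(sel, lim); link(lim, fs);
    moveDown(lim);                       // LIM swaps places with SEL
    StringBuilder chain = new StringBuilder(ts.name);
    for (Node n = ts; !n.children.isEmpty(); ) {
      n = n.children.get(0);
      chain.append(" -> ").append(n.name);
    }
    System.out.println(chain);           // TS -> LIM -> SEL -> FS
  }
}
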
5 changes: 5 additions & 0 deletions ql/src/java/org/apache/hadoop/hive/ql/plan/LimitDesc.java
@@ -46,6 +46,11 @@ public LimitDesc(final int offset, final int limit) {
this.limit = limit;
}

public LimitDesc(LimitDesc conf) {
this(conf.getOffset() == null ? 0 : conf.getOffset() , conf.getLimit());
this.leastRows = conf.leastRows;
}

/**
* To avoid printing the offset when it is 0, this needs to return null;
* hence Integer is used instead of int.
33 changes: 33 additions & 0 deletions ql/src/test/queries/clientpositive/limit_pushdown4.q
@@ -0,0 +1,33 @@
--! qt:dataset:src
--! qt:dataset:alltypesorc
set hive.mapred.mode=nonstrict;
set hive.explain.user=false;
set hive.limit.pushdown.memory.usage=0.3f;
set hive.optimize.reducededuplication.min.reducer=1;

-- SORT_QUERY_RESULTS

-- push through LOJ

set hive.optimize.limit=true;

select 'positive LOJ';
explain
SELECT src1.key, src2.value FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) LIMIT 5;
SELECT src1.key, src2.value FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) LIMIT 5;

select 'negative - order';
explain
SELECT src1.key, src2.value FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) ORDER BY src1.key LIMIT 5;

select 'negative - group by';
explain
SELECT src1.key, count(src2.value) FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) GROUP BY src1.key LIMIT 5;

set hive.optimize.limit=false;

select 'negative - disabled';

explain
SELECT src1.key, src2.value FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) LIMIT 5;
SELECT src1.key, src2.value FROM src src1 LEFT OUTER JOIN src src2 ON (src1.key = src2.key) LIMIT 5;
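
A brief aside on why the last three queries above are negative cases: a plain LIMIT cannot be copied below a GROUP BY (it would aggregate over a truncated input), the ORDER BY case is left to the TopNKey pushdown instead, and the final case has the optimization disabled, so the expectation is that those plans keep the Limit above the join. The stand-alone sketch below, with an invented class name and no Hive dependencies, shows on small in-memory data how limiting before aggregation or before sorting changes the answer.

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Stand-alone illustration, not Hive code: limiting before a GROUP BY or an
// ORDER BY changes the result, so those plans must not get the plain rewrite.
public class NegativeCasesSketch {
  public static void main(String[] args) {
    // GROUP BY: counting after limiting the input to 4 rows gives wrong groups/counts.
    List<Integer> keys = List.of(1, 1, 2, 2, 2, 3);
    Map<Integer, Long> full = keys.stream()
        .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    Map<Integer, Long> limitedFirst = keys.stream().limit(4)
        .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    System.out.println(full);          // {1=2, 2=3, 3=1}
    System.out.println(limitedFirst);  // {1=2, 2=2}

    // ORDER BY: taking 2 rows before sorting need not return the 2 smallest.
    List<Integer> data = List.of(5, 4, 1, 2);
    System.out.println(data.stream().sorted().limit(2).collect(Collectors.toList())); // [1, 2]
    System.out.println(data.stream().limit(2).sorted().collect(Collectors.toList())); // [4, 5]
  }
}
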
1 change: 1 addition & 0 deletions ql/src/test/queries/clientpositive/vector_orc_null_check.q
@@ -1,3 +1,4 @@
--! qt:dataset:src
SET hive.vectorized.execution.enabled=true;
set hive.fetch.task.conversion=none;

1 change: 1 addition & 0 deletions ql/src/test/queries/clientpositive/vector_udf_inline.q
@@ -1,3 +1,4 @@
--! qt:dataset:src
set hive.fetch.task.conversion=none;
SET hive.vectorized.execution.enabled=true;

(The remaining changed files, mostly .q.out plan updates, are not shown here.)
