Skip to content

Commit

Permalink
HIVE-23937: Take null ordering into consideration when pushing TNK th…
Browse files Browse the repository at this point in the history
…rough inner joins (Attila Magyar, reviewed by Krisztian Kasa)
  • Loading branch information
zeroflag authored Aug 5, 2020
1 parent 1c1dfee commit 05bbaf4
Show file tree
Hide file tree
Showing 9 changed files with 387 additions and 446 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.lib.SemanticNodeProcessor;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDescUtils;
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
import org.apache.hadoop.hive.ql.plan.JoinDesc;
Expand Down Expand Up @@ -278,87 +276,28 @@ private void pushdownInnerJoin(TopNKeyOperator topNKey, int fkJoinInputIndex, bo
LOG.debug("Not pushing {} through {} as non FK side of the join is filtered", topNKey.getName(), join.getName());
return;
}
// Check column origins:
// 1. If all OrderBy columns are coming from the child (FK) table:
// -> move TopNKeyOperator
// 2. If the first n OrderBy columns are coming from the child (FK) table:
// -> copy TopNKeyOperator with the first n columns, and leave the original in place
int prefixLength = keyColumnPrefixLength(join, topNKey, fkJoinInputIndex, topNKey.getConf().getKeyColumns());
if (prefixLength == 0) {
LOG.debug("Not pushing {} through {} as common key column prefix length is 0", topNKey.getName(), join.getName());
CommonKeyPrefix commonKeyPrefix = CommonKeyPrefix.map(
mapUntilColumnEquals(topNKeyDesc.getKeyColumns(), join.getColumnExprMap()),
topNKeyDesc.getColumnSortOrder(),
topNKeyDesc.getNullOrder(),
fkJoinInput.getConf().getKeyCols(),
fkJoinInput.getConf().getColumnExprMap(),
fkJoinInput.getConf().getOrder(),
fkJoinInput.getConf().getNullOrder());
if (commonKeyPrefix.isEmpty() || commonKeyPrefix.size() == topNKeyDesc.getPartitionKeyColumns().size()) {
return;
}
LOG.debug("Pushing a copy of {} through {} and {}",
topNKey.getName(), join.getName(), fkJoinInput.getName());
TopNKeyDesc newTopNKeyDesc = topNKeyDesc.withKeyColumns(prefixLength);
newTopNKeyDesc.setKeyColumns(remapColumns(join, fkJoinInput, newTopNKeyDesc.getKeyColumns()));
final TopNKeyDesc newTopNKeyDesc = topNKeyDesc.combine(commonKeyPrefix);
pushdown(copyDown(fkJoinInput, newTopNKeyDesc));
if (topNKeyDesc.getKeyColumns().size() == prefixLength) {

if (topNKeyDesc.getKeyColumns().size() == commonKeyPrefix.size()) {
LOG.debug("Removing {} above {}", topNKey.getName(), join.getName());
join.removeChildAndAdoptItsChildren(topNKey);
}
}

/**
 * Counts how many leading entries of {@code keyColumns} resolve, via
 * {@link #tag}, to the join side identified by {@code expectedTag}.
 * Counting stops at the first key column whose tag differs.
 *
 * @param join the join operator the key columns are resolved against
 * @param topNKeyOperator the TopNKey operator that owns the key columns
 * @param expectedTag the join input tag every counted column must resolve to
 * @param keyColumns the key columns, examined in iteration order
 * @return the length of the matching prefix; 0 when the first column does not match
 *         or the list is empty
 */
private int keyColumnPrefixLength(
    CommonJoinOperator<? extends JoinDesc> join,
    TopNKeyOperator topNKeyOperator,
    int expectedTag,
    List<ExprNodeDesc> keyColumns) {
  int matched = 0;
  for (ExprNodeDesc keyColumn : keyColumns) {
    if (tag(join, topNKeyOperator, keyColumn) != expectedTag) {
      // First column from a different (or unknown) side ends the prefix.
      break;
    }
    matched++;
  }
  return matched;
}

/**
 * Looks up the join input tag recorded for {@code column} in the join's
 * reversed-expression map, keyed by the column's join output name.
 *
 * @return the tag, or -1 when the output name cannot be determined or the
 *         reversed-expression map holds no entry for it
 */
private int tag(CommonJoinOperator<? extends JoinDesc> join, TopNKeyOperator topNKeyOperator, ExprNodeDesc column) {
  String outputName = columnOutputName(join, topNKeyOperator, column);
  if (outputName != null) {
    Byte side = join.getConf().getReversedExprs().get(outputName);
    if (side != null) {
      return side;
    }
  }
  return -1;
}

/**
 * Finds the key in the join's column expression map whose value is the
 * expression obtained by backtracking {@code column} from the TopNKey
 * operator to the join.
 *
 * @return the matching map key, or {@code null} when backtracking yields
 *         nothing or no map value is that expression
 */
private String columnOutputName(CommonJoinOperator<? extends JoinDesc> join, TopNKeyOperator topNKeyOperator, ExprNodeDesc column) {
  ExprNodeDesc target = backtrack(join, topNKeyOperator, column);
  if (target != null) {
    for (Map.Entry<String, ExprNodeDesc> entry : join.getColumnExprMap().entrySet()) {
      // Reference comparison on purpose: only the very same instance counts as a match.
      // NOTE(review): presumably backtracking hands back the map's own value object - confirm.
      if (entry.getValue() == target) {
        return entry.getKey();
      }
    }
  }
  return null;
}

/**
 * Backtracks {@code column} from the TopNKey operator to the join using
 * {@code ExprNodeDescUtils.backtrack}.
 *
 * @return the backtracked expression when it is an {@code ExprNodeColumnDesc};
 *         {@code null} when the result is absent or of any other expression type
 * @throws RuntimeException wrapping the {@code SemanticException} raised while backtracking
 */
private ExprNodeDesc backtrack(CommonJoinOperator<? extends JoinDesc> join, TopNKeyOperator topNKeyOperator, ExprNodeDesc column) {
  final ExprNodeDesc resolved;
  try {
    resolved = ExprNodeDescUtils.backtrack(column, topNKeyOperator, join);
  } catch (SemanticException e) {
    throw new RuntimeException(e);
  }
  // instanceof is false for null, so this also covers a missing result.
  return resolved instanceof ExprNodeColumnDesc ? resolved : null;
}

/**
 * Maps the TopNKey columns through the join's column expression map and then
 * through the column expression map of the given join input.
 *
 * @return the result of applying {@code mapColumns} twice, join first, then {@code fkJoinInput}
 */
private List<ExprNodeDesc> remapColumns(CommonJoinOperator<? extends JoinDesc> join, ReduceSinkOperator fkJoinInput, List<ExprNodeDesc> topNKeyColumns) {
  return mapColumns(
      mapColumns(topNKeyColumns, join.getColumnExprMap()),
      fkJoinInput.getColumnExprMap());
}

private List<ExprNodeDesc> mapUntilColumnEquals(List<ExprNodeDesc> columns, Map<String,
ExprNodeDesc> colExprMap) {
if (colExprMap == null) {
Expand Down
10 changes: 5 additions & 5 deletions ql/src/test/queries/clientpositive/topnkey_inner_join.q
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ set hive.optimize.topnkey=true;
set hive.optimize.limittranspose=false;

select 'positive: order by columns are coming from child table';
explain select * from orders join customer on customer.id = orders.customer_id order by orders.amount limit 3;
explain select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id limit 3;
explain select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id, orders.amount limit 3;
explain select * from customer join orders on orders.customer_id = customer.id order by orders.amount, orders.customer_id limit 3;
select * from orders join customer on customer.id = orders.customer_id order by orders.amount limit 3;
select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id limit 3;
select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id, orders.amount limit 3;
select * from customer join orders on orders.customer_id = customer.id order by orders.amount, orders.customer_id limit 3;

Expand All @@ -43,10 +43,10 @@ explain select * from orders join customer on customer.id = orders.customer_id o
select * from orders join customer on customer.id = orders.customer_id order by customer.name, orders.amount limit 3;

select 'mixed/positive: 1st n order by columns are coming from child table';
explain select * from orders join customer on customer.id = orders.customer_id order by orders.amount, customer.name limit 3;
select * from orders join customer on customer.id = orders.customer_id order by orders.amount, customer.name limit 3;
explain select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id, customer.name limit 3;
select * from orders join customer on customer.id = orders.customer_id order by orders.customer_id, customer.name limit 3;

select 'positive: nulls first';
select 'negative: nulls first';
explain select * from customer join orders on orders.customer_id = customer.id order by customer_id nulls first limit 1;
select * from customer join orders on orders.customer_id = customer.id order by customer_id nulls first limit 1;

Expand Down
4 changes: 2 additions & 2 deletions ql/src/test/queries/clientpositive/topnkey_inner_join2.q
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,12 @@ explain select name, city
from customer join address
on customer.address_id = address.id
and name in ('Joe', 'Robert','Heisenberg')
order by customer.id
order by customer.address_id
limit 3;

select name, city
from customer join address
on customer.address_id = address.id
and name in ('Joe', 'Robert','Heisenberg')
order by customer.id
order by customer.address_id
limit 3;
Loading

0 comments on commit 05bbaf4

Please sign in to comment.