Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public boolean matches(RelOptRuleCall call) {
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
&& PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
&& PinotRuleUtils.canPushDynamicBroadcastToLeaf(left.getInput(0))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not introduced in this PR, but can we break these checks into multiple steps? We should avoid doing the expensive canPushDynamicBroadcastToLeaf() unless all other checks pass

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes i think this is in the TODO item for this function. unfortunately, there's not too much we can do here.

// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& (!explicitOtherStrategy && join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
}
Expand All @@ -162,9 +162,6 @@ public void onMatch(RelOptRuleCall call) {
new LogicalJoin(join.getCluster(), join.getTraitSet(), left.getInput(), dynamicBroadcastExchange,
join.getCondition(), join.getVariablesSet(), join.getJoinType(), join.isSemiJoinDone(),
ImmutableList.copyOf(join.getSystemFieldList()));
// adding pass-through exchange after join b/c currently leaf-stage doesn't support chaining operator(s) after JOIN
PinotLogicalExchange passThroughAfterJoinExchange =
PinotLogicalExchange.create(dynamicFilterJoin, RelDistributions.hash(join.analyzeCondition().leftKeys));
call.transformTo(passThroughAfterJoinExchange);
call.transformTo(dynamicFilterJoin);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.Filter;
import org.apache.calcite.rel.core.Join;
import org.apache.calcite.rel.core.Project;
import org.apache.calcite.rel.core.RelFactories;
import org.apache.calcite.rel.core.TableScan;
import org.apache.calcite.rex.RexCall;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilder;
Expand Down Expand Up @@ -67,20 +69,42 @@ public static boolean isAggregate(RelNode rel) {
return unboxRel(rel) instanceof Aggregate;
}

// TODO: optimize this part out as it is not efficient to scan the entire subtree for exchanges.
public static boolean noExchangeInSubtree(RelNode relNode) {
if (relNode instanceof HepRelVertex) {
relNode = ((HepRelVertex) relNode).getCurrentRel();
}
if (relNode instanceof Exchange) {
/**
* utility logic to determine if a JOIN can be pushed down to the leaf-stage execution and leverage the
* segment-local info (indexing and others) to speed up the execution.
*
* <p>The logic here is that the "row-representation" of the relation must not have changed. E.g. </p>
* <ul>
* <li>`RelNode` that are single-in, single-out are possible (Project/Filter/)</li>
* <li>`Join` can be stacked on top if we only consider SEMI-JOIN</li>
* <li>`Window` should be allowed but we dont have impl for Window on leaf, so not yet included.</li>
* <li>`Sort` should be allowed but we need to reorder Sort and Join first, so not yet included.</li>
* </ul>
*/
public static boolean canPushDynamicBroadcastToLeaf(RelNode relNode) {
// TODO 1: optimize this part out as it is not efficient to scan the entire subtree for exchanges;
// we should cache the stats in the node (potentially using Trait, e.g. marking LeafTrait & IntermediateTrait)
// TODO 2: this part is similar to how ServerPlanRequestVisitor determines leaf-stage boundary;
// we should refactor and merge both logic
// TODO 3: for JoinNode, currently this only works towards left-side;
// we should support both left and right.
// TODO 4: for JoinNode, currently this only works for SEMI-JOIN, INNER-JOIN can bring in rows from both sides;
// we should check only the non-pipeline-breaker side columns are accessed.
relNode = PinotRuleUtils.unboxRel(relNode);

if (relNode instanceof TableScan) {
// reaching table means it is plan-able.
return true;
} else if (relNode instanceof Project || relNode instanceof Filter) {
// reaching single-in, single-out RelNode means we can continue downward.
return canPushDynamicBroadcastToLeaf(relNode.getInput(0));
} else if (relNode instanceof Join) {
// always check only the left child for dynamic broadcast
return canPushDynamicBroadcastToLeaf(((Join) relNode).getLeft());
} else {
// for all others we don't allow dynamic broadcast
return false;
}
for (RelNode child : relNode.getInputs()) {
if (!noExchangeInSubtree(child)) {
return false;
}
}
return true;
}

public static String extractFunctionName(RexCall function) {
Expand Down
17 changes: 8 additions & 9 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -428,15 +428,14 @@
"output": [
"Execution Plan",
"\nLogicalJoin(condition=[=($2, $8)], joinType=[semi])",
"\n PinotLogicalExchange(distribution=[hash[2]])",
"\n LogicalJoin(condition=[=($0, $7)], joinType=[semi])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[b]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[hash[1]])",
"\n LogicalJoin(condition=[=($0, $7)], joinType=[semi])",
"\n LogicalFilter(condition=[<($2, 100)])",
"\n LogicalTableScan(table=[[b]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col1=[$0], col2=[$1])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
"\n PinotLogicalExchange(distribution=[broadcast], relExchangeType=[PIPELINE_BREAKER])",
"\n LogicalProject(col2=[$1], col3=[$2])",
"\n LogicalFilter(condition=[AND(<>($1, 'bar'), <>($1, 'foo'))])",
"\n LogicalTableScan(table=[[a]])",
Expand Down
Loading