Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public static class AggregateOptions {

public static class JoinHintOptions {
public static final String JOIN_STRATEGY = "join_strategy";
public static final String DYNAMIC_BROADCAST_JOIN_STRATEGY = "dynamic_broadcast";
public static final String HASH_TABLE_JOIN_STRATEGY = "hash_table";
/**
* Max rows allowed to build the right table hash collection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@
public class PinotJoinToDynamicBroadcastRule extends RelOptRule {
public static final PinotJoinToDynamicBroadcastRule INSTANCE =
new PinotJoinToDynamicBroadcastRule(PinotRuleUtils.PINOT_REL_FACTORY);
private static final String DYNAMIC_BROADCAST_HINT_OPTION_VALUE = "dynamic_broadcast";

public PinotJoinToDynamicBroadcastRule(RelBuilderFactory factory) {
super(operand(LogicalJoin.class, any()), factory, null);
Expand All @@ -134,17 +133,18 @@ public boolean matches(RelOptRuleCall call) {
PinotHintOptions.JOIN_HINT_OPTIONS, PinotHintOptions.JoinHintOptions.JOIN_STRATEGY);
List<String> joinStrategies = joinStrategyString != null ? StringUtils.split(joinStrategyString, ",")
: Collections.emptyList();
if (!joinStrategies.contains(DYNAMIC_BROADCAST_HINT_OPTION_VALUE)) {
return false;
}
boolean explicitOtherStrategy = joinStrategies.size() > 0
&& !joinStrategies.contains(PinotHintOptions.JoinHintOptions.DYNAMIC_BROADCAST_JOIN_STRATEGY);

JoinInfo joinInfo = join.analyzeCondition();
RelNode left = join.getLeft() instanceof HepRelVertex ? ((HepRelVertex) join.getLeft()).getCurrentRel()
: join.getLeft();
RelNode right = join.getRight() instanceof HepRelVertex ? ((HepRelVertex) join.getRight()).getCurrentRel()
: join.getRight();
return left instanceof Exchange && right instanceof Exchange
&& PinotRuleUtils.noExchangeInSubtree(left.getInput(0))
&& (join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
// default enable dynamic broadcast for SEMI join unless other join strategy were specified
&& (!explicitOtherStrategy && join.getJoinType() == JoinRelType.SEMI && joinInfo.nonEquiConditions.isEmpty());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is dynamic broadcast useful for non-SEMI join? Currently it won't be applied even if we explicitly hint about it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be in the future. currently it will only apply to SEMI join

}

@Override
Expand Down
4 changes: 2 additions & 2 deletions pinot-query-planner/src/test/resources/queries/JoinPlans.json
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@
},
{
"description": "Semi join with IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col3 IN (SELECT col3 FROM b)",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand All @@ -239,7 +239,7 @@
},
{
"description": "Semi join with multiple IN clause",
"sql": "EXPLAIN PLAN FOR SELECT col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy = 'hash_table') */ col1, col2 FROM a WHERE col2 = 'test' AND col3 IN (SELECT col3 FROM b WHERE col1='foo') AND col3 IN (SELECT col3 FROM b WHERE col1='bar') AND col3 IN (SELECT col3 FROM b WHERE col1='foobar')",
"output": [
"Execution Plan",
"\nLogicalProject(col1=[$0], col2=[$1])",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"sql": "EXPLAIN PLAN FOR SELECT a.col1, a.col2 FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0)",
"output": [
"Execution Plan",
"\nPinotLogicalExchange(distribution=[hash[0]])",
Expand All @@ -38,7 +38,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on same key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast'), aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT /*+ aggOptionsInternal(agg_type='DIRECT') */ a.col1, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], EXPR$1=[$SUM0($1)])",
Expand All @@ -55,7 +55,7 @@
},
{
"description": "semi-join with dynamic_broadcast join strategy then group-by on different key",
"sql": "EXPLAIN PLAN FOR SELECT /*+ joinOptions(join_strategy='dynamic_broadcast') */ a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"sql": "EXPLAIN PLAN FOR SELECT a.col2, SUM(a.col3) FROM a WHERE a.col1 IN (SELECT col2 FROM b WHERE b.col3 > 0) GROUP BY 1",
"output": [
"Execution Plan",
"\nLogicalAggregate(group=[{0}], agg#0=[$SUM0($1)])",
Expand Down