Skip to content

Commit 2c84685

Browse files
wangyumGitHub Enterprise
authored and
GitHub Enterprise
committed
[CARMEL-6496] Implement bloom filter join hint #1201
1 parent cb10771 commit 2c84685

File tree

3 files changed

+42
-15
lines changed

3 files changed

+42
-15
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/hints.scala

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ object JoinStrategyHint {
148148
BROADCAST,
149149
SHUFFLE_MERGE,
150150
SHUFFLE_HASH,
151-
SHUFFLE_REPLICATE_NL)
151+
SHUFFLE_REPLICATE_NL,
152+
BLOOM_FILTER_JOIN)
152153
}
153154

154155
/**
@@ -192,6 +193,15 @@ case object SHUFFLE_REPLICATE_NL extends JoinStrategyHint {
192193
"SHUFFLE_REPLICATE_NL")
193194
}
194195

196+
/**
197+
* The hint for bloom filter join.
198+
*/
199+
case object BLOOM_FILTER_JOIN extends JoinStrategyHint {
200+
override def displayName: String = "bloom_filter_join"
201+
override def hintAliases: Set[String] = Set(
202+
"BLOOM_FILTER_JOIN")
203+
}
204+
195205
/**
196206
* An internal hint to discourage broadcast hash join, used by adaptive query execution.
197207
*/

sql/core/src/main/scala/org/apache/spark/sql/execution/dynamicpruning/DynamicBloomFilterPruning.scala

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,14 @@ object DynamicBloomFilterPruning extends Rule[LogicalPlan]
5858
}.headOption
5959
}
6060

61+
private def hintToBuildBloomFilterLeft(hint: JoinHint): Boolean = {
62+
hint.leftHint.exists(_.strategy.contains(BLOOM_FILTER_JOIN))
63+
}
64+
65+
private def hintToBuildBloomFilterRight(hint: JoinHint): Boolean = {
66+
hint.rightHint.exists(_.strategy.contains(BLOOM_FILTER_JOIN))
67+
}
68+
6169
private def rowCounts(plan: LogicalPlan): BigInt = {
6270
plan.stats.rowCount
6371
.getOrElse(plan.stats.sizeInBytes / EstimationUtils.getSizePerRow(plan.output))
@@ -164,24 +172,24 @@ object DynamicBloomFilterPruning extends Rule[LogicalPlan]
164172
val leftDistCnt = distinctCounts(l, left)
165173
val rightDistCnt = distinctCounts(r, right)
166174

167-
if (isRightSideSmall && canPruneLeft(joinType) && rightRowCnt > 0 &&
168-
!rightHasScalarSubquery &&
169-
rightRowCnt <= conf.dynamicBloomFilterJoinPruningMaxBloomFilterEntries &&
170-
!hasDynamicBloomFilterPruningSubquery(right) && !right.isStreaming &&
171-
getFilterableTableScan(l, left).exists(p =>
172-
!hasSelectivePredicate(p) &&
173-
pruningHasBenefit(rightRowCnt, rightDistCnt, leftDistCnt, p))) {
175+
if (canPruneLeft(joinType) && (hintToBuildBloomFilterRight(hint) ||
176+
(isRightSideSmall && rightRowCnt > 0 && !rightHasScalarSubquery &&
177+
rightRowCnt <= conf.dynamicBloomFilterJoinPruningMaxBloomFilterEntries &&
178+
!hasDynamicBloomFilterPruningSubquery(right) && !right.isStreaming &&
179+
getFilterableTableScan(l, left).exists(p =>
180+
!hasSelectivePredicate(p) &&
181+
pruningHasBenefit(rightRowCnt, rightDistCnt, leftDistCnt, p))))) {
174182
newLeft = insertPredicate(l, newLeft, r, right, rightKeys, rightDistCnt, rightRowCnt)
175183
newHint = newHint.copy(leftHint = Some(HintInfo(strategy = Some(NO_BROADCAST_HASH))))
176184
}
177185

178-
if (isLeftSideSmall && canPruneRight(joinType) && leftRowCount > 0 &&
179-
!leftHasScalarSubquery &&
180-
leftRowCount <= conf.dynamicBloomFilterJoinPruningMaxBloomFilterEntries &&
181-
!hasDynamicBloomFilterPruningSubquery(left) && !left.isStreaming &&
182-
getFilterableTableScan(r, right).exists(p =>
183-
!hasSelectivePredicate(p) &&
184-
pruningHasBenefit(leftRowCount, leftDistCnt, rightDistCnt, p))) {
186+
if (canPruneRight(joinType) && (hintToBuildBloomFilterLeft(hint) ||
187+
(isLeftSideSmall && leftRowCount > 0 && !leftHasScalarSubquery &&
188+
leftRowCount <= conf.dynamicBloomFilterJoinPruningMaxBloomFilterEntries &&
189+
!hasDynamicBloomFilterPruningSubquery(left) && !left.isStreaming &&
190+
getFilterableTableScan(r, right).exists(p =>
191+
!hasSelectivePredicate(p) &&
192+
pruningHasBenefit(leftRowCount, leftDistCnt, rightDistCnt, p))))) {
185193
newRight = insertPredicate(r, newRight, l, left, leftKeys, leftDistCnt, leftRowCount)
186194
newHint = newHint.copy(rightHint = Some(HintInfo(strategy = Some(NO_BROADCAST_HASH))))
187195
}

sql/core/src/test/scala/org/apache/spark/sql/DynamicBloomFilterJoinPruningSuite.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,15 @@ abstract class DynamicBloomFilterPruningSuiteBase
166166
checkBloomFilterPruningPredicate(df, true)
167167
checkAnswer(df, Row(2, 2) :: Nil)
168168
}
169+
170+
// test hint
171+
withSQLConf(SQLConf.DYNAMIC_BLOOM_FILTER_JOIN_PRUNING_ENABLED.key -> "true") {
172+
checkBloomFilterPruningPredicate(
173+
sql("SELECT * FROM t1 f JOIN t2 s ON f.a = s.a AND f.b = 2"), false)
174+
checkBloomFilterPruningPredicate(
175+
sql("SELECT /*+ BLOOM_FILTER_JOIN(s) */ * FROM t1 f JOIN t2 s ON f.a = s.a AND f.b = 2"),
176+
true)
177+
}
169178
}
170179
}
171180

0 commit comments

Comments
 (0)