
[SPARK-36612][SQL] Support left outer join build left or right outer join build right in shuffled hash join #41398

Closed
@@ -374,14 +374,14 @@ trait JoinSelectionHelper {

  def canBuildShuffledHashJoinLeft(joinType: JoinType): Boolean = {
    joinType match {
-     case _: InnerLike | RightOuter | FullOuter => true
+     case _: InnerLike | LeftOuter | FullOuter | RightOuter => true
      case _ => false
    }
  }

  def canBuildShuffledHashJoinRight(joinType: JoinType): Boolean = {
    joinType match {
-     case _: InnerLike | LeftOuter | FullOuter |
+     case _: InnerLike | LeftOuter | FullOuter | RightOuter |
          LeftSemi | LeftAnti | _: ExistenceJoin => true
      case _ => false
    }
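
In summary, the change adds exactly one new case to each predicate. A minimal runnable check of the resulting behavior (an editorial mini-model, not code from the PR; join types are mocked locally and the semi/anti/existence cases of the right-side predicate are omitted):

// Editorial mini-model of the support matrix after this change.
object SupportMatrixSketch extends App {
  sealed trait JT
  case object Inner extends JT; case object LeftOuter extends JT
  case object RightOuter extends JT; case object FullOuter extends JT

  def canBuildLeft(t: JT): Boolean = Set[JT](Inner, LeftOuter, FullOuter, RightOuter)(t)
  def canBuildRight(t: JT): Boolean = Set[JT](Inner, LeftOuter, FullOuter, RightOuter)(t)

  assert(canBuildLeft(LeftOuter))   // newly true: left outer join, build left
  assert(canBuildRight(RightOuter)) // newly true: right outer join, build right
}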

@@ -127,7 +127,6 @@ private[execution] object HashedRelation {
   * Create a HashedRelation from an Iterator of InternalRow.
   *
   * @param allowsNullKey Allow NULL keys in HashedRelation.
-  *                      This is used for full outer join in `ShuffledHashJoinExec` only.
   * @param ignoresDuplicatedKey Ignore rows with duplicated keys in HashedRelation.
   *                             This is only used for semi and anti joins without a join
   *                             condition in `ShuffledHashJoinExec`.

@@ -56,7 +56,12 @@ case class ShuffledHashJoinExec(
  override def outputPartitioning: Partitioning = super[ShuffledJoin].outputPartitioning

  override def outputOrdering: Seq[SortOrder] = joinType match {
+   // For outer joins where the outer side is the build side, ordering cannot be guaranteed.
+   // The algorithm performs an additional, unordered iteration over the build side
+   // (HashedRelation) to find unmatched rows and satisfy the outer join semantics.
    case FullOuter => Nil
+   case LeftOuter if buildSide == BuildLeft => Nil
+   case RightOuter if buildSide == BuildRight => Nil

Contributor:

Can we add some comments to explain why the ordering can't be preserved?

Member (author):

Sure, added a comment per my understanding (please let me know if I've misunderstood something).

My thought was: because the second iteration over the build side (for the outer-join semantics) runs over a HashedRelation, the result cannot be in order.

    case _ => super.outputOrdering
  }
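
For intuition on the point discussed above, here is a minimal, self-contained Scala sketch (editorial; names, types, and data are illustrative, not from the PR) of the two-phase build-side outer join. Phase 2 walks the hash map in whatever order it happens to iterate, which is why these plans report no output ordering:

object BuildSideOuterJoinSketch extends App {
  // Build side as a hash map: key -> build rows. Map iteration order is undefined.
  val build = Map(1 -> Seq("b1"), 2 -> Seq("b2a", "b2b"), 3 -> Seq("b3"))
  // Stream side: (key, row) pairs, consumed in stream order.
  val stream = Seq((1, "s1"), (4, "s4"), (2, "s2"))

  // Tracks which build keys were matched during the stream pass.
  val matchedKeys = scala.collection.mutable.Set.empty[Int]

  // Phase 1: probe the hash map with each stream row. Unmatched stream rows
  // are dropped, since the stream side is the non-outer side here.
  val streamOut: Seq[(Option[String], Option[String])] = stream.flatMap { case (k, s) =>
    build.get(k) match {
      case Some(rows) =>
        matchedKeys += k
        rows.map(b => (Some(s), Some(b)))
      case None => Nil
    }
  }

  // Phase 2: emit every build row whose key was never matched, padding the
  // stream side with NULL (None). This pass follows hash-map iteration order,
  // so the combined output carries no ordering guarantee.
  val buildOut: Seq[(Option[String], Option[String])] = for {
    (k, rows) <- build.toSeq if !matchedKeys(k)
    b <- rows
  } yield (Option.empty[String], Some(b))

  (streamOut ++ buildOut).foreach(println)
}

In the real operator the map is a HashedRelation, and matches are tracked by key index (unique keys) or by key and value index (non-unique keys, needed when a bound condition can reject individual rows), but the ordering argument is the same.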

@@ -83,8 +88,10 @@ case class ShuffledHashJoinExec(
      iter,
      buildBoundKeys,
      taskMemoryManager = context.taskMemoryManager(),
-     // Full outer join needs support for NULL key in HashedRelation.
-     allowsNullKey = joinType == FullOuter,
+     // Full outer join, and outer joins that build on the outer side,
+     // need support for NULL keys in HashedRelation.
+     allowsNullKey = joinType == FullOuter ||
+       (joinType == LeftOuter && buildSide == BuildLeft) ||
+       (joinType == RightOuter && buildSide == BuildRight),
      ignoresDuplicatedKey = ignoreDuplicatedKey)
    buildTime += NANOSECONDS.toMillis(System.nanoTime() - start)
    buildDataSize += relation.estimatedSize
@@ -98,16 +105,22 @@ case class ShuffledHashJoinExec(
    streamedPlan.execute().zipPartitions(buildPlan.execute()) { (streamIter, buildIter) =>
      val hashed = buildHashedRelation(buildIter)
      joinType match {
-       case FullOuter => fullOuterJoin(streamIter, hashed, numOutputRows)
+       case FullOuter => buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows,
+         isFullOuterJoin = true)
+       case LeftOuter if buildSide.equals(BuildLeft) =>
+         buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
+       case RightOuter if buildSide.equals(BuildRight) =>
+         buildSideOrFullOuterJoin(streamIter, hashed, numOutputRows, isFullOuterJoin = false)
        case _ => join(streamIter, hashed, numOutputRows)
      }
    }
  }

- private def fullOuterJoin(
+ private def buildSideOrFullOuterJoin(
      streamIter: Iterator[InternalRow],
      hashedRelation: HashedRelation,
-     numOutputRows: SQLMetric): Iterator[InternalRow] = {
+     numOutputRows: SQLMetric,
+     isFullOuterJoin: Boolean): Iterator[InternalRow] = {
    val joinKeys = streamSideKeyGenerator()
    val joinRow = new JoinedRow
    val (joinRowWithStream, joinRowWithBuild) = {
@@ -130,11 +143,11 @@ case class ShuffledHashJoinExec(
    }

    val iter = if (hashedRelation.keyIsUnique) {
-     fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-       joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+     buildSideOrFullOuterJoinUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+       joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
    } else {
-     fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
-       joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow)
+     buildSideOrFullOuterJoinNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,
+       joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, isFullOuterJoin)
    }

    val resultProj = UnsafeProjection.create(output, output)
@@ -145,31 +158,38 @@ case class ShuffledHashJoinExec(
  }

  /**
-  * Full outer shuffled hash join with unique join keys:
+  * Shuffled hash join with unique join keys, where an outer side is the build side:
   * 1. Process rows from the stream side by looking up the hash relation.
   *    Mark the matched rows from the build side as looked up.
   *    A bit set, indexed by key index, is used to track matched rows.
   * 2. Process rows from the build side by iterating the hash relation.
   *    Filter out rows from the build side that were already matched,
   *    by checking their key index against the bit set.
   */
- private def fullOuterJoinWithUniqueKey(
+ private def buildSideOrFullOuterJoinUniqueKey(
      streamIter: Iterator[InternalRow],
      hashedRelation: HashedRelation,
      joinKeys: UnsafeProjection,
      joinRowWithStream: InternalRow => JoinedRow,
      joinRowWithBuild: InternalRow => JoinedRow,
      streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-     buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+     buildNullRow: GenericInternalRow,
+     isFullOuterJoin: Boolean): Iterator[InternalRow] = {
    val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
    longMetric("buildDataSize") += matchedKeys.capacity / 8

+   def noMatch = if (isFullOuterJoin) {
+     Some(joinRowWithBuild(buildNullRow))
+   } else {
+     None
+   }

    // Process the stream side by looking up the hash relation
-   val streamResultIter = streamIter.map { srow =>
+   val streamResultIter = streamIter.flatMap { srow =>
      joinRowWithStream(srow)
      val keys = joinKeys(srow)
      if (keys.anyNull) {
-       joinRowWithBuild(buildNullRow)
+       noMatch
      } else {
        val matched = hashedRelation.getValueWithKeyIndex(keys)
        if (matched != null) {
@@ -178,12 +198,12 @@ case class ShuffledHashJoinExec(
          val buildRow = matched.getValue
          val joinRow = joinRowWithBuild(buildRow)
          if (boundCondition(joinRow)) {
            matchedKeys.set(keyIndex)
-           joinRow
+           Some(joinRow)
          } else {
-           joinRowWithBuild(buildNullRow)
+           noMatch
          }
        } else {
-         joinRowWithBuild(buildNullRow)
+         noMatch
        }
      }
    }
@@ -205,7 +225,7 @@ case class ShuffledHashJoinExec(
  }

  /**
-  * Full outer shuffled hash join with non-unique join keys:
+  * Shuffled hash join with non-unique join keys, where an outer side is the build side:
   * 1. Process rows from the stream side by looking up the hash relation.
   *    Mark the matched rows from the build side as looked up.
   *    A [[OpenHashSet]] (Long) is used to track matched rows with
@@ -219,14 +239,15 @@ case class ShuffledHashJoinExec(
   *    the value indices of its tuples will be 0, 1 and 2.
   *    Note that value indices of tuples with different keys are incomparable.
   */
- private def fullOuterJoinWithNonUniqueKey(
+ private def buildSideOrFullOuterJoinNonUniqueKey(
      streamIter: Iterator[InternalRow],
      hashedRelation: HashedRelation,
      joinKeys: UnsafeProjection,
      joinRowWithStream: InternalRow => JoinedRow,
      joinRowWithBuild: InternalRow => JoinedRow,
      streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
-     buildNullRow: GenericInternalRow): Iterator[InternalRow] = {
+     buildNullRow: GenericInternalRow,
+     isFullOuterJoin: Boolean): Iterator[InternalRow] = {
    val matchedRows = new OpenHashSet[Long]
    TaskContext.get().addTaskCompletionListener[Unit](_ => {
      // At the end of the task, update the task's memory usage for this
@@ -252,7 +273,12 @@ case class ShuffledHashJoinExec(
      val joinRow = joinRowWithStream(srow)
      val keys = joinKeys(srow)
      if (keys.anyNull) {
-       Iterator.single(joinRowWithBuild(buildNullRow))
+       // Return the stream row padded with a build-side NULL row to satisfy
+       // full outer join semantics, if enabled.
+       if (isFullOuterJoin) {
+         Iterator.single(joinRowWithBuild(buildNullRow))
+       } else {
+         Iterator.empty
+       }
      } else {
        val buildIter = hashedRelation.getWithKeyIndex(keys)
        new RowIterator {
@@ -272,8 +298,8 @@ case class ShuffledHashJoinExec(
          }
          // When we reach here, it means no match is found for this key.
          // So we need to return one row with a build-side NULL row,
-         // to satisfy the full outer join semantic.
-         if (!found) {
+         // to satisfy the full outer join semantics, if enabled.
+         if (!found && isFullOuterJoin) {
            joinRowWithBuild(buildNullRow)
            // Set `found` to be true as we only need to return one row
            // but no more.
@@ -314,6 +340,8 @@ case class ShuffledHashJoinExec(

  override def supportCodegen: Boolean = joinType match {
    case FullOuter => conf.getConf(SQLConf.ENABLE_FULL_OUTER_SHUFFLED_HASH_JOIN_CODEGEN)
+   case LeftOuter if buildSide == BuildLeft => false
+   case RightOuter if buildSide == BuildRight => false
    case _ => true
  }

30 changes: 21 additions & 9 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinHintSuite.scala
@@ -489,10 +489,16 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper
      assertShuffleHashJoin(
        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil)), BuildLeft)
      assertShuffleHashJoin(
-       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildRight)
+       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "left")), BuildLeft)

Member:

Why change this?

Member (author):

In this situation, t1 is smaller than t2, so the planner now picks t1. Before, it was not possible to pick t1, so t2 was picked.

Member (@viirya, Jun 3, 2023):

Yes, I meant that the original test coverage (BuildRight) is removed and lost.
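
To make the discussion concrete, a hypothetical spark-shell sketch (table names and sizes are assumed for illustration): when both sides are hinted, the planner is now free to build on the smaller left side of a LEFT OUTER join.

// Hypothetical session; t1 is deliberately the smaller side.
spark.range(100).selectExpr("id AS a1").createOrReplaceTempView("t1")
spark.range(100000).selectExpr("id AS b1").createOrReplaceTempView("t2")

spark.sql("""
  SELECT /*+ SHUFFLE_HASH(t1, t2) */ *
  FROM t1 LEFT JOIN t2 ON a1 = b1
""").explain()
// With this patch the plan may show something like:
//   ShuffledHashJoin [a1], [b1], LeftOuter, BuildLeft
// Before it, BuildLeft was not legal for LeftOuter, so t2 (BuildRight) was chosen.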

      assertShuffleHashJoin(
        sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1, t2)" :: Nil, "right")), BuildLeft)

+     // Determine the build side based on the hint
+     assertShuffleHashJoin(
+       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildLeft)
+     assertShuffleHashJoin(
+       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t2)" :: Nil, "right")), BuildRight)

      // Shuffle-hash hint prioritized over shuffle-replicate-nl hint
      assertShuffleHashJoin(
        sql(equiJoinQueryWithHint("SHUFFLE_REPLICATE_NL(t2)" :: "SHUFFLE_HASH(t1)" :: Nil)),
@@ -507,8 +513,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper
        BuildLeft)

      // Shuffle-hash hint specified but not doable
-     assertBroadcastHashJoin(
-       sql(equiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil, "left")), BuildRight)
      assertBroadcastNLJoin(
        sql(nonEquiJoinQueryWithHint("SHUFFLE_HASH(t1)" :: Nil)), BuildLeft)
    }
@@ -606,13 +610,25 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper
      withLogAppender(hintAppender, level = Some(Level.WARN)) {
        assertShuffleMergeJoin(
          df1.hint("BROADCAST").join(df2, $"a1" === $"b1", joinType))
+     }
+
+     val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
+       .filter(_.contains("is not supported in the query:"))
+     assert(logs.size === 1)
+     logs.foreach(log =>
+       assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
+   }
+
+   Seq("left_semi", "left_anti").foreach { joinType =>
+     val hintAppender = new LogAppender(s"join hint build side check for $joinType")
+     withLogAppender(hintAppender, level = Some(Level.WARN)) {
+       assertShuffleMergeJoin(
+         df1.hint("SHUFFLE_HASH").join(df2, $"a1" === $"b1", joinType))
      }

      val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
        .filter(_.contains("is not supported in the query:"))
-     assert(logs.size === 2)
+     assert(logs.size === 1)
      logs.foreach(log =>
        assert(log.contains(s"build left for ${joinType.split("_").mkString(" ")} join.")))
    }
@@ -622,8 +638,6 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper
      withLogAppender(hintAppender, level = Some(Level.WARN)) {
        assertBroadcastHashJoin(
          df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType), BuildRight)
-       assertShuffleHashJoin(
-         df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType), BuildRight)
      }

      val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
@@ -636,12 +650,10 @@ class JoinHintSuite extends PlanTest with SharedSparkSession with AdaptiveSparkPlanHelper
      withLogAppender(hintAppender, level = Some(Level.WARN)) {
        assertShuffleMergeJoin(
          df1.join(df2.hint("BROADCAST"), $"a1" === $"b1", joinType))
-       assertShuffleMergeJoin(
-         df1.join(df2.hint("SHUFFLE_HASH"), $"a1" === $"b1", joinType))
      }
      val logs = hintAppender.loggingEvents.map(_.getMessage.getFormattedMessage)
        .filter(_.contains("is not supported in the query:"))
-     assert(logs.size === 2)
+     assert(logs.size === 1)
      logs.foreach(log =>
        assert(log.contains(s"build right for ${joinType.split("_").mkString(" ")} join.")))
    }
77 changes: 77 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -1249,6 +1249,83 @@ class JoinSuite extends QueryTest with SharedSparkSession with AdaptiveSparkPlanHelper
    }
  }

+ test("SPARK-36612: Support left outer join build left or right outer join build right in " +
+   "shuffled hash join") {
+   val inputDFs = Seq(
+     // Test unique join key
+     (spark.range(10).selectExpr("id as k1"),
+       spark.range(30).selectExpr("id as k2"),
+       $"k1" === $"k2"),
+     // Test non-unique join key
+     (spark.range(10).selectExpr("id % 5 as k1"),
+       spark.range(30).selectExpr("id % 5 as k2"),
+       $"k1" === $"k2"),
+     // Test empty build side
+     (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+       spark.range(30).selectExpr("id as k2"),
+       $"k1" === $"k2"),
+     // Test empty stream side
+     (spark.range(10).selectExpr("id as k1"),
+       spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+       $"k1" === $"k2"),
+     // Test empty build and stream side
+     (spark.range(10).selectExpr("id as k1").filter("k1 < -1"),
+       spark.range(30).selectExpr("id as k2").filter("k2 < -1"),
+       $"k1" === $"k2"),
+     // Test string join key
+     (spark.range(10).selectExpr("cast(id * 3 as string) as k1"),
+       spark.range(30).selectExpr("cast(id as string) as k2"),
+       $"k1" === $"k2"),
+     // Test build side at right
+     (spark.range(30).selectExpr("cast(id / 3 as string) as k1"),
+       spark.range(10).selectExpr("cast(id as string) as k2"),
+       $"k1" === $"k2"),
+     // Test NULL join key
+     (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr("value as k1"),
+       spark.range(30).map(i => if (i % 4 == 0) i else null).selectExpr("value as k2"),
+       $"k1" === $"k2"),
+     (spark.range(10).map(i => if (i % 3 == 0) i else null).selectExpr("value as k1"),
+       spark.range(30).map(i => if (i % 5 == 0) i else null).selectExpr("value as k2"),
+       $"k1" === $"k2"),
+     // Test multiple join keys
+     (spark.range(10).map(i => if (i % 2 == 0) i else null).selectExpr(
+       "value as k1", "cast(value % 5 as short) as k2", "cast(value * 3 as long) as k3"),
+       spark.range(30).map(i => if (i % 3 == 0) i else null).selectExpr(
+         "value as k4", "cast(value % 5 as short) as k5", "cast(value * 3 as long) as k6"),
+       $"k1" === $"k4" && $"k2" === $"k5" && $"k3" === $"k6")
+   )
+
+   // Test left outer join with the left side as the build side
+   inputDFs.foreach { case (df1, df2, joinExprs) =>
+     val smjDF = df1.hint("SHUFFLE_MERGE").join(df2, joinExprs, "leftouter")
+     assert(collect(smjDF.queryExecution.executedPlan) {
+       case _: SortMergeJoinExec => true }.size === 1)
+     val smjResult = smjDF.collect()
+
+     val shjDF = df1.hint("SHUFFLE_HASH").join(df2, joinExprs, "leftouter")
+     assert(collect(shjDF.queryExecution.executedPlan) {
+       case _: ShuffledHashJoinExec => true
+     }.size === 1)
+     // Same result between shuffled hash join and sort merge join
+     checkAnswer(shjDF, smjResult)
+   }
+
+   // Test right outer join with the right side as the build side
+   inputDFs.foreach { case (df2, df1, joinExprs) =>
+     val smjDF = df2.join(df1.hint("SHUFFLE_MERGE"), joinExprs, "rightouter")
+     assert(collect(smjDF.queryExecution.executedPlan) {
+       case _: SortMergeJoinExec => true }.size === 1)
+     val smjResult = smjDF.collect()
+
+     val shjDF = df2.join(df1.hint("SHUFFLE_HASH"), joinExprs, "rightouter")
+     assert(collect(shjDF.queryExecution.executedPlan) {
+       case _: ShuffledHashJoinExec => true
+     }.size === 1)

Member (comment on lines +1320 to +1323):

Do we need to verify that the build side of ShuffledHashJoinExec is BuildLeft here? Or is the hint always working?

+     // Same result between shuffled hash join and sort merge join
+     checkAnswer(shjDF, smjResult)
+   }
+ }
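
A possible extra assertion along the lines of the comment above (an editorial sketch, not part of the PR) would check the chosen build side directly:

// Sketch: confirm the planner actually chose the hinted (right) side as the
// build side, rather than assuming the hint was honored.
val buildSides = collect(shjDF.queryExecution.executedPlan) {
  case j: ShuffledHashJoinExec => j.buildSide
}
assert(buildSides === Seq(BuildRight))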

test("SPARK-32649: Optimize BHJ/SHJ inner/semi join with empty hashed relation") {
val inputDFs = Seq(
// Test empty build side for inner join
Expand Down