Skip to content

Commit a15af30

Browse files
committed
Address comments for prepareRelation and others
1 parent 7fbd3a8 commit a15af30

File tree

2 files changed

+16
-31
lines changed

2 files changed

+16
-31
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -400,7 +400,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
400400
* Generates the code for Inner join.
401401
*/
402402
protected def codegenInner(ctx: CodegenContext, input: Seq[ExprCode]): String = {
403-
val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
403+
val (relationTerm, keyIsUnique) = prepareRelation(ctx)
404404
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
405405
val (matched, checkCondition, buildVars) = getJoinCondition(ctx, input)
406406
val numOutput = metricTerm(ctx, "numOutputRows")
@@ -410,7 +410,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
410410
case BuildRight => input ++ buildVars
411411
}
412412

413-
if (keyIsKnownUnique) {
413+
if (keyIsUnique) {
414414
s"""
415415
|// generate join key for stream side
416416
|${keyEv.code}
@@ -450,7 +450,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
450450
* Generates the code for left or right outer join.
451451
*/
452452
protected def codegenOuter(ctx: CodegenContext, input: Seq[ExprCode]): String = {
453-
val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
453+
val (relationTerm, keyIsUnique) = prepareRelation(ctx)
454454
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
455455
val matched = ctx.freshName("matched")
456456
val buildVars = genBuildSideVars(ctx, matched)
@@ -482,7 +482,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
482482
case BuildRight => input ++ buildVars
483483
}
484484

485-
if (keyIsKnownUnique) {
485+
if (keyIsUnique) {
486486
s"""
487487
|// generate join key for stream side
488488
|${keyEv.code}
@@ -527,12 +527,12 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
527527
* Generates the code for left semi join.
528528
*/
529529
protected def codegenSemi(ctx: CodegenContext, input: Seq[ExprCode]): String = {
530-
val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
530+
val (relationTerm, keyIsUnique) = prepareRelation(ctx)
531531
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
532532
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
533533
val numOutput = metricTerm(ctx, "numOutputRows")
534534

535-
if (keyIsKnownUnique) {
535+
if (keyIsUnique) {
536536
s"""
537537
|// generate join key for stream side
538538
|${keyEv.code}
@@ -576,12 +576,12 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
576576
* Generates the code for anti join.
577577
*/
578578
protected def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
579-
val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
579+
val (relationTerm, keyIsUnique) = prepareRelation(ctx)
580580
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
581581
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
582582
val numOutput = metricTerm(ctx, "numOutputRows")
583583

584-
if (keyIsKnownUnique) {
584+
if (keyIsUnique) {
585585
val found = ctx.freshName("found")
586586
s"""
587587
|boolean $found = false;
@@ -637,7 +637,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
637637
* Generates the code for existence join.
638638
*/
639639
protected def codegenExistence(ctx: CodegenContext, input: Seq[ExprCode]): String = {
640-
val (relationTerm, keyIsKnownUnique) = prepareRelation(ctx)
640+
val (relationTerm, keyIsUnique) = prepareRelation(ctx)
641641
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
642642
val numOutput = metricTerm(ctx, "numOutputRows")
643643
val existsVar = ctx.freshName("exists")
@@ -664,7 +664,7 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
664664
val resultVar = input ++ Seq(ExprCode.forNonNullValue(
665665
JavaCode.variable(existsVar, BooleanType)))
666666

667-
if (keyIsKnownUnique) {
667+
if (keyIsUnique) {
668668
s"""
669669
|// generate join key for stream side
670670
|${keyEv.code}

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -85,16 +85,8 @@ case class ShuffledHashJoinExec(
8585
// inline mutable state since not many join operations in a task
8686
val streamedInput = ctx.addMutableState(
8787
"scala.collection.Iterator", "streamedInput", v => s"$v = inputs[0];", forceInline = true)
88-
val buildInput = ctx.addMutableState(
89-
"scala.collection.Iterator", "buildInput", v => s"$v = inputs[1];", forceInline = true)
90-
val initRelation = ctx.addMutableState(
91-
CodeGenerator.JAVA_BOOLEAN, "initRelation", v => s"$v = false;", forceInline = true)
9288
val streamedRow = ctx.addMutableState(
9389
"InternalRow", "streamedRow", forceInline = true)
94-
95-
val thisPlan = ctx.addReferenceObj("plan", this)
96-
val (relationTerm, _) = prepareRelation(ctx)
97-
val buildRelation = s"$relationTerm = $thisPlan.buildHashedRelation($buildInput);"
9890
val (streamInputVar, streamInputVarDecl) = createVars(ctx, streamedRow, streamedPlan.output)
9991

10092
val join = joinType match {
@@ -109,12 +101,6 @@ case class ShuffledHashJoinExec(
109101
}
110102

111103
s"""
112-
|// construct hash map for shuffled hash join build side
113-
|if (!$initRelation) {
114-
| $buildRelation
115-
| $initRelation = true;
116-
|}
117-
|
118104
|while ($streamedInput.hasNext()) {
119105
| $streamedRow = (InternalRow) $streamedInput.next();
120106
| ${streamInputVarDecl.mkString("\n")}
@@ -130,13 +116,12 @@ case class ShuffledHashJoinExec(
130116
* and boolean false to indicate key not to be known unique in code-gen time.
131117
*/
132118
protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = {
133-
if (relationTerm == null) {
134-
// Inline mutable state since not many join operations in a task
135-
relationTerm = ctx.addMutableState(
136-
"org.apache.spark.sql.execution.joins.HashedRelation", "relation", forceInline = true)
137-
}
119+
val thisPlan = ctx.addReferenceObj("plan", this)
120+
val clsName = classOf[HashedRelation].getName
121+
122+
// Inline mutable state since not many join operations in a task
123+
val relationTerm = ctx.addMutableState(clsName, "relation",
124+
v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = true)
138125
(relationTerm, false)
139126
}
140-
141-
private var relationTerm: String = _
142127
}

0 commit comments

Comments
 (0)