Skip to content

Commit c20b3b9

Browse files
committed
Fix the corner case for SMJ inner join
1 parent 9f78b70 commit c20b3b9

File tree

1 file changed

+14
-6
lines changed

1 file changed

+14
-6
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -654,8 +654,10 @@ case class SortMergeJoinExec(
654654
(evaluateVariables(leftVars), "")
655655
}
656656

657-
// The last two line of code generate in processNext here will handle properly
658-
// releasing the resources if the input iterators are not fully consumed
657+
// The last two lines of code generated in processNext here will attempt to handle
658+
// releasing the resources if the input iterators are not fully consumed. It only
659+
// attempts to release the resources of an iterator if the associated child operator
660+
// is codegened
659661
s"""
660662
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
661663
| ${leftVarDecl.mkString("\n")}
@@ -669,10 +671,16 @@ case class SortMergeJoinExec(
669671
| }
670672
| if (shouldStop()) return;
671673
|}
672-
|((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$leftInput)
673-
| .getBufferedRowIterator().close();
674-
|((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$rightInput)
675-
| .getBufferedRowIterator().close();
674+
|if ($leftInput instanceof
675+
| org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator) {
676+
| ((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$leftInput)
677+
| .getBufferedRowIterator().close();
678+
|}
679+
|if ($rightInput instanceof
680+
| org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator) {
681+
| ((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$rightInput)
682+
| .getBufferedRowIterator().close();
683+
|}
676684
""".stripMargin
677685
}
678686
}

0 commit comments

Comments
 (0)