@@ -73,22 +73,23 @@ case class SortMergeJoin(
73
73
private [this ] var rightElement : Row = _
74
74
private [this ] var leftKey : Row = _
75
75
private [this ] var rightKey : Row = _
76
- private [this ] var leftMatches : CompactBuffer [Row ] = _
77
76
private [this ] var rightMatches : CompactBuffer [Row ] = _
78
- private [this ] var leftPosition : Int = - 1
79
77
private [this ] var rightPosition : Int = - 1
78
+ private [this ] var stop : Boolean = false
79
+ private [this ] var matchKey : Row = _
80
80
81
- override final def hasNext : Boolean = leftPosition != - 1 || nextMatchingPair
81
+ override final def hasNext : Boolean = nextMatchingPair()
82
82
83
83
override final def next (): Row = {
84
84
if (hasNext) {
85
- val joinedRow = joinRow(leftMatches(leftPosition) , rightMatches(rightPosition))
85
+ val joinedRow = joinRow(leftElement , rightMatches(rightPosition))
86
86
rightPosition += 1
87
87
if (rightPosition >= rightMatches.size) {
88
- leftPosition += 1
89
88
rightPosition = 0
90
- if (leftPosition >= leftMatches.size) {
91
- leftPosition = - 1
89
+ fetchLeft()
90
+ if (leftElement == null || ordering.compare(leftKey, matchKey) != 0 ) {
91
+ stop = false
92
+ rightMatches = null
92
93
}
93
94
}
94
95
joinedRow
@@ -130,9 +131,7 @@ case class SortMergeJoin(
130
131
* of tuples.
131
132
*/
132
133
private def nextMatchingPair (): Boolean = {
133
- if (leftPosition == - 1 ) {
134
- leftMatches = null
135
- var stop : Boolean = false
134
+ if (! stop && rightElement != null ) {
136
135
while (! stop && leftElement != null && rightElement != null ) {
137
136
stop = ordering.compare(leftKey, rightKey) == 0 && ! leftKey.anyNull
138
137
if (ordering.compare(leftKey, rightKey) > 0 || rightKey.anyNull) {
@@ -142,27 +141,21 @@ case class SortMergeJoin(
142
141
}
143
142
}
144
143
rightMatches = new CompactBuffer [Row ]()
145
- while (stop && rightElement != null ) {
146
- rightMatches += rightElement
147
- fetchRight()
148
- // exit loop when run out of right matches
149
- stop = ordering.compare(leftKey, rightKey) == 0
150
- }
151
- if (rightMatches.size > 0 ) {
152
- leftMatches = new CompactBuffer [Row ]()
153
- val leftMatch = leftKey.copy()
154
- while (ordering.compare(leftKey, leftMatch) == 0 && leftElement != null ) {
155
- leftMatches += leftElement
156
- fetchLeft()
144
+ if (stop) {
145
+ stop = false
146
+ while (! stop && rightElement != null ) {
147
+ rightMatches += rightElement
148
+ fetchRight()
149
+ // exit loop when run out of right matches
150
+ stop = ordering.compare(leftKey, rightKey) != 0
151
+ }
152
+ if (rightMatches.size > 0 ) {
153
+ rightPosition = 0
154
+ matchKey = leftKey
157
155
}
158
- }
159
-
160
- if (leftMatches != null ) {
161
- leftPosition = 0
162
- rightPosition = 0
163
156
}
164
157
}
165
- leftPosition > - 1
158
+ rightMatches != null && rightMatches.size > 0
166
159
}
167
160
}
168
161
}
0 commit comments