Skip to content

Commit 9838df2

Browse files
xuanyuanking, cloud-fan
authored and committed
[SPARK-21492][SQL][FOLLOW UP] Reimplement UnsafeExternalRowSorter in database style iterator
### What changes were proposed in this pull request? Reimplement the iterator in UnsafeExternalRowSorter in database style. This can be done by reusing the `RowIterator` in our code base. ### Why are the changes needed? During the work in #26164, after introducing a variable `isReleased` in `hasNext`, it became possible for `isReleased` to be false when `hasNext` is called but to become true before `next` is called. A safer way is to use a database-style iterator: `advanceNext` and `getRow`. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing UT. Closes #26229 from xuanyuanking/SPARK-21492-follow-up. Authored-by: Yuanjian Li <xyliyuanjian@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 9e77d48) Signed-off-by: Wenchen Fan <wenchen@databricks.com>
1 parent b1ba6fa commit 9838df2

File tree

2 files changed

+26
-22
lines changed

2 files changed

+26
-22
lines changed

sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import java.io.IOException;
2121
import java.util.function.Supplier;
2222

23-
import scala.collection.AbstractIterator;
2423
import scala.collection.Iterator;
2524
import scala.math.Ordering;
2625

@@ -170,39 +169,40 @@ public void cleanupResources() {
170169
sorter.cleanupResources();
171170
}
172171

173-
public Iterator<UnsafeRow> sort() throws IOException {
172+
public Iterator<InternalRow> sort() throws IOException {
174173
try {
175174
final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
176175
if (!sortedIterator.hasNext()) {
177176
// Since we won't ever call next() on an empty iterator, we need to clean up resources
178177
// here in order to prevent memory leaks.
179178
cleanupResources();
180179
}
181-
return new AbstractIterator<UnsafeRow>() {
180+
return new RowIterator() {
182181

183182
private final int numFields = schema.length();
184183
private UnsafeRow row = new UnsafeRow(numFields);
185184

186185
@Override
187-
public boolean hasNext() {
188-
return !isReleased && sortedIterator.hasNext();
189-
}
190-
191-
@Override
192-
public UnsafeRow next() {
186+
public boolean advanceNext() {
193187
try {
194-
sortedIterator.loadNext();
195-
row.pointTo(
196-
sortedIterator.getBaseObject(),
197-
sortedIterator.getBaseOffset(),
198-
sortedIterator.getRecordLength());
199-
if (!hasNext()) {
200-
UnsafeRow copy = row.copy(); // so that we don't have dangling pointers to freed page
201-
row = null; // so that we don't keep references to the base object
202-
cleanupResources();
203-
return copy;
188+
if (!isReleased && sortedIterator.hasNext()) {
189+
sortedIterator.loadNext();
190+
row.pointTo(
191+
sortedIterator.getBaseObject(),
192+
sortedIterator.getBaseOffset(),
193+
sortedIterator.getRecordLength());
194+
// Here is the initial bug fix in SPARK-9364: the bug fix of use-after-free bug
195+
// when returning the last row from an iterator. For example, in
196+
// [[GroupedIterator]], we still use the last row after traversing the iterator
197+
// in `fetchNextGroupIterator`
198+
if (!sortedIterator.hasNext()) {
199+
row = row.copy(); // so that we don't have dangling pointers to freed page
200+
cleanupResources();
201+
}
202+
return true;
204203
} else {
205-
return row;
204+
row = null; // so that we don't keep references to the base object
205+
return false;
206206
}
207207
} catch (IOException e) {
208208
cleanupResources();
@@ -212,14 +212,18 @@ public UnsafeRow next() {
212212
}
213213
throw new RuntimeException("Exception should have been re-thrown in next()");
214214
}
215-
};
215+
216+
@Override
217+
public UnsafeRow getRow() { return row; }
218+
219+
}.toScala();
216220
} catch (IOException e) {
217221
cleanupResources();
218222
throw e;
219223
}
220224
}
221225

222-
public Iterator<UnsafeRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
226+
public Iterator<InternalRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
223227
while (inputIterator.hasNext()) {
224228
insertRow(inputIterator.next());
225229
}

0 commit comments

Comments
 (0)