Skip to content

Commit f8f95f3

Browse files
committed
Address comments
1 parent c125a14 commit f8f95f3

File tree

3 files changed

+10
-12
lines changed

3 files changed

+10
-12
lines changed

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -268,8 +268,7 @@ private long[] mergeSpills(SpillInfo[] spills) throws IOException {
268268
mapId,
269269
taskContext.taskAttemptId(),
270270
partitioner.numPartitions());
271-
mapWriter.commitAllPartitions();
272-
return new long[partitioner.numPartitions()];
271+
return mapWriter.commitAllPartitions();
273272
} else if (spills.length == 1) {
274273
Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
275274
shuffleExecutorComponents.createSingleFileMapOutputWriter(
@@ -379,7 +378,7 @@ private void mergeSpillsWithFileStream(
379378
inputBufferSizeInBytes);
380379
}
381380
for (int partition = 0; partition < numPartitions; partition++) {
382-
boolean copyThrewExecption = true;
381+
boolean copyThrewException = true;
383382
ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
384383
OutputStream partitionOutput = writer.openStream();
385384
try {
@@ -409,9 +408,9 @@ private void mergeSpillsWithFileStream(
409408
}
410409
}
411410
}
412-
copyThrewExecption = false;
411+
copyThrewException = false;
413412
} finally {
414-
Closeables.close(partitionOutput, copyThrewExecption);
413+
Closeables.close(partitionOutput, copyThrewException);
415414
}
416415
long numBytesWritten = writer.getNumBytesWritten();
417416
writeMetrics.incBytesWritten(numBytesWritten);
@@ -458,10 +457,10 @@ private void mergeSpillsWithTransferTo(
458457
final FileChannel spillInputChannel = spillInputChannels[i];
459458
final long writeStartTime = System.nanoTime();
460459
Utils.copyFileStreamNIO(
461-
spillInputChannel,
462-
resolvedChannel.channel(),
463-
spillInputChannelPositions[i],
464-
partitionLengthInSpill);
460+
spillInputChannel,
461+
resolvedChannel.channel(),
462+
spillInputChannelPositions[i],
463+
partitionLengthInSpill);
465464
copyThrewExecption = false;
466465
spillInputChannelPositions[i] += partitionLengthInSpill;
467466
writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);

core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,8 +102,7 @@ public long[] commitAllPartitions() throws IOException {
102102
// exception if it is incorrect. The position will not be increased to the expected length
103103
// after calling transferTo in kernel version 2.6.32. This issue is described at
104104
// https://bugs.openjdk.java.net/browse/JDK-7052359 and SPARK-3948.
105-
if (outputFileChannel != null
106-
&& outputFileChannel.position() != bytesWrittenToMergedFile) {
105+
if (outputFileChannel != null && outputFileChannel.position() != bytesWrittenToMergedFile) {
107106
throw new IOException(
108107
"Current position " + outputFileChannel.position() + " does not equal expected " +
109108
"position " + bytesWrittenToMergedFile + " after transferTo. Please check your " +

core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void setUp() throws IOException {
137137

138138
when(shuffleBlockResolver.getDataFile(anyInt(), anyInt())).thenReturn(mergedOutputFile);
139139

140-
Answer renameTempAnswer = invocationOnMock -> {
140+
Answer<?> renameTempAnswer = invocationOnMock -> {
141141
partitionSizesInMergedFile = (long[]) invocationOnMock.getArguments()[2];
142142
File tmp = (File) invocationOnMock.getArguments()[3];
143143
if (!mergedOutputFile.delete()) {

0 commit comments

Comments
 (0)