Commit 67751e2

mccheah authored and squito committed
[SPARK-29072][CORE] Put back usage of TimeTrackingOutputStream for UnsafeShuffleWriter and SortShuffleWriter
### What changes were proposed in this pull request?

The earlier refactors of the shuffle writers onto the shuffle writer plugin caused shuffle write metric updates - particularly write times - to be lost in certain situations. This patch restores the lost metric updates.

### Why are the changes needed?

This fixes a regression. I'm pretty sure that without this, the Spark UI will lose shuffle write time information.

### Does this PR introduce any user-facing change?

No change from Spark 2.4. Without this, there would be a user-facing bug in Spark 3.0.

### How was this patch tested?

Existing unit tests.

Closes #25780 from mccheah/fix-write-metrics.

Authored-by: mcheah <mcheah@palantir.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
1 parent 6297287 · commit 67751e2
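For context on the class being reintroduced here: TimeTrackingOutputStream (in org.apache.spark.storage) is a thin OutputStream decorator that charges the wall-clock time of each write, flush, and close call to the shuffle write metrics. A minimal sketch of the idea in Scala (the WriteMetrics trait is a hypothetical stand-in for Spark's metrics reporter, not the real interface):

```scala
import java.io.OutputStream

// Hypothetical stand-in for Spark's shuffle write metrics reporter.
trait WriteMetrics {
  def incWriteTime(nanos: Long): Unit
}

// Decorates another OutputStream, charging the wall-clock duration of
// every write, flush, and close call to the metrics sink.
class TimedOutputStream(metrics: WriteMetrics, out: OutputStream)
    extends OutputStream {

  private def timed[A](body: => A): A = {
    val start = System.nanoTime()
    try body
    finally metrics.incWriteTime(System.nanoTime() - start)
  }

  override def write(b: Int): Unit = timed(out.write(b))
  override def write(b: Array[Byte], off: Int, len: Int): Unit =
    timed(out.write(b, off, len))
  override def flush(): Unit = timed(out.flush())
  override def close(): Unit = timed(out.close())
}
```

The regression was simply that, after the plugin refactor, neither shuffle write path wrapped its output streams this way anymore, so no write time was ever accumulated. The two diffs below put the wrapper back.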

File tree: 2 files changed (+14, -3 lines)


core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java (2 additions, 0 deletions)

@@ -57,6 +57,7 @@
 import org.apache.spark.shuffle.api.SingleSpillShuffleMapOutputWriter;
 import org.apache.spark.shuffle.api.WritableByteChannelWrapper;
 import org.apache.spark.storage.BlockManager;
+import org.apache.spark.storage.TimeTrackingOutputStream;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.util.Utils;

@@ -382,6 +383,7 @@ private void mergeSpillsWithFileStream(
         ShufflePartitionWriter writer = mapWriter.getPartitionWriter(partition);
         OutputStream partitionOutput = writer.openStream();
         try {
+          partitionOutput = new TimeTrackingOutputStream(writeMetrics, partitionOutput);
           partitionOutput = blockManager.serializerManager().wrapForEncryption(partitionOutput);
           if (compressionCodec != null) {
             partitionOutput = compressionCodec.compressedOutputStream(partitionOutput);
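Note where the new wrapper sits: it is applied directly over the raw stream returned by writer.openStream(), beneath the encryption and compression layers added just after it. The recorded write time therefore covers pushing the final encrypted, compressed bytes into the partition writer's stream, which is presumably what the pre-plugin Spark 2.4 write path measured as well.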

core/src/main/scala/org/apache/spark/shuffle/ShufflePartitionPairsWriter.scala (12 additions, 3 deletions)

@@ -21,7 +21,7 @@ import java.io.{Closeable, IOException, OutputStream}

 import org.apache.spark.serializer.{SerializationStream, SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.api.ShufflePartitionWriter
-import org.apache.spark.storage.BlockId
+import org.apache.spark.storage.{BlockId, TimeTrackingOutputStream}
 import org.apache.spark.util.Utils
 import org.apache.spark.util.collection.PairsWriter

@@ -39,6 +39,7 @@ private[spark] class ShufflePartitionPairsWriter(

   private var isClosed = false
   private var partitionStream: OutputStream = _
+  private var timeTrackingStream: OutputStream = _
   private var wrappedStream: OutputStream = _
   private var objOut: SerializationStream = _
   private var numRecordsWritten = 0

@@ -59,7 +60,8 @@ private[spark] class ShufflePartitionPairsWriter(
   private def open(): Unit = {
     try {
       partitionStream = partitionWriter.openStream
-      wrappedStream = serializerManager.wrapStream(blockId, partitionStream)
+      timeTrackingStream = new TimeTrackingOutputStream(writeMetrics, partitionStream)
+      wrappedStream = serializerManager.wrapStream(blockId, timeTrackingStream)
       objOut = serializerInstance.serializeStream(wrappedStream)
     } catch {
       case e: Exception =>

@@ -78,6 +80,7 @@ private[spark] class ShufflePartitionPairsWriter(
         // Setting these to null will prevent the underlying streams from being closed twice
         // just in case any stream's close() implementation is not idempotent.
         wrappedStream = null
+        timeTrackingStream = null
         partitionStream = null
       } {
         // Normally closing objOut would close the inner streams as well, but just in case there

@@ -86,9 +89,15 @@ private[spark] class ShufflePartitionPairsWriter(
         wrappedStream = closeIfNonNull(wrappedStream)
         // Same as above - if wrappedStream closes then assume it closes underlying
         // partitionStream and don't close again in the finally
+        timeTrackingStream = null
         partitionStream = null
       } {
-        partitionStream = closeIfNonNull(partitionStream)
+        Utils.tryWithSafeFinally {
+          timeTrackingStream = closeIfNonNull(timeTrackingStream)
+          partitionStream = null
+        } {
+          partitionStream = closeIfNonNull(partitionStream)
+        }
       }
     }
     updateBytesWritten()
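The nested Utils.tryWithSafeFinally calls above implement a layered-close pattern: closing the outermost live stream is expected to close everything beneath it, and each inner reference is nulled once a wrapper has taken responsibility for it, so the fallback close runs only if an outer close failed first. A minimal self-contained sketch of the same pattern, using plain try/finally and JDK streams rather than Spark's helpers (names here are illustrative, not Spark's):

```scala
import java.io.{ByteArrayOutputStream, Closeable, FilterOutputStream, OutputStream}

object LayeredCloseSketch {
  // Mirrors the shape of closeIfNonNull above: close if non-null, then
  // return null so the caller can clear its reference in one assignment.
  private def closeIfNonNull[T <: Closeable](c: T): T = {
    if (c != null) c.close()
    null.asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    var inner: OutputStream = new ByteArrayOutputStream()
    var outer: OutputStream = new FilterOutputStream(inner)
    try {
      // Closing the outer layer also closes the stream it wraps, so clear
      // the inner reference to keep the fallback below from closing it twice.
      outer = closeIfNonNull(outer)
      inner = null
    } finally {
      // Performs a real close only if the outer close threw before it
      // could close its delegate.
      inner = closeIfNonNull(inner)
    }
  }
}
```

The change adds one such layer for timeTrackingStream between wrappedStream and partitionStream, which is why the final fallback gains its own nested tryWithSafeFinally.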
