Fix merge conflicts in get-or-create-metrics PR #3

@@ -142,7 +142,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime() - openStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
@@ -202,7 +202,7 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
threwException = false;
} finally {
Closeables.close(out, threwException);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
}
partitionWriters = null;
return lengths;
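
Both hunks in this file follow the same pattern: wrap an IO-heavy block (opening partition files and disk writers, or closing the output) with System.nanoTime() and credit the elapsed time to the renamed write-time counter. A minimal, self-contained sketch of that pattern — the Metrics class and timed helper are illustrative stand-ins, not Spark's API:

    object WriteTimeSketch {
      // Stand-in for the write-time part of ShuffleWriteMetrics.
      final class Metrics {
        @volatile var writeTimeNanos: Long = 0L
        def incWriteTime(nanos: Long): Unit = writeTimeNanos += nanos
      }

      // Runs `body`, then credits however long it took to the metrics object.
      def timed[T](metrics: Metrics)(body: => T): T = {
        val start = System.nanoTime()
        try body finally metrics.incWriteTime(System.nanoTime() - start)
      }

      def main(args: Array[String]): Unit = {
        val m = new Metrics
        timed(m) { Thread.sleep(5) } // stands in for opening partition files / disk writers
        println(s"write time: ${m.writeTimeNanos / 1e6} ms")
      }
    }
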
@@ -233,8 +233,8 @@ private void writeSortedFile(boolean isLastFile) throws IOException {
// Note that we intentionally ignore the value of `writeMetricsToUse.shuffleWriteTime()`.
// Consistent with ExternalSorter, we do not count this IO towards shuffle write time.
// This means that this IO time is not accounted for anywhere; SPARK-3577 will fix this.
- writeMetrics.incShuffleRecordsWritten(writeMetricsToUse.shuffleRecordsWritten());
- taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.shuffleBytesWritten());
+ writeMetrics.incRecordsWritten(writeMetricsToUse.recordsWritten());
+ taskContext.taskMetrics().incDiskBytesSpilled(writeMetricsToUse.bytesWritten());
}
}

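The two replaced lines above fold a per-spill metrics object into the task-level counters: the spill's record count still goes to the shuffle write metrics, while its bytes are booked as disk bytes spilled rather than shuffle bytes written. A simplified sketch of that accounting, with illustrative types in place of Spark's:

    // Per-spill counters produced while writing one sorted spill file.
    final case class SpillWriteMetrics(recordsWritten: Long, bytesWritten: Long)

    // Task-level counters the spill is folded into.
    final class TaskCounters {
      var shuffleRecordsWritten = 0L
      var diskBytesSpilled = 0L

      def absorbSpill(spill: SpillWriteMetrics): Unit = {
        shuffleRecordsWritten += spill.recordsWritten // records count toward shuffle output
        diskBytesSpilled += spill.bytesWritten        // bytes count as spill, not shuffle write
      }
    }
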
@@ -297,8 +297,8 @@ private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {
// final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
// to be counted as shuffle write, but this will lead to double-counting of the final
// SpillInfo's bytes.
- writeMetrics.decShuffleBytesWritten(spills[spills.length - 1].file.length());
- writeMetrics.incShuffleBytesWritten(outputFile.length());
+ writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
+ writeMetrics.incBytesWritten(outputFile.length());
return partitionLengths;
}
} catch (IOException e) {
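
The dec/inc pair above corrects for double counting: the last spill's bytes were already added to bytesWritten when that spill was produced, and the merged output file (which contains those bytes again) is about to be added too. A small worked example of the arithmetic, with made-up sizes:

    // Illustrative sizes only.
    val lastSpillBytes  = 64L * 1024 * 1024   // already counted when the final spill was written
    val mergedFileBytes = 256L * 1024 * 1024  // length of the merged shuffle output file
    var bytesWritten    = 192L * 1024 * 1024  // counter value before the correction

    bytesWritten -= lastSpillBytes            // decBytesWritten(spills.last.file.length())
    bytesWritten += mergedFileBytes           // incBytesWritten(outputFile.length())
    // Net effect: the final spill's bytes are counted once, as part of the merged file.
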
@@ -410,7 +410,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) throws IOException {
spillInputChannelPositions[i] += actualBytesTransferred;
bytesToTransfer -= actualBytesTransferred;
}
- writeMetrics.incShuffleWriteTime(System.nanoTime() - writeStartTime);
+ writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
bytesWrittenToMergedFile += partitionLengthInSpill;
partitionLengths[partition] += partitionLengthInSpill;
}
@@ -42,34 +42,34 @@ public TimeTrackingOutputStream(ShuffleWriteMetrics writeMetrics, OutputStream outputStream) {
public void write(int b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
final long startTime = System.nanoTime();
outputStream.write(b, off, len);
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void flush() throws IOException {
final long startTime = System.nanoTime();
outputStream.flush();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}

@Override
public void close() throws IOException {
final long startTime = System.nanoTime();
outputStream.close();
- writeMetrics.incShuffleWriteTime(System.nanoTime() - startTime);
+ writeMetrics.incWriteTime(System.nanoTime() - startTime);
}
}
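
For reference, the whole class boils down to one idea: delegate every OutputStream operation and charge the elapsed time to the write-time metric. A condensed Scala analogue (a sketch, not the Spark class — the callback stands in for writeMetrics.incWriteTime):

    import java.io.OutputStream

    class TimedOutputStream(underlying: OutputStream, incWriteTime: Long => Unit)
      extends OutputStream {

      // Runs one delegated call and reports how long it blocked.
      private def timed[T](body: => T): T = {
        val start = System.nanoTime()
        try body finally incWriteTime(System.nanoTime() - start)
      }

      override def write(b: Int): Unit = timed(underlying.write(b))
      override def write(b: Array[Byte]): Unit = timed(underlying.write(b))
      override def write(b: Array[Byte], off: Int, len: Int): Unit =
        timed(underlying.write(b, off, len))
      override def flush(): Unit = timed(underlying.flush())
      override def close(): Unit = timed(underlying.close())
    }
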
@@ -26,28 +26,39 @@ import org.apache.spark.annotation.DeveloperApi
*/
@DeveloperApi
class ShuffleWriteMetrics extends Serializable {

/**
* Number of bytes written for the shuffle by this task
*/
- @volatile private var _shuffleBytesWritten: Long = _
- def shuffleBytesWritten: Long = _shuffleBytesWritten
- private[spark] def incShuffleBytesWritten(value: Long) = _shuffleBytesWritten += value
- private[spark] def decShuffleBytesWritten(value: Long) = _shuffleBytesWritten -= value
+ @volatile private var _bytesWritten: Long = _
+ def bytesWritten: Long = _bytesWritten
+ private[spark] def incBytesWritten(value: Long) = _bytesWritten += value
+ private[spark] def decBytesWritten(value: Long) = _bytesWritten -= value

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
- @volatile private var _shuffleWriteTime: Long = _
- def shuffleWriteTime: Long = _shuffleWriteTime
- private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value
- private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value
+ @volatile private var _writeTime: Long = _
+ def writeTime: Long = _writeTime
+ private[spark] def incWriteTime(value: Long) = _writeTime += value
+ private[spark] def decWriteTime(value: Long) = _writeTime -= value

/**
* Total number of records written to the shuffle by this task
*/
- @volatile private var _shuffleRecordsWritten: Long = _
- def shuffleRecordsWritten: Long = _shuffleRecordsWritten
- private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value
- private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value
- private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value
+ @volatile private var _recordsWritten: Long = _
+ def recordsWritten: Long = _recordsWritten
+ private[spark] def incRecordsWritten(value: Long) = _recordsWritten += value
+ private[spark] def decRecordsWritten(value: Long) = _recordsWritten -= value
+ private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value

+ // Legacy methods for backward compatibility.
+ // TODO: remove these once we make this class private.
+ @deprecated("use bytesWritten instead", "2.0.0")
+ def shuffleBytesWritten: Long = bytesWritten
+ @deprecated("use writeTime instead", "2.0.0")
+ def shuffleWriteTime: Long = writeTime
+ @deprecated("use recordsWritten instead", "2.0.0")
+ def shuffleRecordsWritten: Long = recordsWritten

}
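
The class above pairs each rename with a deprecated forwarder, so code compiled against the old shuffle*-prefixed getters keeps working and merely earns a deprecation warning. A minimal sketch of that pattern in isolation (one field only, illustrative names):

    class BytesCounter {
      private var _bytesWritten: Long = 0L
      def bytesWritten: Long = _bytesWritten
      def incBytesWritten(value: Long): Unit = _bytesWritten += value

      // Old name kept for source compatibility; forwards to the new accessor.
      @deprecated("use bytesWritten instead", "2.0.0")
      def shuffleBytesWritten: Long = bytesWritten
    }

    val c = new BytesCounter
    c.incBytesWritten(4096L)
    assert(c.bytesWritten == c.shuffleBytesWritten) // compiles, with a deprecation warning
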
@@ -271,7 +271,7 @@ class StatsReportListener extends SparkListener with Logging {

// Shuffle write
showBytesDistribution("shuffle bytes written:",
- (_, metric) => metric.shuffleWriteMetrics.map(_.shuffleBytesWritten), taskInfoMetrics)
+ (_, metric) => metric.shuffleWriteMetrics.map(_.bytesWritten), taskInfoMetrics)

// Fetch & I/O
showMillisDistribution("fetch wait time:",
@@ -90,7 +90,7 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf)
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, so should be included in the shuffle write time.
- writeMetrics.incShuffleWriteTime(System.nanoTime - openStartTime)
+ writeMetrics.incWriteTime(System.nanoTime - openStartTime)

override def releaseWriters(success: Boolean) {
shuffleState.completedMapTasks.add(mapId)
@@ -92,7 +92,7 @@ private[spark] class SortShuffleWriter[K, V, C](
if (sorter != null) {
val startTime = System.nanoTime()
sorter.stop()
- writeMetrics.incShuffleWriteTime(System.nanoTime - startTime)
+ writeMetrics.incWriteTime(System.nanoTime - startTime)
sorter = null
}
}
@@ -214,9 +214,9 @@ private[v1] object AllStagesResource {
raw.shuffleWriteMetrics
}
def build: ShuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
- writeBytes = submetricQuantiles(_.shuffleBytesWritten),
- writeRecords = submetricQuantiles(_.shuffleRecordsWritten),
- writeTime = submetricQuantiles(_.shuffleWriteTime)
+ writeBytes = submetricQuantiles(_.bytesWritten),
+ writeRecords = submetricQuantiles(_.recordsWritten),
+ writeTime = submetricQuantiles(_.writeTime)
)
}.metricOption

@@ -283,9 +283,9 @@ private[v1] object AllStagesResource {

def convertShuffleWriteMetrics(internal: InternalShuffleWriteMetrics): ShuffleWriteMetrics = {
new ShuffleWriteMetrics(
- bytesWritten = internal.shuffleBytesWritten,
- writeTime = internal.shuffleWriteTime,
- recordsWritten = internal.shuffleRecordsWritten
+ bytesWritten = internal.bytesWritten,
+ writeTime = internal.writeTime,
+ recordsWritten = internal.recordsWritten
)
}
}
@@ -102,7 +102,7 @@ private[spark] class DiskBlockObjectWriter(
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
- writeMetrics.incShuffleWriteTime(System.nanoTime() - start)
+ writeMetrics.incWriteTime(System.nanoTime() - start)
}
} {
objOut.close()
@@ -132,7 +132,7 @@ private[spark] class DiskBlockObjectWriter(
close()
finalPosition = file.length()
// In certain compression codecs, more bytes are written after close() is called
- writeMetrics.incShuffleBytesWritten(finalPosition - reportedPosition)
+ writeMetrics.incBytesWritten(finalPosition - reportedPosition)
} else {
finalPosition = file.length()
}
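
The incBytesWritten call above handles codecs that emit trailing bytes when the stream is closed: after close(), the file on disk can be longer than the position last reported to the metrics, so the difference is added. A tiny sketch of that reconciliation (names illustrative, not Spark's API):

    // Called after close(); tops the counter up with bytes the codec wrote at close time.
    def reconcileAfterClose(finalFileLength: Long,
                            reportedPosition: Long,
                            incBytesWritten: Long => Unit): Unit = {
      incBytesWritten(finalFileLength - reportedPosition)
    }

    reconcileAfterClose(finalFileLength = 10240L, reportedPosition = 10000L,
      bytes => println(s"codec wrote $bytes extra bytes at close"))
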
@@ -152,8 +152,8 @@ private[spark] class DiskBlockObjectWriter(
// truncating the file to its initial position.
try {
if (initialized) {
- writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition)
- writeMetrics.decShuffleRecordsWritten(numRecordsWritten)
+ writeMetrics.decBytesWritten(reportedPosition - initialPosition)
+ writeMetrics.decRecordsWritten(numRecordsWritten)
objOut.flush()
bs.flush()
close()
@@ -201,7 +201,7 @@ private[spark] class DiskBlockObjectWriter(
*/
def recordWritten(): Unit = {
numRecordsWritten += 1
- writeMetrics.incShuffleRecordsWritten(1)
+ writeMetrics.incRecordsWritten(1)

if (numRecordsWritten % 32 == 0) {
updateBytesWritten()
@@ -226,7 +226,7 @@ private[spark] class DiskBlockObjectWriter(
*/
private def updateBytesWritten() {
val pos = channel.position()
- writeMetrics.incShuffleBytesWritten(pos - reportedPosition)
+ writeMetrics.incBytesWritten(pos - reportedPosition)
reportedPosition = pos
}
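
The two hunks above work together: bytes written are derived as a delta of the file channel's position since the last report, and that comparatively expensive refresh happens only every 32nd record. A self-contained sketch of the same bookkeeping, with function parameters standing in for the writer's channel and metrics:

    final class PositionTracker(position: () => Long,
                                incBytesWritten: Long => Unit,
                                incRecordsWritten: Long => Unit) {
      private var reportedPosition = position()
      private var numRecordsWritten = 0L

      def recordWritten(): Unit = {
        numRecordsWritten += 1
        incRecordsWritten(1L)
        if (numRecordsWritten % 32 == 0) updateBytesWritten() // batch the position() call
      }

      private def updateBytesWritten(): Unit = {
        val pos = position()
        incBytesWritten(pos - reportedPosition) // only the delta since the last report
        reportedPosition = pos
      }
    }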

@@ -129,7 +129,7 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends SparkListener {
}
metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
executorToShuffleWrite(eid) =
- executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+ executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.bytesWritten
}
}
}
@@ -426,14 +426,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)

val shuffleWriteDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L)
-   - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten).getOrElse(0L))
+ (taskMetrics.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L)
+   - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.bytesWritten).getOrElse(0L))
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta

val shuffleWriteRecordsDelta =
- (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L)
-   - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L))
+ (taskMetrics.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L)
+   - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.recordsWritten).getOrElse(0L))
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta

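The deltas above exist because task metrics arrive as running totals: on each update the listener adds only the difference between the new total and what it has already folded in, defaulting missing Option-wrapped metrics to zero. A small sketch of that update step (illustrative values):

    // Advances a stage-level aggregate by the unreported part of a task's running total.
    def addDelta(aggregate: Long, newTotal: Option[Long], oldTotal: Option[Long]): Long =
      aggregate + (newTotal.getOrElse(0L) - oldTotal.getOrElse(0L))

    var stageShuffleWriteBytes = 0L
    stageShuffleWriteBytes = addDelta(stageShuffleWriteBytes, Some(1024L), None)        // first report
    stageShuffleWriteBytes = addDelta(stageShuffleWriteBytes, Some(4096L), Some(1024L)) // later report
    assert(stageShuffleWriteBytes == 4096L)
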
14 changes: 7 additions & 7 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -500,11 +500,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
getFormattedSizeQuantiles(shuffleReadRemoteSizes)

val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+ metrics.get.shuffleWriteMetrics.map(_.bytesWritten).getOrElse(0L).toDouble
}

val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) =>
- metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble
+ metrics.get.shuffleWriteMetrics.map(_.recordsWritten).getOrElse(0L).toDouble
}

val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +:
@@ -619,7 +619,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val shuffleReadTimeProportion = toProportion(shuffleReadTime)
val shuffleWriteTime =
(metricsOpt.flatMap(_.shuffleWriteMetrics
- .map(_.shuffleWriteTime)).getOrElse(0L) / 1e6).toLong
+ .map(_.writeTime)).getOrElse(0L) / 1e6).toLong
val shuffleWriteTimeProportion = toProportion(shuffleWriteTime)

val serializationTime = metricsOpt.map(_.resultSerializationTime).getOrElse(0L)
@@ -930,13 +930,13 @@ private[ui] class TaskDataSource(
val shuffleReadRemoteReadable = remoteShuffleBytes.map(Utils.bytesToString).getOrElse("")

val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics)
- val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten).getOrElse(0L)
+ val shuffleWriteSortable = maybeShuffleWrite.map(_.bytesWritten).getOrElse(0L)
val shuffleWriteReadable = maybeShuffleWrite
- .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("")
+ .map(m => s"${Utils.bytesToString(m.bytesWritten)}").getOrElse("")
val shuffleWriteRecords = maybeShuffleWrite
- .map(_.shuffleRecordsWritten.toString).getOrElse("")
+ .map(_.recordsWritten.toString).getOrElse("")

- val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime)
+ val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.writeTime)
val writeTimeSortable = maybeWriteTime.getOrElse(0L)
val writeTimeReadable = maybeWriteTime.map(t => t / (1000 * 1000)).map { ms =>
if (ms == 0) "" else UIUtils.formatDuration(ms)
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -727,10 +727,10 @@ private[spark] object JsonProtocol {
// TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
Utils.jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
val writeMetrics = metrics.registerShuffleWriteMetrics()
- writeMetrics.incShuffleBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
- writeMetrics.incShuffleRecordsWritten((writeJson \ "Shuffle Records Written")
+ writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
+ writeMetrics.incRecordsWritten((writeJson \ "Shuffle Records Written")
.extractOpt[Long].getOrElse(0L))
- writeMetrics.incShuffleWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
+ writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
}

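Note that only the Scala method names change here; the JSON field names keep their legacy "Shuffle ..." spelling (see the TODO above), and the records field stays optional for older event logs. A self-contained json4s sketch of that extraction — assuming json4s, which JsonProtocol is built on; the sample JSON is made up:

    import org.json4s._
    import org.json4s.jackson.JsonMethods.parse

    implicit val formats: Formats = DefaultFormats

    val writeJson = parse("""{"Shuffle Bytes Written": 4096, "Shuffle Write Time": 120000}""")

    val bytes   = (writeJson \ "Shuffle Bytes Written").extract[Long]
    val time    = (writeJson \ "Shuffle Write Time").extract[Long]
    val records = (writeJson \ "Shuffle Records Written").extractOpt[Long].getOrElse(0L) // absent here -> 0L
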
// Output metrics
@@ -193,8 +193,8 @@ class ExternalAppendOnlyMap[K, V, C](
val w = writer
writer = null
w.commitAndClose()
- _diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
- batchSizes.append(curWriteMetrics.shuffleBytesWritten)
+ _diskBytesSpilled += curWriteMetrics.bytesWritten
+ batchSizes.append(curWriteMetrics.bytesWritten)
objectsWritten = 0
}

@@ -262,8 +262,8 @@ private[spark] class ExternalSorter[K, V, C](
val w = writer
writer = null
w.commitAndClose()
- _diskBytesSpilled += spillMetrics.shuffleBytesWritten
- batchSizes.append(spillMetrics.shuffleBytesWritten)
+ _diskBytesSpilled += spillMetrics.bytesWritten
+ batchSizes.append(spillMetrics.bytesWritten)
spillMetrics = null
objectsWritten = 0
}
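
Both spill paths (ExternalAppendOnlyMap above and ExternalSorter here) account for a flushed batch the same way: the per-spill writer's byte count is added to the task's diskBytesSpilled and also remembered per batch, presumably so the spill file can later be read back batch by batch. A simplified sketch with illustrative names:

    import scala.collection.mutable.ArrayBuffer

    final class SpillAccounting {
      var diskBytesSpilled = 0L
      val batchSizes = new ArrayBuffer[Long]()

      // Called once per committed batch with the bytes the spill writer reported.
      def recordFlushedBatch(batchBytesWritten: Long): Unit = {
        diskBytesSpilled += batchBytesWritten
        batchSizes += batchBytesWritten
      }
    }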