SPARK-2566. Update ShuffleWriteMetrics incrementally
I haven't tested this out on a cluster yet, but wanted to make sure the approach (passing ShuffleWriteMetrics down to DiskBlockObjectWriter) was ok

Author: Sandy Ryza <sandy@cloudera.com>

Closes apache#1481 from sryza/sandy-spark-2566 and squashes the following commits:

8090d88 [Sandy Ryza] Fix ExternalSorter
b2a62ed [Sandy Ryza] Fix more test failures
8be6218 [Sandy Ryza] Fix test failures and mark a couple variables private
c5e68e5 [Sandy Ryza] SPARK-2566. Update ShuffleWriteMetrics incrementally
sryza authored and pwendell committed Aug 6, 2014
1 parent d614967 commit 4e98236
Showing 11 changed files with 164 additions and 82 deletions.
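
The approach described in the commit message above — handing a ShuffleWriteMetrics object down to the disk writer so the counters are bumped as records are written, rather than computed from per-writer totals at commit time — can be sketched roughly as follows. This is an illustrative simplification with made-up names (WriteMetrics, IncrementalDiskWriter); it is not the actual Spark API.

// Illustrative sketch only; names are hypothetical, not real Spark classes.
class WriteMetrics {
  @volatile var bytesWritten: Long = 0L
  @volatile var writeTimeNanos: Long = 0L
}

// The writer keeps no totals of its own: it is handed the task's metrics object
// and updates it incrementally as each record is written.
class IncrementalDiskWriter(metrics: WriteMetrics) {
  def write(record: Array[Byte]): Unit = {
    val start = System.nanoTime()
    writeToDisk(record)                                      // stand-in for the real file I/O
    metrics.writeTimeNanos += (System.nanoTime() - start)
    metrics.bytesWritten += record.length                    // visible before commit/close
  }

  private def writeToDisk(record: Array[Byte]): Unit = ()    // omitted in this sketch
}

The shuffle writer would create one such metrics object per task, register it on the task's metrics, and pass it to every disk writer it opens — which is essentially what the diffs below do with ShuffleWriteMetrics and getDiskWriter.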
@@ -190,10 +190,10 @@ class ShuffleWriteMetrics extends Serializable {
/**
* Number of bytes written for the shuffle by this task
*/
var shuffleBytesWritten: Long = _
@volatile var shuffleBytesWritten: Long = _

/**
* Time the task spent blocking on writes to disk or buffer cache, in nanoseconds
*/
var shuffleWriteTime: Long = _
@volatile var shuffleWriteTime: Long = _
}
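
Presumably the @volatile annotations are needed because, once the counters are updated incrementally, another thread (for example a thread polling or reporting task metrics) may read them while the task thread is still bumping them; the commit itself does not spell this out, so treat the following as an illustrative assumption rather than the stated rationale. A minimal, hypothetical reader/writer demo:

// Hypothetical demo only; not Spark code. Without @volatile, the reader thread
// may not observe the writer thread's incremental updates to the counter.
object VolatileCounterDemo extends App {
  class Counters { @volatile var bytes: Long = 0L }
  val c = new Counters

  val writer = new Thread(new Runnable {
    def run(): Unit = (1 to 100).foreach { _ => c.bytes += 1024; Thread.sleep(1) }
  })
  val reader = new Thread(new Runnable {
    def run(): Unit = (1 to 10).foreach { _ => println(s"bytes so far: ${c.bytes}"); Thread.sleep(10) }
  })
  writer.start(); reader.start()
  writer.join(); reader.join()
}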
@@ -39,10 +39,14 @@ private[spark] class HashShuffleWriter[K, V](
// we don't try deleting files, etc twice.
private var stopping = false

private val writeMetrics = new ShuffleWriteMetrics()
metrics.shuffleWriteMetrics = Some(writeMetrics)

private val blockManager = SparkEnv.get.blockManager
private val shuffleBlockManager = blockManager.shuffleBlockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser)
private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
@@ -99,22 +103,12 @@

private def commitWritesAndBuildStatus(): MapStatus = {
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
var totalTime = 0L
val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
writer.commitAndClose()
val size = writer.fileSegment().length
totalBytes += size
totalTime += writer.timeWriting()
MapOutputTracker.compressSize(size)
}

// Update shuffle metrics.
val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
metrics.shuffleWriteMetrics = Some(shuffleMetrics)

new MapStatus(blockManager.blockManagerId, compressedSizes)
}

@@ -52,6 +52,9 @@ private[spark] class SortShuffleWriter[K, V, C](

private var mapStatus: MapStatus = null

private val writeMetrics = new ShuffleWriteMetrics()
context.taskMetrics.shuffleWriteMetrics = Some(writeMetrics)

/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
// Get an iterator with the elements for each partition ID
@@ -84,32 +87,23 @@ private[spark] class SortShuffleWriter[K, V, C](
val offsets = new Array[Long](numPartitions + 1)
val lengths = new Array[Long](numPartitions)

// Statistics
var totalBytes = 0L
var totalTime = 0L

for ((id, elements) <- partitions) {
if (elements.hasNext) {
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize)
val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize,
writeMetrics)
for (elem <- elements) {
writer.write(elem)
}
writer.commitAndClose()
val segment = writer.fileSegment()
offsets(id + 1) = segment.offset + segment.length
lengths(id) = segment.length
totalTime += writer.timeWriting()
totalBytes += segment.length
} else {
// The partition is empty; don't create a new writer to avoid writing headers, etc
offsets(id + 1) = offsets(id)
}
}

val shuffleMetrics = new ShuffleWriteMetrics
shuffleMetrics.shuffleBytesWritten = totalBytes
shuffleMetrics.shuffleWriteTime = totalTime
context.taskMetrics.shuffleWriteMetrics = Some(shuffleMetrics)
context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled
context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled

12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props}
import sun.nio.ch.DirectBuffer

import org.apache.spark._
import org.apache.spark.executor.{DataReadMethod, InputMetrics}
import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleWriteMetrics}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
@@ -562,17 +562,19 @@ private[spark] class BlockManager(

/**
* A short circuited method to get a block writer that can write data directly to disk.
* The Block will be appended to the File specified by filename. This is currently used for
* writing shuffle files out. Callers should handle error cases.
* The Block will be appended to the File specified by filename. Callers should handle error
* cases.
*/
def getDiskWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int): BlockObjectWriter = {
bufferSize: Int,
writeMetrics: ShuffleWriteMetrics): BlockObjectWriter = {
val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _)
val syncWrites = conf.getBoolean("spark.shuffle.sync", false)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites)
new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites,
writeMetrics)
}

/**
@@ -22,6 +22,7 @@ import java.nio.channels.FileChannel

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}
import org.apache.spark.executor.ShuffleWriteMetrics

/**
* An interface for writing JVM objects to some underlying storage. This interface allows
@@ -60,41 +61,26 @@ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {
* This is only valid after commitAndClose() has been called.
*/
def fileSegment(): FileSegment

/**
* Cumulative time spent performing blocking writes, in ns.
*/
def timeWriting(): Long

/**
* Number of bytes written so far
*/
def bytesWritten: Long
}

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
/**
* BlockObjectWriter which writes directly to a file on disk. Appends to the given file.
* The given write metrics will be updated incrementally, but will not necessarily be current until
* commitAndClose is called.
*/
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
bufferSize: Int,
compressStream: OutputStream => OutputStream,
syncWrites: Boolean)
syncWrites: Boolean,
writeMetrics: ShuffleWriteMetrics)
extends BlockObjectWriter(blockId)
with Logging
{

/** Intercepts write calls and tracks total time spent writing. Not thread safe. */
private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
def timeWriting = _timeWriting
private var _timeWriting = 0L

private def callWithTiming(f: => Unit) = {
val start = System.nanoTime()
f
_timeWriting += (System.nanoTime() - start)
}

def write(i: Int): Unit = callWithTiming(out.write(i))
override def write(b: Array[Byte]) = callWithTiming(out.write(b))
override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
@@ -111,7 +97,11 @@ private[spark] class DiskBlockObjectWriter(
private val initialPosition = file.length()
private var finalPosition: Long = -1
private var initialized = false
private var _timeWriting = 0L

/** Calling channel.position() to update the write metrics can be a little bit expensive, so we
* only call it every N writes */
private var writesSinceMetricsUpdate = 0
private var lastPosition = initialPosition

override def open(): BlockObjectWriter = {
fos = new FileOutputStream(file, true)
@@ -128,14 +118,11 @@ private[spark] class DiskBlockObjectWriter(
if (syncWrites) {
// Force outstanding writes to disk and track how long it takes
objOut.flush()
val start = System.nanoTime()
fos.getFD.sync()
_timeWriting += System.nanoTime() - start
def sync = fos.getFD.sync()
callWithTiming(sync)
}
objOut.close()

_timeWriting += ts.timeWriting

channel = null
bs = null
fos = null
@@ -153,6 +140,7 @@ private[spark] class DiskBlockObjectWriter(
// serializer stream and the lower level stream.
objOut.flush()
bs.flush()
updateBytesWritten()
close()
}
finalPosition = file.length()
@@ -162,6 +150,8 @@ private[spark] class DiskBlockObjectWriter(
// truncating the file to its initial position.
override def revertPartialWritesAndClose() {
try {
writeMetrics.shuffleBytesWritten -= (lastPosition - initialPosition)

if (initialized) {
objOut.flush()
bs.flush()
@@ -184,19 +174,36 @@ private[spark] class DiskBlockObjectWriter(
if (!initialized) {
open()
}

objOut.writeObject(value)

if (writesSinceMetricsUpdate == 32) {
writesSinceMetricsUpdate = 0
updateBytesWritten()
} else {
writesSinceMetricsUpdate += 1
}
}

override def fileSegment(): FileSegment = {
new FileSegment(file, initialPosition, bytesWritten)
new FileSegment(file, initialPosition, finalPosition - initialPosition)
}

// Only valid if called after close()
override def timeWriting() = _timeWriting
private def updateBytesWritten() {
val pos = channel.position()
writeMetrics.shuffleBytesWritten += (pos - lastPosition)
lastPosition = pos
}

private def callWithTiming(f: => Unit) = {
val start = System.nanoTime()
f
writeMetrics.shuffleWriteTime += (System.nanoTime() - start)
}

// Only valid if called after commit()
override def bytesWritten: Long = {
assert(finalPosition != -1, "bytesWritten is only valid after successful commit()")
finalPosition - initialPosition
// For testing
private[spark] def flush() {
objOut.flush()
bs.flush()
}
}
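
The Scaladoc above notes that the metrics "will not necessarily be current until commitAndClose is called", and the comment on writesSinceMetricsUpdate gives the reason: querying channel.position() for every record would be relatively expensive, so the byte counter is refreshed only every N writes and once more at commit. A standalone sketch of that sampling pattern, using hypothetical names and the WriteMetrics class from the first sketch (again, not the real DiskBlockObjectWriter):

import java.io.FileOutputStream

// Hypothetical sketch of sampled byte accounting; assumes the WriteMetrics class above.
class SampledByteWriter(path: String, metrics: WriteMetrics, sampleEvery: Int = 32) {
  private val fos = new FileOutputStream(path)
  private val channel = fos.getChannel
  private var lastPosition = channel.position()
  private var writesSinceUpdate = 0

  def write(bytes: Array[Byte]): Unit = {
    val start = System.nanoTime()
    fos.write(bytes)
    metrics.writeTimeNanos += (System.nanoTime() - start)
    writesSinceUpdate += 1
    if (writesSinceUpdate == sampleEvery) {  // channel.position() is not free,
      updateBytesWritten()                   // so only query it every `sampleEvery` writes
      writesSinceUpdate = 0
    }
  }

  def commitAndClose(): Unit = {
    updateBytesWritten()                     // pick up the tail not yet counted
    fos.close()
  }

  private def updateBytesWritten(): Unit = {
    val pos = channel.position()
    metrics.bytesWritten += (pos - lastPosition)
    lastPosition = pos
  }
}

Between refreshes the reported byte count lags slightly behind the file, which is the imprecision the class comment warns about.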
@@ -29,6 +29,7 @@ import org.apache.spark.storage.ShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
import org.apache.spark.shuffle.sort.SortShuffleManager
import org.apache.spark.executor.ShuffleWriteMetrics

/** A group of writers for a ShuffleMapTask, one writer per reducer. */
private[spark] trait ShuffleWriterGroup {
@@ -111,7 +112,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* Get a ShuffleWriterGroup for the given map task, which will register it as complete
* when the writers are closed successfully
*/
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer) = {
def forMapTask(shuffleId: Int, mapId: Int, numBuckets: Int, serializer: Serializer,
writeMetrics: ShuffleWriteMetrics) = {
new ShuffleWriterGroup {
shuffleStates.putIfAbsent(shuffleId, new ShuffleState(numBuckets))
private val shuffleState = shuffleStates(shuffleId)
@@ -121,7 +123,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
fileGroup = getUnusedFileGroup()
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize,
writeMetrics)
}
} else {
Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
@@ -136,7 +139,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
logWarning(s"Failed to remove existing shuffle file $blockFile")
}
}
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize, writeMetrics)
}
}

@@ -31,6 +31,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.{DeserializationStream, Serializer}
import org.apache.spark.storage.{BlockId, BlockManager}
import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
import org.apache.spark.executor.ShuffleWriteMetrics

/**
* :: DeveloperApi ::
@@ -102,6 +103,10 @@ class ExternalAppendOnlyMap[K, V, C](
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

// Write metrics for current spill
private var curWriteMetrics: ShuffleWriteMetrics = _

private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

@@ -172,7 +177,9 @@ class ExternalAppendOnlyMap[K, V, C](
logInfo("Thread %d spilling in-memory map of %d MB to disk (%d time%s so far)"
.format(threadId, mapSize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else ""))
val (blockId, file) = diskBlockManager.createTempBlock()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
curWriteMetrics = new ShuffleWriteMetrics()
var writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
curWriteMetrics)
var objectsWritten = 0

// List of batch sizes (bytes) in the order they are written to disk
@@ -183,9 +190,8 @@ class ExternalAppendOnlyMap[K, V, C](
val w = writer
writer = null
w.commitAndClose()
val bytesWritten = w.bytesWritten
batchSizes.append(bytesWritten)
_diskBytesSpilled += bytesWritten
_diskBytesSpilled += curWriteMetrics.shuffleBytesWritten
batchSizes.append(curWriteMetrics.shuffleBytesWritten)
objectsWritten = 0
}

@@ -199,7 +205,9 @@ class ExternalAppendOnlyMap[K, V, C](

if (objectsWritten == serializerBatchSize) {
flush()
writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
curWriteMetrics = new ShuffleWriteMetrics()
writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize,
curWriteMetrics)
}
}
if (objectsWritten > 0) {