[SPARK-7884] Move block deserialization from BlockStoreShuffleFetcher to ShuffleReader #6423
Changes from all commits
core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -17,23 +17,23 @@

 package org.apache.spark.storage

+import java.io.InputStream
 import java.util.concurrent.LinkedBlockingQueue

 import scala.collection.mutable.{ArrayBuffer, HashSet, Queue}
 import scala.util.{Failure, Try}

 import org.apache.spark.{Logging, TaskContext}
-import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.serializer.{SerializerInstance, Serializer}
-import org.apache.spark.util.{CompletionIterator, Utils}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
+import org.apache.spark.util.Utils

 /**
  * An iterator that fetches multiple blocks. For local blocks, it fetches from the local block
  * manager. For remote blocks, it fetches them using the provided BlockTransferService.
  *
- * This creates an iterator of (BlockID, values) tuples so the caller can handle blocks in a
- * pipelined fashion as they are received.
+ * This creates an iterator of (BlockID, Try[InputStream]) tuples so the caller can handle blocks
+ * in a pipelined fashion as they are received.
  *
  * The implementation throttles the remote fetches so they don't exceed maxBytesInFlight to avoid
  * using too much memory.
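The new element type changes what callers do with the iterator: instead of receiving already-deserialized values, they receive a raw stream per block. A minimal consumption sketch, assuming an already-constructed `fetcherIterator` of type `Iterator[(BlockId, Try[InputStream])]` (the name is illustrative, not from the patch):

    import java.io.InputStream

    import scala.util.{Failure, Success}

    // Blocks are handled as they arrive, so downstream work is pipelined with fetching.
    fetcherIterator.foreach { case (blockId, streamTry) =>
      streamTry match {
        case Success(in) =>
          try {
            // ... read this block's bytes from `in` ...
          } finally {
            in.close() // release the block's ManagedBuffer as early as possible
          }
        case Failure(e) =>
          throw e // surfaces to the scheduler as a fetch failure
      }
    }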
@@ -44,7 +44,6 @@ import org.apache.spark.util.{CompletionIterator, Utils}
  * @param blocksByAddress list of blocks to fetch grouped by the [[BlockManagerId]].
  *                        For each block we also require the size (in bytes as a long field) in
  *                        order to throttle the memory usage.
- * @param serializer serializer used to deserialize the data.
  * @param maxBytesInFlight max size (in bytes) of remote blocks to fetch at any given point.
  */
 private[spark]
@@ -53,9 +52,8 @@ final class ShuffleBlockFetcherIterator(
     shuffleClient: ShuffleClient,
     blockManager: BlockManager,
     blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])],
-    serializer: Serializer,
     maxBytesInFlight: Long)
-  extends Iterator[(BlockId, Try[Iterator[Any]])] with Logging {
+  extends Iterator[(BlockId, Try[InputStream])] with Logging {

   import ShuffleBlockFetcherIterator._
@@ -83,7 +81,7 @@ final class ShuffleBlockFetcherIterator(

   /**
    * A queue to hold our results. This turns the asynchronous model provided by
-   * [[BlockTransferService]] into a synchronous model (iterator).
+   * [[org.apache.spark.network.BlockTransferService]] into a synchronous model (iterator).
    */
   private[this] val results = new LinkedBlockingQueue[FetchResult]
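The async-to-sync conversion mentioned in that comment is a standard blocking-queue handoff. A self-contained sketch with simplified stand-in types (this is not the real FetchResult hierarchy or the real BlockFetchingListener signature):

    import java.util.concurrent.LinkedBlockingQueue

    sealed trait Result
    case class Fetched(blockId: String, data: Array[Byte]) extends Result
    case class Failed(blockId: String, error: Throwable) extends Result

    val resultQueue = new LinkedBlockingQueue[Result]()

    // Network threads complete fetches asynchronously and enqueue outcomes...
    def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit =
      resultQueue.put(Fetched(blockId, data))

    def onBlockFetchFailure(blockId: String, e: Throwable): Unit =
      resultQueue.put(Failed(blockId, e))

    // ...while the task thread consumes them synchronously, blocking until one is ready.
    def nextResult(): Result = resultQueue.take()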
@@ -102,9 +100,7 @@ final class ShuffleBlockFetcherIterator(
   /** Current bytes in flight from our requests */
   private[this] var bytesInFlight = 0L

-  private[this] val shuffleMetrics = context.taskMetrics.createShuffleReadMetricsForDependency()
-
-  private[this] val serializerInstance: SerializerInstance = serializer.newInstance()
+  private[this] val shuffleMetrics = context.taskMetrics().createShuffleReadMetricsForDependency()

   /**
    * Whether the iterator is still active. If isZombie is true, the callback interface will no
@@ -114,17 +110,23 @@ final class ShuffleBlockFetcherIterator(

   initialize()

-  /**
-   * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
-   */
-  private[this] def cleanup() {
-    isZombie = true
+  // Decrements the buffer reference count.
+  // The currentResult is set to null to prevent releasing the buffer again on cleanup()
+  private[storage] def releaseCurrentResultBuffer(): Unit = {
     // Release the current buffer if necessary
     currentResult match {
       case SuccessFetchResult(_, _, buf) => buf.release()
       case _ =>
     }
+    currentResult = null
+  }
+
+  /**
+   * Mark the iterator as zombie, and release all buffers that haven't been deserialized yet.
+   */
+  private[this] def cleanup() {
+    isZombie = true
+    releaseCurrentResultBuffer()
     // Release buffers in the results queue
     val iter = results.iterator()
     while (iter.hasNext) {
@@ -272,7 +274,13 @@ final class ShuffleBlockFetcherIterator(

   override def hasNext: Boolean = numBlocksProcessed < numBlocksToFetch

-  override def next(): (BlockId, Try[Iterator[Any]]) = {
+  /**
+   * Fetches the next (BlockId, Try[InputStream]). If a task fails, the ManagedBuffers
+   * underlying each InputStream will be freed by the cleanup() method registered with the
+   * TaskCompletionListener. However, callers should close() these InputStreams
+   * as soon as they are no longer needed, in order to release memory as early as possible.
+   */
+  override def next(): (BlockId, Try[InputStream]) = {
     numBlocksProcessed += 1
     val startFetchWait = System.currentTimeMillis()
     currentResult = results.take()
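The cleanup() registration that the new doc comment leans on happens when the iterator is constructed. A hedged sketch of that wiring inside this class, assuming the Spark 1.x `TaskContext.addTaskCompletionListener` API (not verbatim from this patch):

    private[this] def initialize(): Unit = {
      // Backstop: free any un-released ManagedBuffers when the task ends, whether it
      // succeeds or fails; callers closing streams early remains the fast path.
      context.addTaskCompletionListener(_ => cleanup())
      // ... split blocks into local and remote, then send the first fetch requests ...
    }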
@@ -290,29 +298,56 @@ final class ShuffleBlockFetcherIterator(
       sendRequest(fetchRequests.dequeue())
     }

-    val iteratorTry: Try[Iterator[Any]] = result match {
+    val iteratorTry: Try[InputStream] = result match {
       case FailureFetchResult(_, e) =>
         Failure(e)
       case SuccessFetchResult(blockId, _, buf) =>
         // There is a chance that createInputStream can fail (e.g. fetching a local file that does
         // not exist, SPARK-4085). In that case, we should propagate the right exception so
         // the scheduler gets a FetchFailedException.
-        Try(buf.createInputStream()).map { is0 =>
-          val is = blockManager.wrapForCompression(blockId, is0)
-          val iter = serializerInstance.deserializeStream(is).asKeyValueIterator
-          CompletionIterator[Any, Iterator[Any]](iter, {
-            // Once the iterator is exhausted, release the buffer and set currentResult to null
-            // so we don't release it again in cleanup.
-            currentResult = null
-            buf.release()
-          })
+        Try(buf.createInputStream()).map { inputStream =>
+          new BufferReleasingInputStream(inputStream, this)
         }
     }

     (result.blockId, iteratorTry)
   }
 }

Review discussion on the createInputStream change:

  - We might also treat this patch as an opportunity to revisit why we're using [...]
  - This is technically not your change -- but do you know what happens if the stream is not consumed in full in a task? Does that lead to memory leaks because close on the stream is never called?
  - We don't need to worry about a memory leak when the task exits with success or failure, since there is a cleanup method registered with the task context, e.g. [...] However, you're correct that there would be a memory (and file handle) leak if the [...] To be more defensive and potentially simplify the code, it might make sense to have a call to [...]
  - @rxin Let me know if you'd like this change to be made to [...]
  - We might consider deferring this change to a followup PR; want to file a JIRA issue so that we don't forget to eventually follow up?
  - Will do.

+/**
+ * Helper class that ensures a ManagedBuffer is released upon InputStream.close()
+ * Note: the delegate parameter is private[storage] to make it available to tests.
+ */
+private class BufferReleasingInputStream(
+    private val delegate: InputStream,
+    private val iterator: ShuffleBlockFetcherIterator)
+  extends InputStream {
+  private[this] var closed = false
+
+  override def read(): Int = delegate.read()
+
+  override def close(): Unit = {
+    if (!closed) {
+      delegate.close()
+      iterator.releaseCurrentResultBuffer()
+      closed = true
+    }
+  }
+
+  override def available(): Int = delegate.available()
+
+  override def mark(readlimit: Int): Unit = delegate.mark(readlimit)
+
+  override def skip(n: Long): Long = delegate.skip(n)
+
+  override def markSupported(): Boolean = delegate.markSupported()
+
+  override def read(b: Array[Byte]): Int = delegate.read(b)
+
+  override def read(b: Array[Byte], off: Int, len: Int): Int = delegate.read(b, off, len)
+
+  override def reset(): Unit = delegate.reset()
+}
+
 private[storage]
 object ShuffleBlockFetcherIterator {
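Two properties of the added wrapper are worth noting: delegating every InputStream method keeps it transparent to the compression and deserialization layers stacked on top, and the `closed` flag makes close() idempotent, so the ManagedBuffer's reference count is decremented at most once. An illustrative use, with hypothetical `rawStream` and `fetcherIterator` values:

    val in: InputStream = new BufferReleasingInputStream(rawStream, fetcherIterator)
    try {
      var b = in.read()
      while (b != -1) {
        // ... process the byte (real callers read in bulk) ...
        b = in.read()
      }
    } finally {
      in.close() // closes the delegate and releases the buffer; extra closes are no-ops
    }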
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just started writing a big comment about why I thought this was unnecessary because I thought the
TaskCompletionListener
would take care of it for us ... and then I realized that this was necessary so that we release buffers as soon as finish with them as we fetch a bunch of blocks.would you mind adding a small doc here that callers of this class should always be sure to wrap the results in a
CompletionIterator
which closes theInputStream
, to be sure buffers are released as soon as possible? Would help callers and future readers of this code
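The caller-side pattern this last comment asks to document is the one the PR moves into the shuffle reader. A hedged sketch of its shape, assuming `blockFetcherItr: Iterator[(BlockId, Try[InputStream])]` plus `blockManager` and `serializerInstance` in scope (identifiers illustrative, not the verbatim patch):

    import org.apache.spark.util.CompletionIterator

    val records: Iterator[Any] = blockFetcherItr.flatMap { case (blockId, streamTry) =>
      val wrapped = blockManager.wrapForCompression(blockId, streamTry.get) // .get rethrows fetch failures
      val kvIter = serializerInstance.deserializeStream(wrapped).asKeyValueIterator
      // Close the stream -- releasing its ManagedBuffer -- as soon as this block's
      // records are exhausted, rather than waiting for task completion.
      CompletionIterator[Any, Iterator[Any]](kvIter, wrapped.close())
    }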