[SPARK-3796] Create external service which can serve shuffle files #3001
Changes from all commits
5ea4df6
46a70bf
5483e96
2f70c0c
c8d1ac3
56caa50
7fe51d5
3d62679
9883918
fd3928b
705748f
4d1f8c1
core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
```diff
@@ -20,16 +20,16 @@ package org.apache.spark.network

 import java.io.Closeable
 import java.nio.ByteBuffer

-import scala.concurrent.{Await, Future}
+import scala.concurrent.{Promise, Await, Future}
 import scala.concurrent.duration.Duration

 import org.apache.spark.Logging
 import org.apache.spark.network.buffer.{NioManagedBuffer, ManagedBuffer}
-import org.apache.spark.storage.{BlockId, StorageLevel}
-import org.apache.spark.util.Utils
+import org.apache.spark.network.shuffle.{ShuffleClient, BlockFetchingListener}
+import org.apache.spark.storage.{BlockManagerId, BlockId, StorageLevel}

 private[spark]
-abstract class BlockTransferService extends Closeable with Logging {
+abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {

   /**
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
```
```diff
@@ -60,10 +60,11 @@ abstract class BlockTransferService extends Closeable with Logging {
    * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
    * the data of a block is fetched, rather than waiting for all blocks to be fetched.
    */
-  def fetchBlocks(
-      hostName: String,
+  override def fetchBlocks(
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit

   /**
```

Review thread on the new execId parameter:

Reviewer: execId should be part of the connection establishment / registration and not part of fetchBlocks.

Author: The tricky part here is that "execId" is actually part of the request. I am fetching Executor 6's blocks, while I am myself Executor 4. So there is no API exposed at a lower layer that could carry the execId.

Reviewer: I see - does each executor have its own path for shuffle files?

Author: Yes, each executor registers its ExecutorShuffleInfo, which includes its own localDirs (created by the Executor on initialization).
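The exchange above captures the addressing scheme: a fetch is identified by (host, port, execId, blockId), and the serving side can resolve it because every executor registered its shuffle directories when it started. A rough sketch of that registration idea in Scala - `ExecutorShuffleInfo` and `localDirs` come from the discussion above, everything else is illustrative rather than the PR's actual (Java) API:

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustrative sketch: each executor registers where its shuffle files live,
// keyed by executor ID, so a later fetch for (execId, blockId) can be
// resolved against that executor's local directories.
final case class ExecutorShuffleInfo(localDirs: Array[String], shuffleManager: String)

final class ShuffleServiceRegistry {
  private val executors = new ConcurrentHashMap[String, ExecutorShuffleInfo]()

  // Called once when an executor starts, after it has created its local dirs.
  def registerExecutor(execId: String, info: ExecutorShuffleInfo): Unit =
    executors.put(execId, info)

  // A fetch addressed to execId looks up that executor's directories.
  def localDirsFor(execId: String): Option[Array[String]] =
    Option(executors.get(execId)).map(_.localDirs)
}
```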
```diff
@@ -81,43 +82,23 @@ abstract class BlockTransferService extends Closeable with Logging {
    *
    * It is also only available after [[init]] is invoked.
    */
-  def fetchBlockSync(hostName: String, port: Int, blockId: String): ManagedBuffer = {
+  def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
     // A monitor for the thread to wait on.
-    val lock = new Object
-    @volatile var result: Either[ManagedBuffer, Throwable] = null
-    fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
-      override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
-        lock.synchronized {
-          result = Right(exception)
-          lock.notify()
-        }
-      }
-      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
-        lock.synchronized {
-          val ret = ByteBuffer.allocate(data.size.toInt)
-          ret.put(data.nioByteBuffer())
-          ret.flip()
-          result = Left(new NioManagedBuffer(ret))
-          lock.notify()
-        }
-      }
-    })
-
-    // Sleep until result is no longer null
-    lock.synchronized {
-      while (result == null) {
-        try {
-          lock.wait()
-        } catch {
-          case e: InterruptedException =>
-        }
-      }
-    }
-
-    result match {
-      case Left(data) => data
-      case Right(e) => throw e
-    }
+    val result = Promise[ManagedBuffer]()
+    fetchBlocks(host, port, execId, Array(blockId),
+      new BlockFetchingListener {
+        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
+          result.failure(exception)
+        }
+        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
+          val ret = ByteBuffer.allocate(data.size.toInt)
+          ret.put(data.nioByteBuffer())
+          ret.flip()
+          result.success(new NioManagedBuffer(ret))
+        }
+      })

+    Await.result(result.future, Duration.Inf)
   }

   /**
```
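The new implementation replaces the hand-rolled monitor (an `Either` result slot guarded by `lock.wait()`/`lock.notify()`) with a Scala `Promise` completed from the callbacks; `result.failure(exception)` makes `Await.result` rethrow the original exception, so the explicit `Left`/`Right` unpacking and the `InterruptedException`-swallowing loop disappear. A minimal self-contained sketch of that bridge, with a hypothetical `fetchAsync` standing in for `fetchBlocks`:

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

object PromiseBridge {
  // Stand-in for an asynchronous fetch that reports through callbacks.
  def fetchAsync(id: String)(onSuccess: String => Unit, onFailure: Throwable => Unit): Unit =
    onSuccess(s"data-for-$id")

  // Blocking wrapper: complete a Promise from the callbacks, then await its Future.
  def fetchSync(id: String): String = {
    val result = Promise[String]()
    fetchAsync(id)(
      data => result.success(data),   // success completes the promise
      error => result.failure(error)) // failure makes Await.result rethrow
    Await.result(result.future, Duration.Inf)
  }

  def main(args: Array[String]): Unit =
    println(fetchSync("shuffle_0_1_2")) // prints "data-for-shuffle_0_1_2"
}
```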
This file was deleted. (From the diff below, this is the old NettyBlockFetcher, whose role is taken over by OneForOneBlockFetcher.)
core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
```diff
@@ -17,15 +17,15 @@

 package org.apache.spark.network.netty

-import scala.concurrent.{Promise, Future}
+import scala.concurrent.{Future, Promise}

 import org.apache.spark.SparkConf
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.client.{RpcResponseCallback, TransportClient, TransportClientFactory}
-import org.apache.spark.network.netty.NettyMessages.UploadBlock
+import org.apache.spark.network.client.{RpcResponseCallback, TransportClientFactory}
+import org.apache.spark.network.netty.NettyMessages.{OpenBlocks, UploadBlock}
 import org.apache.spark.network.server._
-import org.apache.spark.network.util.{ConfigProvider, TransportConf}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, OneForOneBlockFetcher}
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.storage.{BlockId, StorageLevel}
 import org.apache.spark.util.Utils
```
```diff
@@ -37,30 +37,29 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService {
   // TODO: Don't use Java serialization, use a more cross-version compatible serialization format.
   val serializer = new JavaSerializer(conf)

-  // Create a TransportConfig using SparkConf.
-  private[this] val transportConf = new TransportConf(
-    new ConfigProvider { override def get(name: String) = conf.get(name) })
-
   private[this] var transportContext: TransportContext = _
   private[this] var server: TransportServer = _
   private[this] var clientFactory: TransportClientFactory = _

   override def init(blockDataManager: BlockDataManager): Unit = {
-    val streamManager = new DefaultStreamManager
-    val rpcHandler = new NettyBlockRpcServer(serializer, streamManager, blockDataManager)
-    transportContext = new TransportContext(transportConf, streamManager, rpcHandler)
+    val rpcHandler = new NettyBlockRpcServer(serializer, blockDataManager)
+    transportContext = new TransportContext(SparkTransportConf.fromSparkConf(conf), rpcHandler)
     clientFactory = transportContext.createClientFactory()
     server = transportContext.createServer()
     logInfo("Server created on " + server.getPort)
   }

   override def fetchBlocks(
-      hostname: String,
+      host: String,
       port: Int,
-      blockIds: Seq[String],
+      execId: String,
+      blockIds: Array[String],
       listener: BlockFetchingListener): Unit = {
+    logTrace(s"Fetch blocks from $host:$port (executor id $execId)")
     try {
-      val client = clientFactory.createClient(hostname, port)
-      new NettyBlockFetcher(serializer, client, blockIds, listener).start()
+      val client = clientFactory.createClient(host, port)
+      new OneForOneBlockFetcher(client, blockIds.toArray, listener)
+        .start(OpenBlocks(blockIds.map(BlockId.apply)))
     } catch {
       case e: Exception =>
         logError("Exception while beginning fetchBlocks", e)
```

Review comment on the execId parameter:

Reviewer: see above - best to leave execId out of this.
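The switch from `NettyBlockFetcher` to `OneForOneBlockFetcher.start(OpenBlocks(...))` suggests the shape of the fetch protocol: one `OpenBlocks` RPC registers all the requested blocks as a stream, after which each block is fetched as exactly one chunk of that stream ("one for one"). A simplified sketch of that shape - the types below are stand-ins, not the PR's actual Java classes:

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for the PR's BlockFetchingListener.
trait FetchListener {
  def onBlockFetchSuccess(blockId: String, data: Array[Byte]): Unit
  def onBlockFetchFailure(blockId: String, cause: Throwable): Unit
}

// Sketch of the one-for-one protocol: open once, then fetch chunk i for block i.
final class OneForOneFetcherSketch(blockIds: Array[String], listener: FetchListener) {
  def start(): Unit = {
    val streamId = openBlocks(blockIds) // 1. a single RPC for all requested blocks
    for (i <- blockIds.indices) {       // 2. one chunk fetch per block
      fetchChunk(streamId, i) match {
        case Success(bytes) => listener.onBlockFetchSuccess(blockIds(i), bytes)
        case Failure(e)     => listener.onBlockFetchFailure(blockIds(i), e)
      }
    }
  }

  private def openBlocks(ids: Array[String]): Long = 42L  // stand-in RPC
  private def fetchChunk(streamId: Long, chunkIndex: Int): Try[Array[Byte]] =
    Success(Array.emptyByteArray)                          // stand-in transfer
}
```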
Review thread on a related change (to how `spark.executor.id` is set):

Reviewer: What is this change about?

Author: This was introduced recently, and I was planning on using it, but ended up not. Still, I was inclined to keep the seemingly more sensible semantics of `spark.executor.id` being the executorId rather than being prefixed. It is currently only used by the MetricsSystem.

Reviewer: Yeah, that makes sense. This was introduced in a patch that was merged not long ago (middle of the 1.2 window), so it's OK to change it.
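For concreteness, a tiny sketch of the semantics being agreed on - the values and the metrics-prefix consumer are hypothetical, since the actual call sites are not shown in this view:

```scala
import org.apache.spark.SparkConf

object ExecutorIdConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(loadDefaults = false)
    // "spark.executor.id" holds the bare executor ID...
    conf.set("spark.executor.id", "4")
    // ...and a consumer such as the metrics system adds its own prefix.
    val metricsNamespace = s"executor.${conf.get("spark.executor.id")}"
    println(metricsNamespace) // executor.4
  }
}
```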