Skip to content

Commit

Permalink
Track workers by executor ID instead of hostname to allow multiple
Browse files Browse the repository at this point in the history
executors per machine and remove the need for multiple IP addresses in
unit tests.
  • Loading branch information
mateiz committed Jan 28, 2013
1 parent b9e2d9e commit 44b4a0f
Show file tree
Hide file tree
Showing 35 changed files with 343 additions and 314 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/MapOutputTracker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ private[spark] class MapOutputTracker(actorSystem: ActorSystem, isMaster: Boolea
var array = mapStatuses(shuffleId)
if (array != null) {
array.synchronized {
if (array(mapId) != null && array(mapId).address == bmAddress) {
if (array(mapId) != null && array(mapId).location == bmAddress) {
array(mapId) = null
}
}
Expand Down Expand Up @@ -277,7 +277,7 @@ private[spark] object MapOutputTracker {
throw new FetchFailedException(null, shuffleId, -1, reduceId,
new Exception("Missing an output location for shuffle " + shuffleId))
} else {
(status.address, decompressSize(status.compressedSizes(reduceId)))
(status.location, decompressSize(status.compressedSizes(reduceId)))
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ class SparkContext(

// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
System.getProperty("spark.master.host"),
System.getProperty("spark.master.port").toInt,
true,
Expand All @@ -97,7 +98,7 @@ class SparkContext(
// Keeps track of all persisted RDDs
private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]()

private[spark] val metadataCleaner = new MetadataCleaner("DAGScheduler", this.cleanup)
private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)


// Add each JAR given through the constructor
Expand Down Expand Up @@ -649,10 +650,9 @@ class SparkContext(
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()

/** Called by MetadataCleaner to clean up the persistentRdds map periodically */
private[spark] def cleanup(cleanupTime: Long) {
var sizeBefore = persistentRdds.size
persistentRdds.clearOldValues(cleanupTime)
logInfo("idToStage " + sizeBefore + " --> " + persistentRdds.size)
}
}

Expand Down
9 changes: 6 additions & 3 deletions core/src/main/scala/spark/SparkEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import spark.util.AkkaUtils
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
class SparkEnv (
val executorId: String,
val actorSystem: ActorSystem,
val serializer: Serializer,
val closureSerializer: Serializer,
Expand Down Expand Up @@ -58,11 +59,12 @@ object SparkEnv extends Logging {
}

def createFromSystemProperties(
executorId: String,
hostname: String,
port: Int,
isMaster: Boolean,
isLocal: Boolean
) : SparkEnv = {
isLocal: Boolean): SparkEnv = {

val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)

// Bit of a hack: If this is the master and our port was 0 (meaning bind to any free port),
Expand All @@ -86,7 +88,7 @@ object SparkEnv extends Logging {
val masterPort: Int = System.getProperty("spark.master.port", "7077").toInt
val blockManagerMaster = new BlockManagerMaster(
actorSystem, isMaster, isLocal, masterIp, masterPort)
val blockManager = new BlockManager(actorSystem, blockManagerMaster, serializer)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)

val connectionManager = blockManager.connectionManager

Expand Down Expand Up @@ -122,6 +124,7 @@ object SparkEnv extends Logging {
}

new SparkEnv(
executorId,
actorSystem,
serializer,
closureSerializer,
Expand Down
16 changes: 9 additions & 7 deletions core/src/main/scala/spark/deploy/LocalSparkCluster.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import spark.{Logging, Utils}

import scala.collection.mutable.ArrayBuffer

/**
* Testing class that creates a Spark standalone process in-cluster (that is, running the
* spark.deploy.master.Master and spark.deploy.worker.Workers in the same JVMs). Executors launched
* by the Workers still run in separate JVMs. This can be used to test distributed operation and
* fault recovery without spinning up a lot of processes.
*/
private[spark]
class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {

Expand All @@ -35,16 +41,12 @@ class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int)

/* Start the Slaves */
for (slaveNum <- 1 to numSlaves) {
/* We can pretend to test distributed stuff by giving the slaves distinct hostnames.
All of 127/8 should be a loopback, we use 127.100.*.* in hopes that it is
sufficiently distinctive. */
val slaveIpAddress = "127.100.0." + (slaveNum % 256)
val (actorSystem, boundPort) =
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, slaveIpAddress, 0)
AkkaUtils.createActorSystem("sparkWorker" + slaveNum, localIpAddress, 0)
slaveActorSystems += actorSystem
val actor = actorSystem.actorOf(
Props(new Worker(slaveIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
Props(new Worker(localIpAddress, boundPort, 0, coresPerSlave, memoryPerSlave, masterUrl)),
name = "Worker")
slaveActors += actor
}

Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/deploy/master/Master.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,10 @@ private[spark] class Master(ip: String, port: Int, webUiPort: Int) extends Actor
exec.worker.removeExecutor(exec)

// Only retry certain number of times so we don't go into an infinite loop.
if (jobInfo.incrementRetryCount <= JobState.MAX_NUM_RETRY) {
if (jobInfo.incrementRetryCount < JobState.MAX_NUM_RETRY) {
schedule()
} else {
val e = new SparkException("Job %s wth ID %s failed %d times.".format(
val e = new SparkException("Job %s with ID %s failed %d times.".format(
jobInfo.desc.name, jobInfo.id, jobInfo.retryCount))
logError(e.getMessage, e)
throw e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private[spark] class ExecutorRunner(

/** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
def substituteVariables(argument: String): String = argument match {
case "{{SLAVEID}}" => workerId
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => hostname
case "{{CORES}}" => cores.toString
case other => other
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[spark] class Executor extends Logging {

initLogging()

def initialize(slaveHostname: String, properties: Seq[(String, String)]) {
def initialize(executorId: String, slaveHostname: String, properties: Seq[(String, String)]) {
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)

Expand Down Expand Up @@ -64,7 +64,7 @@ private[spark] class Executor extends Logging {
)

// Initialize Spark environment (using system properties read above)
env = SparkEnv.createFromSystemProperties(slaveHostname, 0, false, false)
env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env)

// Start worker thread pool
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ private[spark] class MesosExecutorBackend(executor: Executor)
executorInfo: ExecutorInfo,
frameworkInfo: FrameworkInfo,
slaveInfo: SlaveInfo) {
logInfo("Registered with Mesos as executor ID " + executorInfo.getExecutorId.getValue)
this.driver = driver
val properties = Utils.deserialize[Array[(String, String)]](executorInfo.getData.toByteArray)
executor.initialize(slaveInfo.getHostname, properties)
executor.initialize(executorInfo.getExecutorId.getValue, slaveInfo.getHostname, properties)
}

override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import spark.scheduler.cluster.RegisterSlave
private[spark] class StandaloneExecutorBackend(
executor: Executor,
masterUrl: String,
slaveId: String,
executorId: String,
hostname: String,
cores: Int)
extends Actor
Expand All @@ -30,7 +30,7 @@ private[spark] class StandaloneExecutorBackend(
try {
logInfo("Connecting to master: " + masterUrl)
master = context.actorFor(masterUrl)
master ! RegisterSlave(slaveId, hostname, cores)
master ! RegisterSlave(executorId, hostname, cores)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
Expand All @@ -43,7 +43,7 @@ private[spark] class StandaloneExecutorBackend(
override def receive = {
case RegisteredSlave(sparkProperties) =>
logInfo("Successfully registered with master")
executor.initialize(hostname, sparkProperties)
executor.initialize(executorId, hostname, sparkProperties)

case RegisterSlaveFailed(message) =>
logError("Slave registration failed: " + message)
Expand All @@ -55,24 +55,24 @@ private[spark] class StandaloneExecutorBackend(
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
master ! StatusUpdate(slaveId, taskId, state, data)
master ! StatusUpdate(executorId, taskId, state, data)
}
}

private[spark] object StandaloneExecutorBackend {
def run(masterUrl: String, slaveId: String, hostname: String, cores: Int) {
def run(masterUrl: String, executorId: String, hostname: String, cores: Int) {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0)
val actor = actorSystem.actorOf(
Props(new StandaloneExecutorBackend(new Executor, masterUrl, slaveId, hostname, cores)),
Props(new StandaloneExecutorBackend(new Executor, masterUrl, executorId, hostname, cores)),
name = "Executor")
actorSystem.awaitTermination()
}

def main(args: Array[String]) {
if (args.length != 4) {
System.err.println("Usage: StandaloneExecutorBackend <master> <slaveId> <hostname> <cores>")
System.err.println("Usage: StandaloneExecutorBackend <master> <executorId> <hostname> <cores>")
System.exit(1)
}
run(args(0), args(1), args(2), args(3).toInt)
Expand Down
44 changes: 22 additions & 22 deletions core/src/main/scala/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
eventQueue.put(CompletionEvent(task, reason, result, accumUpdates))
}

// Called by TaskScheduler when a host fails.
override def hostLost(host: String) {
eventQueue.put(HostLost(host))
// Called by TaskScheduler when an executor fails.
override def executorLost(execId: String) {
eventQueue.put(ExecutorLost(execId))
}

// Called by TaskScheduler to cancel an entire TaskSet due to repeated failures.
Expand Down Expand Up @@ -72,7 +72,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with

// For tracking failed nodes, we use the MapOutputTracker's generation number, which is
// sent with every task. When we detect a node failing, we note the current generation number
// and failed host, increment it for new tasks, and use this to ignore stray ShuffleMapTask
// and failed executor, increment it for new tasks, and use this to ignore stray ShuffleMapTask
// results.
// TODO: Garbage collect information about failure generations when we know there are no more
// stray messages to detect.
Expand Down Expand Up @@ -108,7 +108,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}

def clearCacheLocs() {
cacheLocs.clear
cacheLocs.clear()
}

/**
Expand Down Expand Up @@ -271,8 +271,8 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
submitStage(finalStage)
}

case HostLost(host) =>
handleHostLost(host)
case ExecutorLost(execId) =>
handleExecutorLost(execId)

case completion: CompletionEvent =>
handleTaskCompletion(completion)
Expand Down Expand Up @@ -436,10 +436,10 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
case smt: ShuffleMapTask =>
val stage = idToStage(smt.stageId)
val status = event.result.asInstanceOf[MapStatus]
val host = status.address.ip
logInfo("ShuffleMapTask finished with host " + host)
if (failedGeneration.contains(host) && smt.generation <= failedGeneration(host)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + host)
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (failedGeneration.contains(execId) && smt.generation <= failedGeneration(execId)) {
logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
} else {
stage.addOutputLoc(smt.partition, status)
}
Expand Down Expand Up @@ -511,9 +511,9 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
// Remember that a fetch failed now; this is used to resubmit the broken
// stages later, after a small wait (to give other tasks the chance to fail)
lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
// TODO: mark the host as failed only if there were lots of fetch failures on it
// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
handleHostLost(bmAddress.ip, Some(task.generation))
handleExecutorLost(bmAddress.executorId, Some(task.generation))
}

case other =>
Expand All @@ -523,21 +523,21 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}

/**
* Responds to a host being lost. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use hostLost() to post a host lost event from outside.
* Responds to an executor being lost. This is called inside the event loop, so it assumes it can
* modify the scheduler's internal state. Use executorLost() to post a loss event from outside.
*
* Optionally the generation during which the failure was caught can be passed to avoid allowing
* stray fetch failures from possibly retriggering the detection of a node as lost.
*/
def handleHostLost(host: String, maybeGeneration: Option[Long] = None) {
def handleExecutorLost(execId: String, maybeGeneration: Option[Long] = None) {
val currentGeneration = maybeGeneration.getOrElse(mapOutputTracker.getGeneration)
if (!failedGeneration.contains(host) || failedGeneration(host) < currentGeneration) {
failedGeneration(host) = currentGeneration
logInfo("Host lost: " + host + " (generation " + currentGeneration + ")")
env.blockManager.master.notifyADeadHost(host)
if (!failedGeneration.contains(execId) || failedGeneration(execId) < currentGeneration) {
failedGeneration(execId) = currentGeneration
logInfo("Executor lost: %s (generation %d)".format(execId, currentGeneration))
env.blockManager.master.removeExecutor(execId)
// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleToMapStage) {
stage.removeOutputsOnHost(host)
stage.removeOutputsOnExecutor(execId)
val locs = stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray
mapOutputTracker.registerMapOutputs(shuffleId, locs, true)
}
Expand All @@ -546,7 +546,7 @@ class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with
}
clearCacheLocs()
} else {
logDebug("Additional host lost message for " + host +
logDebug("Additional executor lost message for " + execId +
"(generation " + currentGeneration + ")")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[spark] case class CompletionEvent(
accumUpdates: Map[Long, Any])
extends DAGSchedulerEvent

private[spark] case class HostLost(host: String) extends DAGSchedulerEvent
private[spark] case class ExecutorLost(execId: String) extends DAGSchedulerEvent

private[spark] case class TaskSetFailed(taskSet: TaskSet, reason: String) extends DAGSchedulerEvent

Expand Down
6 changes: 3 additions & 3 deletions core/src/main/scala/spark/scheduler/MapStatus.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@ import java.io.{ObjectOutput, ObjectInput, Externalizable}
* task ran on as well as the sizes of outputs for each reducer, for passing on to the reduce tasks.
* The map output sizes are compressed using MapOutputTracker.compressSize.
*/
private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes: Array[Byte])
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
extends Externalizable {

def this() = this(null, null) // For deserialization only

def writeExternal(out: ObjectOutput) {
address.writeExternal(out)
location.writeExternal(out)
out.writeInt(compressedSizes.length)
out.write(compressedSizes)
}

def readExternal(in: ObjectInput) {
address = BlockManagerId(in)
location = BlockManagerId(in)
compressedSizes = new Array[Byte](in.readInt())
in.readFully(compressedSizes)
}
Expand Down
Loading

0 comments on commit 44b4a0f

Please sign in to comment.