Skip to content

Commit 8ca4176

Browse files
mengxr authored and pwendell committed
[SPARK-1112, 2156] Bootstrap to fetch the driver's Spark properties.
This is an alternative solution to #1124. Before launching the executor backend, we first fetch the driver's Spark properties and use them to overwrite the executor's Spark properties. This should be better than #1124. @pwendell Are there Spark properties that might be different on the driver and on the executors? Author: Xiangrui Meng <meng@databricks.com> Closes #1132 from mengxr/akka-bootstrap and squashes the following commits: 77ff32d [Xiangrui Meng] organize imports 68e1dfb [Xiangrui Meng] use timeout from AkkaUtils; remove props from RegisteredExecutor 46d332d [Xiangrui Meng] fix a test 7947c18 [Xiangrui Meng] increase slack size for akka 4ab696a [Xiangrui Meng] bootstrap to retrieve driver spark conf
1 parent a162c9b commit 8ca4176

File tree

6 files changed

+54
-40
lines changed

6 files changed

+54
-40
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,25 +19,26 @@ package org.apache.spark.executor
1919

2020
import java.nio.ByteBuffer
2121

22-
import akka.actor._
23-
import akka.remote._
22+
import scala.concurrent.Await
2423

25-
import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
24+
import akka.actor.{Actor, ActorSelection, Props}
25+
import akka.pattern.Patterns
26+
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
27+
28+
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
2629
import org.apache.spark.TaskState.TaskState
2730
import org.apache.spark.deploy.SparkHadoopUtil
2831
import org.apache.spark.deploy.worker.WorkerWatcher
29-
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3032
import org.apache.spark.scheduler.TaskDescription
33+
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
3134
import org.apache.spark.util.{AkkaUtils, Utils}
3235

3336
private[spark] class CoarseGrainedExecutorBackend(
3437
driverUrl: String,
3538
executorId: String,
3639
hostPort: String,
37-
cores: Int)
38-
extends Actor
39-
with ExecutorBackend
40-
with Logging {
40+
cores: Int,
41+
sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {
4142

4243
Utils.checkHostPort(hostPort, "Expected hostport")
4344

@@ -52,7 +53,7 @@ private[spark] class CoarseGrainedExecutorBackend(
5253
}
5354

5455
override def receive = {
55-
case RegisteredExecutor(sparkProperties) =>
56+
case RegisteredExecutor =>
5657
logInfo("Successfully registered with driver")
5758
// Make this host instead of hostPort ?
5859
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
@@ -101,26 +102,33 @@ private[spark] object CoarseGrainedExecutorBackend {
101102
workerUrl: Option[String]) {
102103

103104
SparkHadoopUtil.get.runAsSparkUser { () =>
104-
// Debug code
105-
Utils.checkHost(hostname)
106-
107-
val conf = new SparkConf
108-
// Create a new ActorSystem to run the backend, because we can't create a
109-
// SparkEnv / Executor before getting started with all our system properties, etc
110-
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
111-
conf, new SecurityManager(conf))
112-
// set it
113-
val sparkHostPort = hostname + ":" + boundPort
114-
actorSystem.actorOf(
115-
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
116-
sparkHostPort, cores),
117-
name = "Executor")
118-
workerUrl.foreach {
119-
url =>
120-
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
121-
}
122-
actorSystem.awaitTermination()
123-
105+
// Debug code
106+
Utils.checkHost(hostname)
107+
108+
// Bootstrap to fetch the driver's Spark properties.
109+
val executorConf = new SparkConf
110+
val (fetcher, _) = AkkaUtils.createActorSystem(
111+
"driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
112+
val driver = fetcher.actorSelection(driverUrl)
113+
val timeout = AkkaUtils.askTimeout(executorConf)
114+
val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
115+
val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
116+
fetcher.shutdown()
117+
118+
// Create a new ActorSystem using driver's Spark properties to run the backend.
119+
val driverConf = new SparkConf().setAll(props)
120+
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
121+
"sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
122+
// set it
123+
val sparkHostPort = hostname + ":" + boundPort
124+
actorSystem.actorOf(
125+
Props(classOf[CoarseGrainedExecutorBackend],
126+
driverUrl, executorId, sparkHostPort, cores, props),
127+
name = "Executor")
128+
workerUrl.foreach { url =>
129+
actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
130+
}
131+
actorSystem.awaitTermination()
124132
}
125133
}
126134

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private[spark] class Executor(
212212
val serializedDirectResult = ser.serialize(directResult)
213213
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
214214
val serializedResult = {
215-
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
215+
if (serializedDirectResult.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
216216
logInfo("Storing result for " + taskId + " in local BlockManager")
217217
val blockId = TaskResultBlockId(taskId)
218218
env.blockManager.putBytes(

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,21 @@ package org.apache.spark.scheduler.cluster
2020
import java.nio.ByteBuffer
2121

2222
import org.apache.spark.TaskState.TaskState
23-
import org.apache.spark.scheduler.TaskDescription
2423
import org.apache.spark.util.{SerializableBuffer, Utils}
2524

2625
private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
2726

2827
private[spark] object CoarseGrainedClusterMessages {
2928

29+
case object RetrieveSparkProps extends CoarseGrainedClusterMessage
30+
3031
// Driver to executors
3132
case class LaunchTask(data: SerializableBuffer) extends CoarseGrainedClusterMessage
3233

3334
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
3435
extends CoarseGrainedClusterMessage
3536

36-
case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
37-
extends CoarseGrainedClusterMessage
37+
case object RegisteredExecutor extends CoarseGrainedClusterMessage
3838

3939
case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
4040

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
7575
sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
7676
} else {
7777
logInfo("Registered executor: " + sender + " with ID " + executorId)
78-
sender ! RegisteredExecutor(sparkProperties)
78+
sender ! RegisteredExecutor
7979
executorActor(executorId) = sender
8080
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
8181
totalCores(executorId) = cores
@@ -124,6 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
124124
addressToExecutorId.get(address).foreach(removeExecutor(_,
125125
"remote Akka client disassociated"))
126126

127+
case RetrieveSparkProps =>
128+
sender ! sparkProperties
127129
}
128130

129131
// Make fake resource offers on all executors
@@ -143,14 +145,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
143145
for (task <- tasks.flatten) {
144146
val ser = SparkEnv.get.closureSerializer.newInstance()
145147
val serializedTask = ser.serialize(task)
146-
if (serializedTask.limit >= akkaFrameSize - 1024) {
148+
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
147149
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
148150
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
149151
try {
150-
var msg = "Serialized task %s:%d was %d bytes which " +
151-
"exceeds spark.akka.frameSize (%d bytes). " +
152-
"Consider using broadcast variables for large values."
153-
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize)
152+
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
153+
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
154+
"spark.akka.frameSize or using broadcast variables for large values."
155+
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
156+
AkkaUtils.reservedSizeBytes)
154157
taskSet.abort(msg)
155158
} catch {
156159
case e: Exception => logError("Exception in error callback", e)

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
121121
def maxFrameSizeBytes(conf: SparkConf): Int = {
122122
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
123123
}
124+
125+
/** Space reserved for extra data in an Akka message besides serialized task or task result. */
126+
val reservedSizeBytes = 200 * 1024
124127
}

core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext
3535
val thrown = intercept[SparkException] {
3636
larger.collect()
3737
}
38-
assert(thrown.getMessage.contains("Consider using broadcast variables for large values"))
38+
assert(thrown.getMessage.contains("using broadcast variables for large values"))
3939
val smaller = sc.parallelize(1 to 4).collect()
4040
assert(smaller.size === 4)
4141
}

0 commit comments

Comments
 (0)