Commit 68e1dfb

use timeout from AkkaUtils; remove props from RegisteredExecutor
1 parent 46d332d commit 68e1dfb

3 files changed: 10 additions & 15 deletions

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 12 deletions
@@ -18,14 +18,12 @@
 package org.apache.spark.executor

 import java.nio.ByteBuffer
-import java.util.concurrent.TimeUnit

 import scala.concurrent.Await

 import akka.actor._
 import akka.remote._
 import akka.pattern.Patterns
-import akka.util.Timeout

 import org.apache.spark.{SparkEnv, Logging, SecurityManager, SparkConf}
 import org.apache.spark.TaskState.TaskState
@@ -39,10 +37,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
-    cores: Int)
-  extends Actor
-  with ExecutorBackend
-  with Logging {
+    cores: Int,
+    sparkProperties: Seq[(String, String)]) extends Actor with ExecutorBackend with Logging {

   Utils.checkHostPort(hostPort, "Expected hostport")

@@ -57,7 +53,7 @@
   }

   override def receive = {
-    case RegisteredExecutor(sparkProperties) =>
+    case RegisteredExecutor =>
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
@@ -114,20 +110,20 @@ private[spark] object CoarseGrainedExecutorBackend {
     val (fetcher, _) = AkkaUtils.createActorSystem(
       "driverPropsFetcher", hostname, 0, executorConf, new SecurityManager(executorConf))
     val driver = fetcher.actorSelection(driverUrl)
-    val timeout = new Timeout(5, TimeUnit.MINUTES)
+    val timeout = AkkaUtils.askTimeout(executorConf)
     val fut = Patterns.ask(driver, RetrieveSparkProps, timeout)
-    val props = Await.result(fut, timeout.duration).asInstanceOf[Seq[(String, String)]]
+    val props = Await.result(fut, timeout).asInstanceOf[Seq[(String, String)]]
     fetcher.shutdown()

-    // Create a new ActorSystem to run the backend, because we can't create a
-    // SparkEnv / Executor before getting started with all our system properties, etc
+    // Create a new ActorSystem using driver's Spark properties to run the backend.
     val driverConf = new SparkConf().setAll(props)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
       "sparkExecutor", hostname, 0, driverConf, new SecurityManager(driverConf))
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+      Props(classOf[CoarseGrainedExecutorBackend],
+        driverUrl, executorId, sparkHostPort, cores, props),
       name = "Executor")
     workerUrl.foreach { url =>
       actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher")
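
The change above replaces the hard-coded five-minute Timeout with a duration derived from the executor's SparkConf via AkkaUtils.askTimeout, so the same Await.result call can take the returned duration directly (no more ".duration"). A minimal sketch of that pattern follows; the configuration key, the 30-second default, and the DemoConf stand-in are assumptions for illustration, not the actual AkkaUtils implementation.

import scala.concurrent.duration.{Duration, FiniteDuration}

// DemoConf is a hypothetical stand-in for SparkConf, used only to keep the sketch self-contained.
class DemoConf(settings: Map[String, String]) {
  def getLong(key: String, default: Long): Long =
    settings.get(key).map(_.toLong).getOrElse(default)
}

object DemoAkkaUtils {
  // Sketch: derive the ask timeout from configuration instead of hard-coding
  // five minutes; the key name and default below are assumed.
  def askTimeout(conf: DemoConf): FiniteDuration =
    Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds")
}

// Usage: the resulting FiniteDuration can be passed straight to Await.result.
// DemoAkkaUtils.askTimeout(new DemoConf(Map("spark.akka.askTimeout" -> "120")))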

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 2 deletions
@@ -36,8 +36,7 @@ private[spark] object CoarseGrainedClusterMessages {
   case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
     extends CoarseGrainedClusterMessage

-  case class RegisteredExecutor(sparkProperties: Seq[(String, String)])
-    extends CoarseGrainedClusterMessage
+  case object RegisteredExecutor extends CoarseGrainedClusterMessage

   case class RegisterExecutorFailed(message: String) extends CoarseGrainedClusterMessage
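
Because the registration acknowledgement no longer carries the Spark properties, the one-field case class becomes a parameterless case object. A minimal sketch, not Spark code, of how the two message shapes are matched (names are illustrative):

object MessageDemo {
  sealed trait DemoMessage
  // A payload-free acknowledgement can be a case object, matched by identity.
  case object Registered extends DemoMessage
  // A message that still carries data stays a case class, matched by extraction.
  case class RegistrationFailed(reason: String) extends DemoMessage

  def describe(msg: DemoMessage): String = msg match {
    case Registered                 => "successfully registered with driver"
    case RegistrationFailed(reason) => "registration failed: " + reason
  }
}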

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
           sender ! RegisterExecutorFailed("Duplicate executor ID: " + executorId)
         } else {
           logInfo("Registered executor: " + sender + " with ID " + executorId)
-          sender ! RegisteredExecutor(sparkProperties)
+          sender ! RegisteredExecutor
           executorActor(executorId) = sender
           executorHost(executorId) = Utils.parseHostPort(hostPort)._1
           totalCores(executorId) = cores
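
On the scheduler side the acknowledgement shrinks to a bare RegisteredExecutor, since the executor backend now receives the driver's properties as a constructor argument wired through Props. A minimal sketch of that wiring, assuming an Akka 2.x-era classpath to match the APIs used in this commit; the actor and names below are illustrative, not Spark code.

import akka.actor.{Actor, ActorSystem, Props}

// Illustrative actor: the properties arrive via the constructor, so the
// registration message itself can be payload-free.
class BackendDemo(executorId: String, sparkProperties: Seq[(String, String)]) extends Actor {
  override def receive = {
    case "registered" =>
      println(executorId + " registered; " + sparkProperties.size + " driver properties already on hand")
  }
}

object BackendDemoApp extends App {
  val system = ActorSystem("demo")
  val props = Seq("spark.app.name" -> "demo")
  // classOf-based Props mirrors how the commit passes constructor arguments.
  val backend = system.actorOf(Props(classOf[BackendDemo], "exec-1", props), name = "Executor")
  backend ! "registered"
  Thread.sleep(500)  // crude wait so the demo message is handled before shutdown
  system.shutdown()  // Akka 2.x-era shutdown, matching the Akka generation used here
}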
