Commit f26556b

Marcelo Vanzin committed:
Fix a thread-safety issue in "local" mode.
The issue is that SparkConf is not thread-safe, so it was possible for the executor thread to read the configuration while the context thread was still modifying it. In my tests this caused the executor to consistently miss the "spark.driver.port" config and fail tests. Long term, it would probably be better to investigate using a concurrent map implementation in SparkConf (instead of a HashMap).
1 parent 2f4e8b4 commit f26556b
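
For reference, the longer-term idea mentioned in the message above (backing the configuration with a concurrent map) could look roughly like this sketch. It is illustrative only: the ConcurrentSettings name and its API are invented here, not Spark's actual SparkConf internals.

    import java.util.concurrent.ConcurrentHashMap

    // Illustrative sketch, not Spark code: a settings store backed by a
    // concurrent map, so one thread can read entries while another thread
    // writes them, without external locking.
    class ConcurrentSettings {
      private val settings = new ConcurrentHashMap[String, String]()

      def set(key: String, value: String): this.type = {
        settings.put(key, value)
        this
      }

      def get(key: String, default: String): String = {
        val v = settings.get(key)
        if (v != null) v else default
      }
    }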

File tree

2 files changed: +14 -5 lines


core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 9 additions & 2 deletions
@@ -43,18 +43,25 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     env: SparkEnv,
+    conf: SparkConf,
     isLocal: Boolean = false)
   extends Logging
 {
+
+  def this(executorId: String,
+      slaveHostname: String,
+      env: SparkEnv,
+      isLocal: Boolean = false) = {
+    this(executorId, slaveHostname, env, env.conf, isLocal)
+  }
+
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
   private val currentJars: HashMap[String, Long] = new HashMap[String, Long]()
 
   private val EMPTY_BYTE_BUFFER = ByteBuffer.wrap(new Array[Byte](0))
 
-  private val conf = env.conf
-
   @volatile private var isStopped = false
 
   // No ip or host:port - just hostname
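
The auxiliary constructor added above keeps the old four-argument signature working, so existing call sites compile unchanged while new ones can pass an explicit SparkConf. A hedged sketch of the two construction paths (it assumes an initialized SparkEnv and is not a runnable Spark job by itself; the executor IDs are made up):

    val env = SparkEnv.get

    // Old-style call site: conf falls back to env.conf via the auxiliary constructor.
    val e1 = new Executor("exec-1", "localhost", env, isLocal = true)

    // New-style call site: pass a private snapshot of the configuration instead.
    val e2 = new Executor("exec-2", "localhost", env, env.conf.clone(), isLocal = true)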

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 5 additions & 3 deletions
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import akka.actor.{Actor, ActorRef, Props}
 
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}

@@ -43,6 +43,7 @@ private case class StopExecutor()
 private[spark] class LocalActor(
     scheduler: TaskSchedulerImpl,
     executorBackend: LocalBackend,
+    conf: SparkConf,
     private val totalCores: Int)
   extends Actor with ActorLogReceive with Logging {
 

@@ -52,7 +53,7 @@ private[spark] class LocalActor(
   private val localExecutorHostname = "localhost"
 
   private val executor = new Executor(
-    localExecutorId, localExecutorHostname, SparkEnv.get, isLocal = true)
+    localExecutorId, localExecutorHostname, SparkEnv.get, conf, isLocal = true)
 
   override def receiveWithLogging = {
     case ReviveOffers =>

@@ -90,11 +91,12 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
   extends SchedulerBackend with ExecutorBackend {
 
   private val appId = "local-" + System.currentTimeMillis
+  private val conf = SparkEnv.get.conf.clone()
   var localActor: ActorRef = null
 
   override def start() {
     localActor = SparkEnv.get.actorSystem.actorOf(
-      Props(new LocalActor(scheduler, this, totalCores)),
+      Props(new LocalActor(scheduler, this, conf, totalCores)),
       "LocalBackendActor")
   }
 