Skip to content

Commit ffa5f8e

Browse files
committed
Fix issue when local properties pass from parent to child thread
1 parent 2aff798 commit ffa5f8e

File tree

2 files changed

+42
-2
lines changed

2 files changed

+42
-2
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -256,7 +256,9 @@ class SparkContext(
256256
private[spark] var checkpointDir: Option[String] = None
257257

258258
// Thread Local variable that can be used by users to pass information down the stack
259-
private val localProperties = new ThreadLocal[Properties]
259+
private val localProperties = new InheritableThreadLocal[Properties] {
260+
override protected def childValue(parent: Properties): Properties = new Properties(parent)
261+
}
260262

261263
def initLocalProperties() {
262264
localProperties.set(new Properties())
@@ -273,6 +275,8 @@ class SparkContext(
273275
}
274276
}
275277

278+
def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null)
279+
276280
/** Set a human readable description of the current job. */
277281
def setJobDescription(value: String) {
278282
setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ object ThreadingSuiteState {
4040
}
4141

4242
class ThreadingSuite extends FunSuite with LocalSparkContext {
43-
43+
4444
test("accessing SparkContext form a different thread") {
4545
sc = new SparkContext("local", "test")
4646
val nums = sc.parallelize(1 to 10, 2)
@@ -149,4 +149,40 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
149149
fail("One or more threads didn't see runningThreads = 4")
150150
}
151151
}
152+
153+
test("set local properties in different thread") {
154+
sc = new SparkContext("local", "test")
155+
156+
val threads = (1 to 5).map{ i =>
157+
new Thread() {
158+
override def run() {
159+
sc.setLocalProperty("test", i.toString)
160+
assert(sc.getLocalProperty("test") === i.toString)
161+
}
162+
}
163+
}
164+
165+
threads.foreach(_.start())
166+
167+
assert(sc.getLocalProperty("test") === null)
168+
}
169+
170+
test("set and get local properties in parent-children thread") {
171+
sc = new SparkContext("local", "test")
172+
sc.setLocalProperty("test", "parent")
173+
174+
val threads = (1 to 5).map{ i =>
175+
new Thread() {
176+
override def run() {
177+
assert(sc.getLocalProperty("test") === "parent")
178+
sc.setLocalProperty("test", i.toString)
179+
assert(sc.getLocalProperty("test") === i.toString)
180+
}
181+
}
182+
}
183+
184+
threads.foreach(_.start())
185+
assert(sc.getLocalProperty("test") === "parent")
186+
assert(sc.getLocalProperty("Foo") === null)
187+
}
152188
}

0 commit comments

Comments
 (0)