Skip to content

Commit 707e417

Browse files
committed
Clone local properties to prevent mutations from breaking job cancellation.
1 parent b376114 commit 707e417

File tree

2 files changed

+50
-4
lines changed

2 files changed

+50
-4
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import scala.reflect.{ClassTag, classTag}
3434

3535
import akka.actor.Props
3636

37+
import org.apache.commons.lang3.SerializationUtils
38+
3739
import org.apache.hadoop.conf.Configuration
3840
import org.apache.hadoop.fs.Path
3941
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -1489,7 +1491,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14891491
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
14901492
}
14911493
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
1492-
resultHandler, localProperties.get)
1494+
resultHandler, SerializationUtils.clone(localProperties.get))
14931495
progressBar.foreach(_.finishAll())
14941496
rdd.doCheckpoint()
14951497
}
@@ -1575,7 +1577,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15751577
logInfo("Starting job: " + callSite.shortForm)
15761578
val start = System.nanoTime
15771579
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
1578-
localProperties.get)
1580+
SerializationUtils.clone(localProperties.get))
15791581
logInfo(
15801582
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
15811583
result
@@ -1603,7 +1605,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
16031605
callSite,
16041606
allowLocal = false,
16051607
resultHandler,
1606-
localProperties.get)
1608+
SerializationUtils.clone(localProperties.get))
16071609
new SimpleFutureAction(waiter, resultFunc)
16081610
}
16091611

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717

1818
package org.apache.spark
1919

20-
import java.util.concurrent.Semaphore
20+
import java.util.concurrent.{TimeUnit, Semaphore}
2121
import java.util.concurrent.atomic.AtomicBoolean
2222
import java.util.concurrent.atomic.AtomicInteger
2323

24+
import org.apache.spark.scheduler._
2425
import org.scalatest.FunSuite
2526

2627
/**
@@ -189,4 +190,47 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
189190
assert(sc.getLocalProperty("test") === "parent")
190191
assert(sc.getLocalProperty("Foo") === null)
191192
}
193+
194+
test("mutations to local properties should not affect submitted jobs") {
195+
val jobStarted = new Semaphore(0)
196+
val jobEnded = new Semaphore(0)
197+
@volatile var jobResult: JobResult = null
198+
199+
sc = new SparkContext("local", "test")
200+
sc.setJobGroup("originalJobGroupId", "description")
201+
sc.addSparkListener(new SparkListener {
202+
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
203+
jobStarted.release()
204+
}
205+
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
206+
jobResult = jobEnd.jobResult
207+
jobEnded.release()
208+
}
209+
})
210+
211+
// Create a new thread which will inherit the current thread's properties
212+
val thread = new Thread() {
213+
override def run(): Unit = {
214+
assert(sc.getLocalProperty(SparkContext.SPARK_JOB_GROUP_ID) === "originalJobGroupId")
215+
// Sleeps for a total of 10 seconds, but allows cancellation to interrupt the task
216+
try {
217+
sc.parallelize(1 to 100).foreach { x =>
218+
Thread.sleep(100)
219+
}
220+
} catch {
221+
case s: SparkException => // ignored so that we don't print noise in test logs
222+
}
223+
}
224+
}
225+
thread.start()
226+
// Wait for the job to start, then mutate the original properties. The running job inherited
227+
// them, but it should hold a defensive copy or snapshot, so this mutation must not affect it:
228+
jobStarted.tryAcquire(10, TimeUnit.SECONDS)
229+
sc.setJobGroup("modifiedJobGroupId", "description")
230+
// Canceling the original job group should cancel the running job. In other words, the
231+
// modification of the properties object should not affect the properties of running jobs
232+
sc.cancelJobGroup("originalJobGroupId")
233+
jobEnded.tryAcquire(10, TimeUnit.SECONDS)
234+
assert(jobResult.isInstanceOf[JobFailed])
235+
}
192236
}

0 commit comments

Comments
 (0)