Skip to content

Commit 3f7b9e8

Browse files
committed
Add JIRA reference; move clone into DAGScheduler
1 parent 707e417 commit 3f7b9e8

File tree

4 files changed

+11
-9
lines changed

4 files changed

+11
-9
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@ import scala.reflect.{ClassTag, classTag}
3434

3535
import akka.actor.Props
3636

37-
import org.apache.commons.lang3.SerializationUtils
38-
3937
import org.apache.hadoop.conf.Configuration
4038
import org.apache.hadoop.fs.Path
4139
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
@@ -1491,7 +1489,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
14911489
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
14921490
}
14931491
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
1494-
resultHandler, SerializationUtils.clone(localProperties.get))
1492+
resultHandler, localProperties.get)
14951493
progressBar.foreach(_.finishAll())
14961494
rdd.doCheckpoint()
14971495
}
@@ -1577,7 +1575,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
15771575
logInfo("Starting job: " + callSite.shortForm)
15781576
val start = System.nanoTime
15791577
val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout,
1580-
SerializationUtils.clone(localProperties.get))
1578+
localProperties.get)
15811579
logInfo(
15821580
"Job finished: " + callSite.shortForm + ", took " + (System.nanoTime - start) / 1e9 + " s")
15831581
result
@@ -1605,7 +1603,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
16051603
callSite,
16061604
allowLocal = false,
16071605
resultHandler,
1608-
SerializationUtils.clone(localProperties.get))
1606+
localProperties.get)
16091607
new SimpleFutureAction(waiter, resultFunc)
16101608
}
16111609

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import scala.util.control.NonFatal
3131
import akka.pattern.ask
3232
import akka.util.Timeout
3333

34+
import org.apache.commons.lang3.SerializationUtils
35+
3436
import org.apache.spark._
3537
import org.apache.spark.broadcast.Broadcast
3638
import org.apache.spark.executor.TaskMetrics
@@ -493,7 +495,8 @@ class DAGScheduler(
493495
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
494496
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
495497
eventProcessLoop.post(JobSubmitted(
496-
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
498+
jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter,
499+
SerializationUtils.clone(properties)))
497500
waiter
498501
}
499502

@@ -534,7 +537,8 @@ class DAGScheduler(
534537
val partitions = (0 until rdd.partitions.size).toArray
535538
val jobId = nextJobId.getAndIncrement()
536539
eventProcessLoop.post(JobSubmitted(
537-
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties))
540+
jobId, rdd, func2, partitions, allowLocal = false, callSite, listener,
541+
SerializationUtils.clone(properties)))
538542
listener.awaitResult() // Will throw an exception if the job fails
539543
}
540544

core/src/test/scala/org/apache/spark/JobCancellationSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
141141
assert(jobB.get() === 100)
142142
}
143143

144-
test("inherited job group") {
144+
test("inherited job group (SPARK-6629)") {
145145
sc = new SparkContext("local[2]", "test")
146146

147147
// Add a listener to release the semaphore once any tasks are launched.

core/src/test/scala/org/apache/spark/ThreadingSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ class ThreadingSuite extends FunSuite with LocalSparkContext {
191191
assert(sc.getLocalProperty("Foo") === null)
192192
}
193193

194-
test("mutations to local properties should not affect submitted jobs") {
194+
test("mutations to local properties should not affect submitted jobs (SPARK-6629)") {
195195
val jobStarted = new Semaphore(0)
196196
val jobEnded = new Semaphore(0)
197197
@volatile var jobResult: JobResult = null

0 commit comments

Comments
 (0)