Skip to content

Commit 6e14683

Browse files
committed
unit test just to make sure we fail fast on concurrent attempts
1 parent 06a0af6 commit 6e14683

File tree

1 file changed

+16
-0
lines changed

1 file changed

+16
-0
lines changed

core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,4 +128,20 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with L
128128
assert(taskDescriptions.map(_.executorId) === Seq("executor0"))
129129
}
130130

131+
test("refuse to schedule concurrent attempts for the same stage (SPARK-8103)") {
132+
sc = new SparkContext("local", "TaskSchedulerImplSuite")
133+
val taskScheduler = new TaskSchedulerImpl(sc)
134+
taskScheduler.initialize(new FakeSchedulerBackend)
135+
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
136+
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
137+
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
138+
override def executorAdded(execId: String, host: String) {}
139+
}
140+
taskScheduler.setDAGScheduler(dagScheduler)
141+
val attempt1 = new TaskSet(Array(new FakeTask(0)), 0, 0, 0, null)
142+
val attempt2 = new TaskSet(Array(new FakeTask(0)), 0, 1, 0, null)
143+
taskScheduler.submitTasks(attempt1)
144+
intercept[SparkIllegalStateException] { taskScheduler.submitTasks(attempt2)}
145+
}
146+
131147
}

0 commit comments

Comments
 (0)