Commit d6fd119

Remove references to ClusterScheduler.
ClusterScheduler was renamed to TaskSchedulerImpl; this commit updates comments and tests accordingly.
1 parent: 12738c1

8 files changed: +47 −46 lines changed

core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 /**
  * A backend interface for scheduling systems that allows plugging in different ones under
- * ClusterScheduler. We assume a Mesos-like model where the application gets resource offers as
+ * TaskSchedulerImpl. We assume a Mesos-like model where the application gets resource offers as
  * machines become available and can launch tasks on them.
  */
 private[spark] trait SchedulerBackend {
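
For context on the trait whose doc comment changes above: SchedulerBackend is the plug-in point between TaskSchedulerImpl and a cluster manager. A minimal sketch of that shape, assuming method names (start, stop, reviveOffers, defaultParallelism) typical of the trait in this era — an illustration, not the exact interface:

    // Hypothetical sketch of a pluggable backend's surface; the method
    // names are assumptions, not a verbatim copy of Spark's trait.
    trait BackendSketch {
      def start(): Unit                // connect to the cluster manager
      def stop(): Unit                 // tear down connections and executors
      def reviveOffers(): Unit         // prompt a fresh round of resource offers
      def defaultParallelism(): Int    // hint for the default number of partitions
    }

    // A no-op implementation, just to show how a backend plugs in.
    class NoOpBackend extends BackendSketch {
      def start(): Unit = ()
      def stop(): Unit = ()
      def reviveOffers(): Unit = ()
      def defaultParallelism(): Int = 1
    }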

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 
 /**
- * Low-level task scheduler interface, currently implemented exclusively by the ClusterScheduler.
+ * Low-level task scheduler interface, currently implemented exclusively by TaskSchedulerImpl.
  * This interface allows plugging in different task schedulers. Each TaskScheduler schedulers tasks
  * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
  * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
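
The comment above summarizes the contract: the DAGScheduler hands each stage's tasks to the scheduler as a TaskSet, and the scheduler runs them on the cluster. A condensed, hypothetical view of that contract — names here are illustrative, not the real TaskScheduler trait:

    // Illustrative sketch of the contract described in the doc comment.
    final case class TaskSetSketch(stageId: Int, tasks: Seq[String])

    trait TaskSchedulerSketch {
      def start(): Unit
      def stop(): Unit
      // The DAGScheduler submits one task set per stage; the scheduler is
      // responsible for running it and retrying failed tasks.
      def submitTasks(taskSet: TaskSetSketch): Unit
    }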

core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala

Lines changed: 4 additions & 3 deletions
@@ -26,13 +26,14 @@ import scala.collection.mutable.HashSet
 import scala.math.max
 import scala.math.min
 
-import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
+import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, FetchFailed, Logging, Resubmitted,
+  SparkEnv, Success, TaskEndReason, TaskKilled, TaskResultLost, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.{Clock, SystemClock}
 
 /**
- * Schedules the tasks within a single TaskSet in the ClusterScheduler. This class keeps track of
+ * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
  * each task, retries tasks if they fail (up to a limited number of times), and
  * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
  * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
@@ -41,7 +42,7 @@ import org.apache.spark.util.{Clock, SystemClock}
  * THREADING: This class is designed to only be called from code with a lock on the
  * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
  *
- * @param sched the ClusterScheduler associated with the TaskSetManager
+ * @param sched the TaskSchedulerImpl associated with the TaskSetManager
  * @param taskSet the TaskSet to manage scheduling for
 * @param maxTaskFailures if any particular task fails more than this number of times, the entire
 *                        task set will be aborted
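
The resourceOffer flow named in the comment above is the heart of delay scheduling: given an offer on a particular executor or host, the TaskSetManager accepts only if the offer satisfies the locality level it is currently willing to tolerate, and that level relaxes the longer the set waits. A schematic sketch of the idea — the names and the two-step relaxation are illustrative, not the actual implementation:

    // Schematic delay scheduling: accept an offer only if it meets the
    // current locality level; relax the level after waiting long enough.
    object DelaySchedulingSketch {
      sealed trait Locality
      case object ProcessLocal extends Locality  // same executor as the data
      case object NodeLocal    extends Locality  // same host as the data
      case object AnyHost      extends Locality  // run anywhere rather than starve

      def allowedLocality(waitedMs: Long, localityWaitMs: Long): Locality =
        if (waitedMs < localityWaitMs) ProcessLocal
        else if (waitedMs < 2 * localityWaitMs) NodeLocal
        else AnyHost
    }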

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -203,7 +203,7 @@ private[spark] class MesosSchedulerBackend(
         getResource(offer.getResourcesList, "cpus").toInt)
     }
 
-    // Call into the ClusterScheduler
+    // Call into the TaskSchedulerImpl
     val taskLists = scheduler.resourceOffers(offerableWorkers)
 
     // Build a list of Mesos tasks for each slave
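
The call site in this hunk is the generic handoff every backend performs: translate the cluster manager's offers into the scheduler's offer type, then let the scheduler decide which tasks run where. Schematically — all names below are illustrative stand-ins, not Spark's actual types:

    // Sketch of the handoff above: backend-specific offers become generic
    // worker offers, and the scheduler returns, per offer, the tasks to
    // launch there (possibly an empty list).
    final case class WorkerOfferSketch(executorId: String, host: String, cores: Int)
    final case class TaskDescriptionSketch(taskId: Long, executorId: String)

    trait OfferingScheduler {
      def resourceOffers(offers: Seq[WorkerOfferSketch]): Seq[Seq[TaskDescriptionSketch]]
    }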

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 2 additions & 2 deletions
@@ -35,7 +35,7 @@ private case class KillTask(taskId: Long)
 /**
  * Calls to LocalBackend are all serialized through LocalActor. Using an actor makes the calls on
  * LocalBackend asynchronous, which is necessary to prevent deadlock between LocalBackend
- * and the ClusterScheduler.
+ * and the TaskSchedulerImpl.
  */
 private[spark] class LocalActor(
   scheduler: TaskSchedulerImpl,
@@ -76,7 +76,7 @@ private[spark] class LocalActor(
 
 /**
  * LocalBackend is used when running a local version of Spark where the executor, backend, and
- * master all run in the same JVM. It sits behind a ClusterScheduler and handles launching tasks
+ * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
 private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
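
The deadlock described in the first hunk arises when the scheduler calls into the backend while holding its own lock and the backend synchronously calls back into the scheduler. Routing backend calls through an actor's mailbox breaks that cycle. A minimal sketch of the pattern using a plain queue and thread as a stand-in for the actor — illustrative only, not the actual LocalActor:

    import java.util.concurrent.LinkedBlockingQueue

    // Callers enqueue a message and return immediately, so no scheduler
    // lock is held across the callback into the scheduler.
    class AsyncMailboxSketch(handle: String => Unit) {
      private val mailbox = new LinkedBlockingQueue[String]()
      private val worker = new Thread(new Runnable {
        def run(): Unit = while (true) handle(mailbox.take())
      })
      worker.setDaemon(true)
      worker.start()

      def send(message: String): Unit = mailbox.put(message)  // asynchronous
    }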

core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -93,10 +93,10 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
     val scheduler: TaskSchedulerImpl = sc.taskScheduler match {
-      case clusterScheduler: TaskSchedulerImpl =>
-        clusterScheduler
+      case taskScheduler: TaskSchedulerImpl =>
+        taskScheduler
       case _ =>
-        assert(false, "Expect local cluster to use ClusterScheduler")
+        assert(false, "Expect local cluster to use TaskSchedulerImpl")
         throw new ClassCastException
     }
     scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)

core/src/test/scala/org/apache/spark/scheduler/ClusterSchedulerSuite.scala renamed to core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala

Lines changed: 25 additions & 25 deletions
@@ -29,9 +29,9 @@ class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
     initNumTasks: Int,
-    clusterScheduler: TaskSchedulerImpl,
+    taskScheduler: TaskSchedulerImpl,
     taskSet: TaskSet)
-  extends TaskSetManager(clusterScheduler, taskSet, 0) {
+  extends TaskSetManager(taskScheduler, taskSet, 0) {
 
   parent = null
   weight = 1
@@ -105,7 +105,7 @@ class FakeTaskSetManager(
   }
 }
 
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Logging {
 
   def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskSchedulerImpl, taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
@@ -133,8 +133,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   }
 
   test("FIFO Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -144,9 +144,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
 
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager0, null)
     schedulableBuilder.addTaskSetManager(taskSetManager1, null)
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@@ -160,8 +160,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   }
 
   test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -189,15 +189,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     val properties2 = new Properties()
     properties2.setProperty("spark.scheduler.pool","2")
 
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
 
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
     schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
 
@@ -217,8 +217,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   }
 
   test("Nested Pool Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new TaskSchedulerImpl(sc)
+    sc = new SparkContext("local", "TaskSchedulerImplSuite")
+    val taskScheduler = new TaskSchedulerImpl(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -240,23 +240,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
 
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
     pool00.addSchedulable(taskSetManager000)
     pool00.addSchedulable(taskSetManager001)
 
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
     pool01.addSchedulable(taskSetManager010)
     pool01.addSchedulable(taskSetManager011)
 
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
     pool10.addSchedulable(taskSetManager100)
     pool10.addSchedulable(taskSetManager101)
 
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
     pool11.addSchedulable(taskSetManager110)
     pool11.addSchedulable(taskSetManager111)
 
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala

Lines changed: 10 additions & 10 deletions
@@ -26,7 +26,7 @@ import org.apache.spark._
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.util.FakeClock
 
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     taskScheduler.startedTasks += taskInfo.index
   }
@@ -51,12 +51,12 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler
 }
 
 /**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * A mock TaskSchedulerImpl implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
 * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
 * to work, and these are required for locality in TaskSetManager.
 */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
   extends TaskSchedulerImpl(sc)
 {
   val startedTasks = new ArrayBuffer[Long]
@@ -87,7 +87,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
@@ -113,7 +113,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(3)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
 
@@ -144,7 +144,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
@@ -188,7 +188,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with fallback") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc,
+    val sched = new FakeTaskScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     val taskSet = createTaskSet(5,
       Seq(TaskLocation("host1")),
@@ -228,7 +228,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
@@ -260,7 +260,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("task result lost") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
@@ -277,7 +277,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
