Commit 77d3656

kayousterhout authored and markhamstra committed
[SPARK-979] Randomize order of offers.
This commit randomizes the order of resource offers to avoid scheduling all tasks on the same small set of machines. This is a much simpler solution to SPARK-979 than alteryx#7. Author: Kay Ousterhout <kayousterhout@gmail.com> Closes alteryx#27 from kayousterhout/randomize and squashes the following commits: 435d817 [Kay Ousterhout] [SPARK-979] Randomize order of offers. Conflicts: core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala
1 parent 2e15cda commit 77d3656
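The heart of the patch is a one-line shuffle of the incoming resource offers before the assignment loop runs. A minimal, self-contained sketch of the idea (the WorkerOffer case class here is a simplified stand-in for Spark's real offer type, which carries more bookkeeping):

    import scala.util.Random

    // Simplified stand-in for Spark's WorkerOffer: one executor's free capacity.
    case class WorkerOffer(executorId: String, host: String, cores: Int)

    object ShuffleOffersSketch {
      def main(args: Array[String]): Unit = {
        val offers = Seq(
          WorkerOffer("executor0", "host0", 1),
          WorkerOffer("executor1", "host1", 1),
          WorkerOffer("executor2", "host2", 1))

        // Walking offers in a fixed order places every small job on the first
        // workers in the list; shuffling first spreads tasks across the cluster
        // over repeated scheduling rounds.
        val shuffledOffers = Random.shuffle(offers)
        println(shuffledOffers.map(_.executorId).mkString(", "))
      }
    }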

4 files changed, +76 -42 lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala

Lines changed: 9 additions & 6 deletions
@@ -24,6 +24,7 @@ import java.util.{TimerTask, Timer}
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable.HashMap
 import scala.collection.mutable.HashSet
+import scala.util.Random
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
@@ -217,9 +218,11 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
       }
     }
 
-    // Build a list of tasks to assign to each worker
-    val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
-    val availableCpus = offers.map(o => o.cores).toArray
+    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
+    val shuffledOffers = Random.shuffle(offers)
+    // Build a list of tasks to assign to each worker.
+    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+    val availableCpus = shuffledOffers.map(o => o.cores).toArray
     val sortedTaskSets = rootPool.getSortedTaskSetQueue()
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -232,9 +235,9 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
       do {
         launchedTask = false
-        for (i <- 0 until offers.size) {
-          val execId = offers(i).executorId
-          val host = offers(i).host
+        for (i <- 0 until shuffledOffers.size) {
+          val execId = shuffledOffers(i).executorId
+          val host = shuffledOffers(i).host
           for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
             tasks(i) += task
             val tid = task.taskId
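One property the change above relies on: scala.util.Random.shuffle does not mutate its argument but returns a freshly shuffled copy, so tasks, availableCpus, and the offer read at index i all refer to the same permutation for the whole scheduling round. A quick illustration:

    import scala.util.Random

    val offers = Seq("executor0", "executor1", "executor2")
    val shuffled = Random.shuffle(offers)
    // `offers` keeps its original order; only `shuffled` is permuted.
    println(offers)   // List(executor0, executor1, executor2)
    println(shuffled) // e.g. List(executor2, executor0, executor1)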

core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala

Lines changed: 43 additions & 13 deletions
@@ -27,6 +27,13 @@ import scala.collection.mutable.ArrayBuffer
 
 import java.util.Properties
 
+class FakeSchedulerBackend extends SchedulerBackend {
+  def start() {}
+  def stop() {}
+  def reviveOffers() {}
+  def defaultParallelism() = 1
+}
+
 class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
@@ -106,7 +113,8 @@ class FakeTaskSetManager(
 
 class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler,
+      taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet)
   }
 
@@ -134,10 +142,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   test("FIFO Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
@@ -161,10 +166,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   test("Fair Scheduler Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
     System.setProperty("spark.scheduler.allocation.file", xmlPath)
@@ -218,10 +220,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
   test("Nested Pool Test") {
     sc = new SparkContext("local", "ClusterSchedulerSuite")
     val clusterScheduler = new ClusterScheduler(sc)
-    var tasks = ArrayBuffer[Task[_]]()
-    val task = new FakeTask(0)
-    tasks += task
-    val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
+    val taskSet = FakeTask.createTaskSet(1)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
     val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
@@ -264,4 +263,35 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
     checkTaskSetId(rootPool, 6)
     checkTaskSetId(rootPool, 2)
   }
+
+  test("Scheduler does not always schedule tasks on the same workers") {
+    sc = new SparkContext("local", "ClusterSchedulerSuite")
+    val taskScheduler = new ClusterScheduler(sc)
+    taskScheduler.initialize(new FakeSchedulerBackend)
+    // Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
+    var dagScheduler = new DAGScheduler(taskScheduler) {
+      override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
+      override def executorGained(execId: String, host: String) {}
+    }
+
+    val numFreeCores = 1
+    val workerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
+      new WorkerOffer("executor1", "host1", numFreeCores))
+    // Repeatedly try to schedule a 1-task job, and make sure that it doesn't always
+    // get scheduled on the same executor. While there is a chance this test will fail
+    // because the task randomly gets placed on the first executor all 1000 times, the
+    // probability of that happening is 2^-1000 (so sufficiently small to be considered
+    // negligible).
+    val numTrials = 1000
+    val selectedExecutorIds = 1.to(numTrials).map { _ =>
+      val taskSet = FakeTask.createTaskSet(1)
+      taskScheduler.submitTasks(taskSet)
+      val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
+      assert(1 === taskDescriptions.length)
+      taskDescriptions(0).executorId
+    }
+    var count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
+    assert(count > 0)
+    assert(count < numTrials)
+  }
 }
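The flakiness bound quoted in the new test's comment can be sanity-checked directly: with two equal offers, a uniform shuffle picks a given executor first with probability 1/2, so all 1000 independent trials landing on that specific executor has probability 2^-1000. A throwaway check of the arithmetic (not part of the patch):

    val numTrials = 1000
    // Probability that every trial picks one specific executor out of two.
    val pAllSame = math.pow(0.5, numTrials)
    println(f"P(all $numTrials%d trials agree) = $pAllSame%.3e")
    // Prints roughly 9.333e-302 -- negligible, as the comment claims.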

core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala

Lines changed: 7 additions & 22 deletions
@@ -87,7 +87,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
+    val taskSet = FakeTask.createTaskSet(1)
     val manager = new ClusterTaskSetManager(sched, taskSet)
 
     // Offer a host with no CPUs
@@ -113,7 +113,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(3)
+    val taskSet = FakeTask.createTaskSet(3)
     val manager = new ClusterTaskSetManager(sched, taskSet)
 
     // First three offers should all find tasks
@@ -144,7 +144,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-    val taskSet = createTaskSet(4,
+    val taskSet = FakeTask.createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
       Seq(TaskLocation("host1"), TaskLocation("host2", "exec2")),
@@ -189,7 +189,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
-    val taskSet = createTaskSet(5,
+    val taskSet = FakeTask.createTaskSet(5,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
       Seq(TaskLocation("host2")),
@@ -228,7 +228,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
-    val taskSet = createTaskSet(3,
+    val taskSet = FakeTask.createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
       Seq(TaskLocation("host3"))
@@ -260,7 +260,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("task result lost") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
+    val taskSet = FakeTask.createTaskSet(1)
     val clock = new FakeClock
     val manager = new ClusterTaskSetManager(sched, taskSet, clock)
 
@@ -277,7 +277,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
     val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
-    val taskSet = createTaskSet(1)
+    val taskSet = FakeTask.createTaskSet(1)
     val clock = new FakeClock
     val manager = new ClusterTaskSetManager(sched, taskSet, clock)
 
@@ -297,21 +297,6 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging
     }
   }
 
-
-  /**
-   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
-   * locations for each task (given as varargs) if this sequence is not empty.
-   */
-  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
-    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
-      throw new IllegalArgumentException("Wrong number of task locations")
-    }
-    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
-      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
-    }
-    new TaskSet(tasks, 0, 0, 0, null)
-  }
-
   def createTaskResult(id: Int): DirectTaskResult[Int] = {
     new DirectTaskResult[Int](id, mutable.Map.empty, new TaskMetrics)
   }

core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala

Lines changed: 17 additions & 1 deletion
@@ -18,10 +18,26 @@
 package org.apache.spark.scheduler.cluster
 
 import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
+import org.apache.spark.scheduler.{TaskLocation, Task, TaskSet}
 
 class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
   override def runTask(context: TaskContext): Int = 0
 
   override def preferredLocations: Seq[TaskLocation] = prefLocs
 }
+
+object FakeTask {
+  /**
+   * Utility method to create a TaskSet, potentially setting a particular sequence of preferred
+   * locations for each task (given as varargs) if this sequence is not empty.
+   */
+  def createTaskSet(numTasks: Int, prefLocs: Seq[TaskLocation]*): TaskSet = {
+    if (prefLocs.size != 0 && prefLocs.size != numTasks) {
+      throw new IllegalArgumentException("Wrong number of task locations")
+    }
+    val tasks = Array.tabulate[Task[_]](numTasks) { i =>
+      new FakeTask(i, if (prefLocs.size != 0) prefLocs(i) else Nil)
+    }
+    new TaskSet(tasks, 0, 0, 0, null)
+  }
+}
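For reference, a hypothetical call to the relocated helper, mirroring how the suites above use it; when the varargs list is non-empty it must supply exactly one preferred-location Seq per task:

    import org.apache.spark.scheduler.TaskLocation

    // Two tasks, each pinned to its own preferred locations.
    val taskSet = FakeTask.createTaskSet(2,
      Seq(TaskLocation("host1", "exec1")),
      Seq(TaskLocation("host2")))

    // No varargs: every task gets an empty preferred-location list.
    val simpleSet = FakeTask.createTaskSet(1)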
