@@ -93,19 +93,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
9393 val taskSet = FakeTask .createTaskSet(1 )
9494 val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES )
9595
96- // Offer a host with no CPUs
97- assert(manager.resourceOffer(" exec1" , " host1" , 0 , ANY ) === None )
98-
9996 // Offer a host with process-local as the constraint; this should work because the TaskSet
10097 // above won't have any locality preferences
101- val taskOption = manager.resourceOffer(" exec1" , " host1" , 2 , TaskLocality .PROCESS_LOCAL )
98+ val taskOption = manager.resourceOffer(" exec1" , " host1" , TaskLocality .PROCESS_LOCAL )
10299 assert(taskOption.isDefined)
103100 val task = taskOption.get
104101 assert(task.executorId === " exec1" )
105102 assert(sched.startedTasks.contains(0 ))
106103
107104 // Re-offer the host -- now we should get no more tasks
108- assert(manager.resourceOffer(" exec1" , " host1" , 2 , PROCESS_LOCAL ) === None )
105+ assert(manager.resourceOffer(" exec1" , " host1" , PROCESS_LOCAL ) === None )
109106
110107 // Tell it the task has finished
111108 manager.handleSuccessfulTask(0 , createTaskResult(0 ))
@@ -121,15 +118,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
121118
122119 // First three offers should all find tasks
123120 for (i <- 0 until 3 ) {
124- val taskOption = manager.resourceOffer(" exec1" , " host1" , 1 , PROCESS_LOCAL )
121+ val taskOption = manager.resourceOffer(" exec1" , " host1" , PROCESS_LOCAL )
125122 assert(taskOption.isDefined)
126123 val task = taskOption.get
127124 assert(task.executorId === " exec1" )
128125 }
129126 assert(sched.startedTasks.toSet === Set (0 , 1 , 2 ))
130127
131128 // Re-offer the host -- now we should get no more tasks
132- assert(manager.resourceOffer(" exec1" , " host1" , 1 , PROCESS_LOCAL ) === None )
129+ assert(manager.resourceOffer(" exec1" , " host1" , PROCESS_LOCAL ) === None )
133130
134131 // Finish the first two tasks
135132 manager.handleSuccessfulTask(0 , createTaskResult(0 ))
@@ -157,35 +154,35 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
157154 val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock)
158155
159156 // First offer host1, exec1: first task should be chosen
160- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 0 )
157+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 0 )
161158
162159 // Offer host1, exec1 again: the last task, which has no prefs, should be chosen
163- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 3 )
160+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 3 )
164161
165162 // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
166- assert(manager.resourceOffer(" exec1" , " host1" , 1 , PROCESS_LOCAL ) === None )
163+ assert(manager.resourceOffer(" exec1" , " host1" , PROCESS_LOCAL ) === None )
167164
168165 clock.advance(LOCALITY_WAIT )
169166
170167 // Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
171- assert(manager.resourceOffer(" exec1" , " host1" , 1 , PROCESS_LOCAL ) === None )
168+ assert(manager.resourceOffer(" exec1" , " host1" , PROCESS_LOCAL ) === None )
172169
173170 // Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
174- assert(manager.resourceOffer(" exec1" , " host1" , 1 , NODE_LOCAL ).get.index == 2 )
171+ assert(manager.resourceOffer(" exec1" , " host1" , NODE_LOCAL ).get.index == 2 )
175172
176173 // Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
177- assert(manager.resourceOffer(" exec1" , " host1" , 1 , NODE_LOCAL ) === None )
174+ assert(manager.resourceOffer(" exec1" , " host1" , NODE_LOCAL ) === None )
178175
179176 // Offer host1, exec1 again, at ANY level: nothing should get chosen
180- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ) === None )
177+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ) === None )
181178
182179 clock.advance(LOCALITY_WAIT )
183180
184181 // Offer host1, exec1 again, at ANY level: task 1 should get chosen
185- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 1 )
182+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 1 )
186183
187184 // Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
188- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ) === None )
185+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ) === None )
189186 }
190187
191188 test(" delay scheduling with fallback" ) {
@@ -203,29 +200,29 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
203200 val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock)
204201
205202 // First offer host1: first task should be chosen
206- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 0 )
203+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 0 )
207204
208205 // Offer host1 again: nothing should get chosen
209- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ) === None )
206+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ) === None )
210207
211208 clock.advance(LOCALITY_WAIT )
212209
213210 // Offer host1 again: second task (on host2) should get chosen
214- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 1 )
211+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 1 )
215212
216213 // Offer host1 again: third task (on host2) should get chosen
217- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 2 )
214+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 2 )
218215
219216 // Offer host2: fifth task (also on host2) should get chosen
220- assert(manager.resourceOffer(" exec2" , " host2" , 1 , ANY ).get.index === 4 )
217+ assert(manager.resourceOffer(" exec2" , " host2" , ANY ).get.index === 4 )
221218
222219 // Now that we've launched a local task, we should no longer launch the task for host3
223- assert(manager.resourceOffer(" exec2" , " host2" , 1 , ANY ) === None )
220+ assert(manager.resourceOffer(" exec2" , " host2" , ANY ) === None )
224221
225222 clock.advance(LOCALITY_WAIT )
226223
227224 // After another delay, we can go ahead and launch that task non-locally
228- assert(manager.resourceOffer(" exec2" , " host2" , 1 , ANY ).get.index === 3 )
225+ assert(manager.resourceOffer(" exec2" , " host2" , ANY ).get.index === 3 )
229226 }
230227
231228 test(" delay scheduling with failed hosts" ) {
@@ -240,24 +237,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
240237 val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock)
241238
242239 // First offer host1: first task should be chosen
243- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 0 )
240+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 0 )
244241
245242 // Offer host1 again: third task should be chosen immediately because host3 is not up
246- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 2 )
243+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 2 )
247244
248245 // After this, nothing should get chosen
249- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ) === None )
246+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ) === None )
250247
251248 // Now mark host2 as dead
252249 sched.removeExecutor(" exec2" )
253250 manager.executorLost(" exec2" , " host2" )
254251
255252 // Task 1 should immediately be launched on host1 because its original host is gone
256- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 1 )
253+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 1 )
257254
258255 // Now that all tasks have launched, nothing new should be launched anywhere else
259- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ) === None )
260- assert(manager.resourceOffer(" exec2" , " host2" , 1 , ANY ) === None )
256+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ) === None )
257+ assert(manager.resourceOffer(" exec2" , " host2" , ANY ) === None )
261258 }
262259
263260 test(" task result lost" ) {
@@ -267,14 +264,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
267264 val clock = new FakeClock
268265 val manager = new TaskSetManager (sched, taskSet, MAX_TASK_FAILURES , clock)
269266
270- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 0 )
267+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 0 )
271268
272269 // Tell it the task has finished but the result was lost.
273270 manager.handleFailedTask(0 , TaskState .FINISHED , TaskResultLost )
274271 assert(sched.endedTasks(0 ) === TaskResultLost )
275272
276273 // Re-offer the host -- now we should get task 0 again.
277- assert(manager.resourceOffer(" exec1" , " host1" , 1 , ANY ).get.index === 0 )
274+ assert(manager.resourceOffer(" exec1" , " host1" , ANY ).get.index === 0 )
278275 }
279276
280277 test(" repeated failures lead to task set abortion" ) {
@@ -287,7 +284,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
287284 // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
288285 // after the last failure.
289286 (1 to manager.maxTaskFailures).foreach { index =>
290- val offerResult = manager.resourceOffer(" exec1" , " host1" , 1 , ANY )
287+ val offerResult = manager.resourceOffer(" exec1" , " host1" , ANY )
291288 assert(offerResult.isDefined,
292289 " Expect resource offer on iteration %s to return a task" .format(index))
293290 assert(offerResult.get.index === 0 )
@@ -317,7 +314,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
317314 val manager = new TaskSetManager (sched, taskSet, 4 , clock)
318315
319316 {
320- val offerResult = manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .PROCESS_LOCAL )
317+ val offerResult = manager.resourceOffer(" exec1" , " host1" , TaskLocality .PROCESS_LOCAL )
321318 assert(offerResult.isDefined, " Expect resource offer to return a task" )
322319
323320 assert(offerResult.get.index === 0 )
@@ -328,15 +325,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
328325 assert(! sched.taskSetsFailed.contains(taskSet.id))
329326
330327 // Ensure scheduling on exec1 fails after failure 1 due to blacklist
331- assert(manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .PROCESS_LOCAL ).isEmpty)
332- assert(manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .NODE_LOCAL ).isEmpty)
333- assert(manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .RACK_LOCAL ).isEmpty)
334- assert(manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .ANY ).isEmpty)
328+ assert(manager.resourceOffer(" exec1" , " host1" , TaskLocality .PROCESS_LOCAL ).isEmpty)
329+ assert(manager.resourceOffer(" exec1" , " host1" , TaskLocality .NODE_LOCAL ).isEmpty)
330+ assert(manager.resourceOffer(" exec1" , " host1" , TaskLocality .RACK_LOCAL ).isEmpty)
331+ assert(manager.resourceOffer(" exec1" , " host1" , TaskLocality .ANY ).isEmpty)
335332 }
336333
337334 // Run the task on exec1.1 - should work, and then fail it on exec1.1
338335 {
339- val offerResult = manager.resourceOffer(" exec1.1" , " host1" , 1 , TaskLocality .NODE_LOCAL )
336+ val offerResult = manager.resourceOffer(" exec1.1" , " host1" , TaskLocality .NODE_LOCAL )
340337 assert(offerResult.isDefined,
341338 " Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
342339
@@ -348,12 +345,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
348345 assert(! sched.taskSetsFailed.contains(taskSet.id))
349346
350347 // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
351- assert(manager.resourceOffer(" exec1.1" , " host1" , 1 , TaskLocality .NODE_LOCAL ).isEmpty)
348+ assert(manager.resourceOffer(" exec1.1" , " host1" , TaskLocality .NODE_LOCAL ).isEmpty)
352349 }
353350
354351 // Run the task on exec2 - should work, and then fail it on exec2
355352 {
356- val offerResult = manager.resourceOffer(" exec2" , " host2" , 1 , TaskLocality .ANY )
353+ val offerResult = manager.resourceOffer(" exec2" , " host2" , TaskLocality .ANY )
357354 assert(offerResult.isDefined, " Expect resource offer to return a task" )
358355
359356 assert(offerResult.get.index === 0 )
@@ -364,20 +361,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
364361 assert(! sched.taskSetsFailed.contains(taskSet.id))
365362
366363 // Ensure scheduling on exec2 fails after failure 3 due to blacklist
367- assert(manager.resourceOffer(" exec2" , " host2" , 1 , TaskLocality .ANY ).isEmpty)
364+ assert(manager.resourceOffer(" exec2" , " host2" , TaskLocality .ANY ).isEmpty)
368365 }
369366
370367 // After reschedule delay, scheduling on exec1 should be possible.
371368 clock.advance(rescheduleDelay)
372369
373370 {
374- val offerResult = manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .PROCESS_LOCAL )
371+ val offerResult = manager.resourceOffer(" exec1" , " host1" , TaskLocality .PROCESS_LOCAL )
375372 assert(offerResult.isDefined, " Expect resource offer to return a task" )
376373
377374 assert(offerResult.get.index === 0 )
378375 assert(offerResult.get.executorId === " exec1" )
379376
380- assert(manager.resourceOffer(" exec1" , " host1" , 1 , TaskLocality .PROCESS_LOCAL ).isEmpty)
377+ assert(manager.resourceOffer(" exec1" , " host1" , TaskLocality .PROCESS_LOCAL ).isEmpty)
381378
382379 // Cause exec1 to fail : failure 4
383380 manager.handleFailedTask(offerResult.get.taskId, TaskState .FINISHED , TaskResultLost )
0 commit comments