@@ -19,8 +19,6 @@ package org.apache.spark.scheduler.local
19
19
20
20
import java .nio .ByteBuffer
21
21
22
- import scala .concurrent .duration ._
23
-
24
22
import akka .actor .{Actor , ActorRef , Props }
25
23
26
24
import org .apache .spark .{Logging , SparkContext , SparkEnv , TaskState }
@@ -48,8 +46,6 @@ private[spark] class LocalActor(
48
46
private val totalCores : Int )
49
47
extends Actor with ActorLogReceive with Logging {
50
48
51
- import context .dispatcher // to use Akka's scheduler.scheduleOnce()
52
-
53
49
private var freeCores = totalCores
54
50
55
51
private val localExecutorId = SparkContext .DRIVER_IDENTIFIER
@@ -78,16 +74,11 @@ private[spark] class LocalActor(
78
74
79
75
def reviveOffers () {
80
76
val offers = Seq (new WorkerOffer (localExecutorId, localExecutorHostname, freeCores))
81
- val tasks = scheduler.resourceOffers(offers).flatten
82
- for (task <- tasks) {
77
+ for (task <- scheduler.resourceOffers(offers).flatten) {
83
78
freeCores -= scheduler.CPUS_PER_TASK
84
79
executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
85
80
task.name, task.serializedTask)
86
81
}
87
- if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
88
- // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
89
- context.system.scheduler.scheduleOnce(1000 millis, self, ReviveOffers )
90
- }
91
82
}
92
83
}
93
84
0 commit comments