
Commit a163574

coolfrood authored and JoshRosen committed
[SPARK-7795] [CORE] Speed up task scheduling in standalone mode by reusing serializer
My experiments with scheduling very short tasks in standalone cluster mode indicated that a significant amount of time was being spent in scheduling the tasks (>500ms for 256 tasks). I found that most of the time was being spent in creating a new instance of serializer for each task. Changing this to just one serializer brought down the scheduling time to 8ms.

Author: Akshat Aranya <aaranya@quantcast.com>

Closes #6323 from coolfrood/master and squashes the following commits:

12d8c9e [Akshat Aranya] Reduce visibility of serializer
bd4a5dd [Akshat Aranya] Style fix
0b8ca93 [Akshat Aranya] Incorporate review comments
fe530cd [Akshat Aranya] Speed up task scheduling in standalone mode by reusing serializer instead of creating a new one for each task.
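The pattern is easy to sketch in isolation. The micro-benchmark below (illustrative only, not part of the commit) contrasts the old per-task serializer instantiation with a single reused instance; FakeTask is a hypothetical stand-in for TaskDescription, and the sketch assumes spark-core on the classpath:

// Minimal sketch (not from the commit): per-task serializer creation
// vs. one reused SerializerInstance. FakeTask is a hypothetical
// stand-in for TaskDescription; assumes spark-core on the classpath.
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

object SerializerReuseSketch {
  case class FakeTask(id: Long, payload: Array[Byte])

  def main(args: Array[String]): Unit = {
    val serializer = new JavaSerializer(new SparkConf())
    val tasks = (1L to 256L).map(i => FakeTask(i, new Array[Byte](1024)))

    // Before the change: a fresh SerializerInstance for every task,
    // as the old launchTasks loop did.
    var t0 = System.nanoTime()
    tasks.foreach { task =>
      val ser = serializer.newInstance()
      ser.serialize(task)
    }
    println(f"fresh instance per task: ${(System.nanoTime() - t0) / 1e6}%.1f ms")

    // After the change: one instance reused across the loop. Safe here
    // only because a single thread performs all the serialization.
    val ser = serializer.newInstance()
    t0 = System.nanoTime()
    tasks.foreach(task => ser.serialize(task))
    println(f"reused instance: ${(System.nanoTime() - t0) / 1e6}%.1f ms")
  }
}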
1 parent 63a5ce7 commit a163574

File tree

1 file changed (+8, -4)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 8 additions & 4 deletions
@@ -69,6 +69,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
 
+    // If this DriverEndpoint is changed to support multiple threads,
+    // then this may need to be changed so that we don't share the serializer
+    // instance across threads
+    private val ser = SparkEnv.get.closureSerializer.newInstance()
+
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
     private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -163,7 +168,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Make fake resource offers on all executors
-    def makeOffers() {
+    private def makeOffers() {
       launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
         new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
       }.toSeq))
@@ -175,16 +180,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // Make fake resource offers on just one executor
-    def makeOffers(executorId: String) {
+    private def makeOffers(executorId: String) {
       val executorData = executorDataMap(executorId)
       launchTasks(scheduler.resourceOffers(
         Seq(new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores))))
     }
 
     // Launch tasks returned by a set of resource offers
-    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
+    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
       for (task <- tasks.flatten) {
-        val ser = SparkEnv.get.closureSerializer.newInstance()
         val serializedTask = ser.serialize(task)
         if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
           val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
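The comment added at the top of DriverEndpoint flags the assumption the shared instance relies on: the endpoint processes messages on a single thread. As a hypothetical variant (not in this commit), a ThreadLocal would preserve the reuse benefit per thread if that ever changed:

// Hypothetical variant, not in this commit: if DriverEndpoint ever became
// multi-threaded, a ThreadLocal keeps one SerializerInstance per thread,
// retaining reuse without sharing a single instance across threads.
// (Requires importing org.apache.spark.serializer.SerializerInstance.)
private val ser = new ThreadLocal[SerializerInstance] {
  override def initialValue(): SerializerInstance =
    SparkEnv.get.closureSerializer.newInstance()
}

// Call sites would then use ser.get().serialize(task) instead of
// ser.serialize(task).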
