
Commit 7edee34

pwendell authored and mengxr committed
[SPARK-1112, 2156] (0.9 edition) Use correct akka frame size and overhead amounts.
backport #1172 to branch-0.9.

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1455 from mengxr/akka-fix-0.9 and squashes the following commits:

a99f201 [Patrick Wendell] backport PR #1172 to branch-0.9
1 parent 0116dee commit 7edee34

7 files changed: +35 -19 lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 7 additions & 2 deletions
@@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
     hostPort: String,
-    cores: Int)
+    cores: Int,
+    actorSystem: ActorSystem)
   extends Actor
   with ExecutorBackend
   with Logging {

@@ -93,6 +94,9 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
     driver ! StatusUpdate(executorId, taskId, state, data)
   }
+
+  override def akkaFrameSize() = actorSystem.settings.config.getBytes(
+    "akka.remote.netty.tcp.maximum-frame-size")
 }

 private[spark] object CoarseGrainedExecutorBackend {

@@ -110,7 +114,8 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
+      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores,
+        actorSystem),
       name = "Executor")
     workerUrl.foreach {
       url =>
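The new akkaFrameSize() override reads the limit straight from the backend's running ActorSystem (actorSystem.settings.config) rather than re-deriving it from SparkConf. A minimal sketch of the same Typesafe Config lookup against a hand-built config, with an illustrative 10 MiB value:

import com.typesafe.config.ConfigFactory

// Sketch only: the lookup the override performs; in the real code the Config
// object comes from actorSystem.settings.config.
val config = ConfigFactory.parseString(
  "akka.remote.netty.tcp.maximum-frame-size = 10485760")
val frameSize: Long = config.getBytes("akka.remote.netty.tcp.maximum-frame-size")
println(frameSize) // 10485760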

core/src/main/scala/org/apache/spark/executor/Executor.scala

Lines changed: 3 additions & 5 deletions
@@ -117,10 +117,6 @@ private[spark] class Executor(
     }
   }

-  // Akka's message frame size. If task result is bigger than this, we use the block manager
-  // to send the result back.
-  private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
-
   // Start worker thread pool
   val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

@@ -232,8 +228,10 @@ private[spark] class Executor(
         val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
         val serializedDirectResult = ser.serialize(directResult)
         logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)
+
         val serializedResult = {
-          if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
+          if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
+              AkkaUtils.reservedSizeBytes) {
             logInfo("Storing result for " + taskId + " in local BlockManager")
             val blockId = TaskResultBlockId(taskId)
             env.blockManager.putBytes(
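Taken together with the backend change above, the executor now compares the serialized result against the frame size actually configured on its own ActorSystem and subtracts the shared AkkaUtils.reservedSizeBytes margin instead of a hard-coded 1024 bytes. A hedged sketch of that comparison with a worked example (placeholder names; the real code stores oversized results through the BlockManager):

// Sketch of the new check; reservedSizeBytes matches the constant added to AkkaUtils.
val reservedSizeBytes = 200 * 1024

def sendViaBlockManager(serializedResultSize: Int, akkaFrameSize: Long): Boolean =
  serializedResultSize >= akkaFrameSize - reservedSizeBytes

// A result just under a 10 MiB frame: the old "- 1024" margin would have sent it
// inline, where framing and envelope overhead could push the message over the limit;
// the larger margin routes it through the BlockManager instead.
val resultSize = 10 * 1024 * 1024 - 2048
println(resultSize >= 10 * 1024 * 1024 - 1024)              // false: old check sends inline
println(sendViaBlockManager(resultSize, 10L * 1024 * 1024)) // true: new check uses the BlockManager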

core/src/main/scala/org/apache/spark/executor/ExecutorBackend.scala

Lines changed: 3 additions & 0 deletions
@@ -25,4 +25,7 @@ import org.apache.spark.TaskState.TaskState
  */
 private[spark] trait ExecutorBackend {
   def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+
+  // Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark.
+  def akkaFrameSize(): Long = Long.MaxValue
 }

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -142,7 +142,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
       for (task <- tasks.flatten) {
         val ser = SparkEnv.get.closureSerializer.newInstance()
         val serializedTask = ser.serialize(task)
-        if (serializedTask.limit >= akkaFrameSize - 1024) {
+        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
           val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
           scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
             try {

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 6 additions & 2 deletions
@@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
 import java.nio.ByteBuffer

 import akka.actor.{Actor, ActorRef, Props}
-
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.AkkaUtils

 private case class ReviveOffers()

@@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
   override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
     localActor ! StatusUpdate(taskId, state, serializedData)
   }
+
+  // This limit is calculated only to preserve expected behavior in tests. In reality, since this
+  // backend sends messages over the existing actor system, there is no need to enforce a limit.
+  override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
 }

core/src/main/scala/org/apache/spark/util/AkkaUtils.scala

Lines changed: 3 additions & 0 deletions
@@ -115,4 +115,7 @@ private[spark] object AkkaUtils {
   def maxFrameSizeBytes(conf: SparkConf): Int = {
     conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
   }
+
+  /** Space reserved for extra data in an Akka message besides serialized task or task result. */
+  val reservedSizeBytes = 200 * 1024
 }
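For reference, with the default spark.akka.frameSize of 10 used by maxFrameSizeBytes, the new 200 KiB reservation leaves roughly 10.28 million bytes of usable payload per message. The arithmetic, mirroring the two definitions above (a sketch, not the Spark objects themselves):

def maxFrameSizeBytes(frameSizeMb: Int = 10): Int = frameSizeMb * 1024 * 1024
val reservedSizeBytes = 200 * 1024

val usablePayload = maxFrameSizeBytes() - reservedSizeBytes
println(usablePayload) // 10485760 - 204800 = 10280960 bytes for a serialized task or result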

core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala

Lines changed: 12 additions & 9 deletions
@@ -161,7 +161,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

   test("remote fetch exceeds akka frame size") {
     val newConf = new SparkConf
-    newConf.set("spark.akka.frameSize", "1")
     newConf.set("spark.akka.askTimeout", "1") // Fail fast

     val masterTracker = new MapOutputTrackerMaster(conf)

@@ -170,14 +169,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
       new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
     val masterActor = actorRef.underlyingActor

-    // Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
-    // Note that the size is hand-selected here because map output statuses are compressed before
-    // being sent.
-    masterTracker.registerShuffle(20, 100)
-    (0 until 100).foreach { i =>
-      masterTracker.registerMapOutput(20, i, new MapStatus(
-        BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
+    // Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
+    // throw exception.
+    val shuffleId = 20
+    val numMaps = 2
+    val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
+    val random = new java.util.Random(0)
+    random.nextBytes(data) // Make it hard to compress.
+    masterTracker.registerShuffle(shuffleId, numMaps)
+    (0 until numMaps).foreach { i =>
+      masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
+        BlockManagerId("999", "mps", 1000, 0), data))
     }
-    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
+    intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
   }
 }
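The rewritten test fills the registered map outputs with random bytes because, as the removed comment noted, map output statuses are compressed before being sent; zero-filled arrays would compress to almost nothing and could slip under the frame limit. An illustrative, self-contained comparison (not part of the commit) using java.util.zip:

import java.util.zip.Deflater

// Compress a byte array and return the compressed length.
def compressedSize(data: Array[Byte]): Int = {
  val deflater = new Deflater()
  deflater.setInput(data)
  deflater.finish()
  val buf = new Array[Byte](64 * 1024)
  var total = 0
  while (!deflater.finished()) {
    total += deflater.deflate(buf)
  }
  deflater.end()
  total
}

val zeros = new Array[Byte](1 << 20)   // 1 MiB of zeros
val random = new Array[Byte](1 << 20)  // 1 MiB of random bytes
new java.util.Random(0).nextBytes(random)
println(compressedSize(zeros))   // roughly a kilobyte: compresses away
println(compressedSize(random))  // close to 1 MiB: stays over any small frame limit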
