Skip to content

[SPARK-1112, 2156] (0.9 edition) Use correct akka frame size and overhead amounts. #1455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int)
cores: Int,
actorSystem: ActorSystem)
extends Actor
with ExecutorBackend
with Logging {
Expand Down Expand Up @@ -93,6 +94,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  // Forward the task's state change (plus its serialized payload) to the driver actor.
  val update = StatusUpdate(executorId, taskId, state, data)
  driver ! update
}

// Reads the maximum Akka frame size (in bytes) from this executor's actor-system
// configuration, so the reported limit matches the actor system actually in use.
override def akkaFrameSize(): Long = {
  val systemConfig = actorSystem.settings.config
  systemConfig.getBytes("akka.remote.netty.tcp.maximum-frame-size")
}
}

private[spark] object CoarseGrainedExecutorBackend {
Expand All @@ -110,7 +114,8 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores,
actorSystem),
name = "Executor")
workerUrl.foreach {
url =>
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ private[spark] class Executor(
}
}

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

Expand Down Expand Up @@ -232,8 +228,10 @@ private[spark] class Executor(
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
AkkaUtils.reservedSizeBytes) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ import org.apache.spark.TaskState.TaskState
*/
private[spark] trait ExecutorBackend {
  // Report a task's state transition (with its serialized data) back to the scheduler.
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit

  // Workaround for SPARK-1112; this method only exists in branch-1.x of Spark. The default
  // of Long.MaxValue effectively disables frame-size checks for backends that do not override it.
  def akkaFrameSize(): Long = Long.MaxValue
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer

import akka.actor.{Actor, ActorRef, Props}

import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.AkkaUtils

private case class ReviveOffers()

Expand Down Expand Up @@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
  // Relay the task's state change to the backing local actor.
  val update = StatusUpdate(taskId, state, serializedData)
  localActor ! update
}

// Computed only so that tests observe the expected limit. In practice this backend sends
// messages over the existing actor system, so no frame-size limit needs to be enforced here.
override def akkaFrameSize(): Long = {
  val conf = scheduler.sc.getConf
  AkkaUtils.maxFrameSizeBytes(conf)
}
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,7 @@ private[spark] object AkkaUtils {
/**
 * Returns the configured maximum frame size for Akka messages, in bytes.
 *
 * Reads "spark.akka.frameSize" (interpreted as MB, default 10) and converts it to bytes.
 * Fails fast when the configured value cannot be represented as an Int number of bytes,
 * instead of silently overflowing to a negative or bogus limit.
 */
def maxFrameSizeBytes(conf: SparkConf): Int = {
  val frameSizeInMB = conf.getInt("spark.akka.frameSize", 10)
  val maxMB = Int.MaxValue / (1024 * 1024)
  // Guard against Int overflow: frameSizeInMB * 1024 * 1024 wraps for values above maxMB.
  require(frameSizeInMB <= maxMB,
    s"spark.akka.frameSize must be at most $maxMB MB, but was $frameSizeInMB MB")
  frameSizeInMB * 1024 * 1024
}

/** Space reserved for extra data in an Akka message besides the serialized task or task result. */
val reservedSizeBytes: Int = 200 * 1024
}
21 changes: 12 additions & 9 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
Expand All @@ -170,14 +169,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
// Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
// throw exception.
val shuffleId = 20
val numMaps = 2
val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
val random = new java.util.Random(0)
random.nextBytes(data) // Make it hard to compress.
masterTracker.registerShuffle(shuffleId, numMaps)
(0 until numMaps).foreach { i =>
masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), data))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
}
}