[SPARK-5593][Core] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener

More precisely, in ExecutorAllocationListener we need to replace onBlockManagerAdded and onBlockManagerRemoved with onExecutorAdded and onExecutorRemoved, because the executor-level events express the executor lifecycle more accurately. For example, in SPARK-5529 a BlockManager had been removed while its executor still existed.
 andrewor14 sryza
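
For context, here is a minimal, hypothetical sketch of the pattern this change adopts: a SparkListener that tracks live executors from the executor-level events instead of BlockManager events, so the tracked set stays correct even when a BlockManager is torn down while its executor survives (as in SPARK-5529). Only the SparkListener callbacks and event classes are the ones touched by this patch; the LiveExecutorTracker class itself is illustrative and not part of the commit.

import scala.collection.mutable

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

// Illustrative helper (not part of this patch): keeps a set of executors that
// are currently alive, keyed on executor lifecycle events rather than on
// BlockManager registration/removal.
class LiveExecutorTracker extends SparkListener {
  private val liveExecutors = mutable.HashSet[String]()

  override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
    liveExecutors.synchronized { liveExecutors += executorAdded.executorId }
  }

  override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
    liveExecutors.synchronized { liveExecutors -= executorRemoved.executorId }
  }

  // Snapshot of the executors currently known to be alive.
  def currentExecutors: Set[String] = liveExecutors.synchronized { liveExecutors.toSet }
}

A listener like this could be registered with sc.addSparkListener; ExecutorAllocationManager attaches its internal ExecutorAllocationListener to the same listener bus, which is what the test below exercises via sc.listenerBus.postToAll.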

Author: lianhuiwang <lianhuiwang09@gmail.com>

Closes #4369 from lianhuiwang/SPARK-5593 and squashes the following commits:

333367c [lianhuiwang] Replace BlockManagerListener with ExecutorListener in ExecutorAllocationListener
lianhuiwang authored and Andrew Or committed Feb 6, 2015
1 parent 9792bec commit 6072fcc
Showing 2 changed files with 19 additions and 22 deletions.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -486,8 +486,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded): Unit = {
-      val executorId = blockManagerAdded.blockManagerId.executorId
+    override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
+      val executorId = executorAdded.executorId
       if (executorId != SparkContext.DRIVER_IDENTIFIER) {
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
@@ -498,9 +498,8 @@ private[spark] class ExecutorAllocationManager(
       }
     }
 
-    override def onBlockManagerRemoved(
-        blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
-      allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
+    override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
+      allocationManager.onExecutorRemoved(executorRemoved.executorId)
     }
 
     /**
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import org.scalatest.{FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
-import org.apache.spark.storage.BlockManagerId
+import org.apache.spark.scheduler.cluster.ExecutorInfo
 
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
@@ -144,8 +144,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
 
     // Verify that running a task reduces the cap
     sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3)))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1")))
     assert(numExecutorsPending(manager) === 4)
     assert(addExecutors(manager) === 1)
@@ -578,30 +578,28 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).isEmpty)
 
     // New executors have registered
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(removeTimes(manager).contains("executor-1"))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-2", "host2", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-2", new ExecutorInfo("host2", 1)))
    assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 2)
     assert(removeTimes(manager).contains("executor-2"))
 
     // Existing executors have disconnected
-    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
-      0L, BlockManagerId("executor-1", "host1", 1)))
+    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", ""))
     assert(executorIds(manager).size === 1)
     assert(!executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 1)
     assert(!removeTimes(manager).contains("executor-1"))
 
     // Unknown executor has disconnected
-    sc.listenerBus.postToAll(SparkListenerBlockManagerRemoved(
-      0L, BlockManagerId("executor-3", "host3", 1)))
+    sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", ""))
     assert(executorIds(manager).size === 1)
     assert(removeTimes(manager).size === 1)
   }
@@ -613,8 +611,8 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).isEmpty)
 
     sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
@@ -625,16 +623,16 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
     assert(removeTimes(manager).isEmpty)
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-1", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-1", new ExecutorInfo("host1", 1)))
     sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1")))
 
     assert(executorIds(manager).size === 1)
     assert(executorIds(manager).contains("executor-1"))
     assert(removeTimes(manager).size === 0)
 
-    sc.listenerBus.postToAll(SparkListenerBlockManagerAdded(
-      0L, BlockManagerId("executor-2", "host1", 1), 100L))
+    sc.listenerBus.postToAll(SparkListenerExecutorAdded(
+      0L, "executor-2", new ExecutorInfo("host1", 1)))
     assert(executorIds(manager).size === 2)
     assert(executorIds(manager).contains("executor-2"))
     assert(removeTimes(manager).size === 1)
