[SPARK-31378][CORE] stage level scheduling dynamic allocation bug with initial num executors

### What changes were proposed in this pull request?

I found a bug in the stage level scheduling dynamic allocation code: when a non-default profile's initial number of executors equals the number of executors needed by the first job, we don't properly request the executors, and the application hangs.

The issue is that when a new stage is added and an initial number of executors is set, we seed the target with that initial number. Unfortunately, that makes the update-and-sync code believe it has already requested that many executors, so it never sends the request. To fix this, when there is an initial number we request the executors right away, at the point the profile is first seen. This is essentially what already happens on startup for the default profile.
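A minimal sketch of that interaction, with the bookkeeping reduced to a single map and the cluster-manager client stubbed out (`TargetSketch`, `lastRequested`, and the two `onStageSubmitted*` methods are invented stand-ins; only `numExecutorsTarget` mirrors the real `numExecutorsTargetPerResourceProfileId` map):

```scala
import scala.collection.mutable

// Toy model of the per-profile bookkeeping: resource profile id -> target.
object TargetSketch {
  val numExecutorsTarget = mutable.Map[Int, Int]()
  var lastRequested = Map[Int, Int]() // what the cluster manager was last asked for

  // Buggy path: seeding the target on stage submit makes the periodic
  // update-and-sync compute a zero delta, so no request is ever sent.
  def onStageSubmittedBuggy(rpId: Int, initial: Int): Unit = {
    numExecutorsTarget.getOrElseUpdate(rpId, initial)
  }

  // Fixed path: the first time a profile is seen, seed the target AND
  // request the initial executors immediately, mirroring what startup
  // already does for the default profile.
  def onStageSubmittedFixed(rpId: Int, initial: Int): Unit = {
    if (!numExecutorsTarget.contains(rpId)) {
      numExecutorsTarget(rpId) = initial
      if (initial > 0) {
        lastRequested = numExecutorsTarget.toMap // stands in for requestTotalExecutors
      }
    }
  }
}
```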

### Why are the changes needed?

It fixes a bug that can hang applications using stage level scheduling with a non-default resource profile.

### Does this PR introduce any user-facing change?

no

### How was this patch tested?

Unit test, plus manual testing on a YARN cluster. Went through multiple scenarios varying the initial number of executors, the minimum number, and the number of executors required by the first stage.
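For reference, the three numbers those scenarios vary correspond to the standard dynamic allocation settings; a sketch of a configuration matching the unit test below (min = 0, max = 10, initial = 2; the app name is illustrative, not from the PR):

```scala
import org.apache.spark.SparkConf

// Dynamic allocation knobs exercised by the scenarios above.
val conf = new SparkConf()
  .setAppName("stage-level-scheduling-demo") // illustrative
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "0")
  .set("spark.dynamicAllocation.initialExecutors", "2")
  .set("spark.dynamicAllocation.maxExecutors", "10")
```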

Closes apache#28146 from tgravescs/SPARK-31378.

Authored-by: Thomas Graves <tgraves@nvidia.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
tgravescs authored and dongjoon-hyun committed Apr 7, 2020
1 parent 8f010bd commit 30f1866
Showing 2 changed files with 33 additions and 3 deletions.
```diff
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -645,7 +645,6 @@ private[spark] class ExecutorAllocationManager(
       resourceProfileIdToStageAttempt.getOrElseUpdate(
         profId, new mutable.HashSet[StageAttempt]) += stageAttempt
       numExecutorsToAddPerResourceProfileId.getOrElseUpdate(profId, 1)
-      numExecutorsTargetPerResourceProfileId.getOrElseUpdate(profId, initialNumExecutors)

       // Compute the number of tasks requested by the stage on each host
       var numTasksPending = 0
@@ -661,9 +660,20 @@ private[spark] class ExecutorAllocationManager(
       }
       stageAttemptToExecutorPlacementHints.put(stageAttempt,
         (numTasksPending, hostToLocalTaskCountPerStage.toMap, profId))

       // Update the executor placement hints
       updateExecutorPlacementHints()

+      if (!numExecutorsTargetPerResourceProfileId.contains(profId)) {
+        numExecutorsTargetPerResourceProfileId.put(profId, initialNumExecutors)
+        if (initialNumExecutors > 0) {
+          logDebug(s"requesting executors, rpId: $profId, initial number is $initialNumExecutors")
+          // we need to trigger a schedule since we add an initial number here.
+          client.requestTotalExecutors(
+            numExecutorsTargetPerResourceProfileId.toMap,
+            numLocalityAwareTasksPerResourceProfileId.toMap,
+            rpIdToHostToLocalTaskCount)
+        }
+      }
     }
   }
```
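To see why the immediate request matters, here is a hedged sketch of the sync step's delta check (heavily simplified; the real `updateAndSyncNumExecutorsTarget` also handles backoff, caps, and timeouts):

```scala
// A request is only sent when the number of executors needed differs
// from the recorded target. If the target was pre-seeded to the initial
// number and the first stage needs exactly that many executors, the
// delta is zero and client.requestTotalExecutors never fires -> hang.
def needsRequest(maxNeeded: Int, recordedTarget: Int): Boolean =
  maxNeeded - recordedTarget != 0

needsRequest(maxNeeded = 2, recordedTarget = 2) // false: nothing is requested
```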
```diff
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable

 import org.mockito.ArgumentMatchers.{any, eq => meq}
-import org.mockito.Mockito.{mock, never, verify, when}
+import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.scalatest.PrivateMethodTester

 import org.apache.spark.executor.ExecutorMetrics
@@ -265,6 +265,26 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(numExecutorsTarget(manager, rprof1.id) === 10)
   }

+  test("add executors multiple profiles initial num same as needed") {
+    // test when the initial number of executors equals the number needed for the first
+    // stage using a non default profile to make sure we request the initial number
+    // properly. Here initial is 2, each executor in ResourceProfile 1 can have 2 tasks
+    // per executor, and start a stage with 4 tasks, which would need 2 executors.
+    val clock = new ManualClock(8888L)
+    val manager = createManager(createConf(0, 10, 2), clock)
+    val rp1 = new ResourceProfileBuilder()
+    val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
+    val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
+    rp1.require(execReqs).require(taskReqs)
+    val rprof1 = rp1.build
+    rpManager.addResourceProfile(rprof1)
+    when(client.requestTotalExecutors(any(), any(), any())).thenReturn(true)
+    post(SparkListenerStageSubmitted(createStageInfo(1, 4, rp = rprof1)))
+    // called once on start and a second time on stage submit with initial number
+    verify(client, times(2)).requestTotalExecutors(any(), any(), any())
+    assert(numExecutorsTarget(manager, rprof1.id) === 2)
+  }
+
   test("remove executors multiple profiles") {
     val manager = createManager(createConf(5, 10, 5))
     val rp1 = new ResourceProfileBuilder()
```
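For context, a user program reaches this code path by running a stage under a non-default profile; a sketch using the Spark 3.1+ stage level scheduling API (the existing SparkContext `sc` and the tiny RDD are illustrative, not from the PR):

```scala
import org.apache.spark.resource.{ExecutorResourceRequests, ResourceProfileBuilder, TaskResourceRequests}

// Same shape as the test's profile: 2 cores + 2 GPUs per executor and
// 1 cpu + 1 gpu per task, i.e. 2 tasks per executor.
val execReqs = new ExecutorResourceRequests().cores(2).resource("gpu", 2)
val taskReqs = new TaskResourceRequests().cpus(1).resource("gpu", 1)
val rprof = new ResourceProfileBuilder().require(execReqs).require(taskReqs).build

// withResources makes the next stage run under rprof, which is what
// fires onStageSubmitted for the non-default profile.
val doubled = sc.parallelize(1 to 4, 4).withResources(rprof).map(_ * 2).collect()
```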
