
Commit 43f679f

ashangit (William Montaz) authored and committed

Merge pull request apache#66 from jcuquemelle/SPARK-22683-criteo2.2

[SPARK-22683][CORE] Allow tuning the number of dynamically allocated executors

1 parent 1e15998 commit 43f679f

3 files changed: +52 −1 lines changed

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Lines changed: 5 additions & 1 deletion

@@ -116,9 +116,13 @@ private[spark] class ExecutorAllocationManager(
   // TODO: The default value of 1 for spark.executor.cores works right now because dynamic
   // allocation is only supported for YARN and the default number of cores per executor in YARN is
   // 1, but it might need to be attained differently for different cluster managers
-  private val tasksPerExecutor =
+  private val taskSlotPerExecutor =
     conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)
 
+  private val tasksPerExecutorSlot = conf.getInt("spark.dynamicAllocation.tasksPerExecutorSlot", 1)
+
+  private val tasksPerExecutor = tasksPerExecutorSlot * taskSlotPerExecutor
+
   validateSettings()
 
   // Number of executors to add in the next round
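With this change, tasksPerExecutor is no longer just executor.cores / task.cpus but is scaled by the new tasksPerExecutorSlot factor, and it is the divisor the manager uses when turning a task backlog into an executor target. A minimal sketch of that sizing rule (modelled on the computation in this class; the helper below is illustrative, not the exact upstream code):

    // Illustrative sketch of the sizing rule the field above feeds into;
    // the object and parameter names here are hypothetical.
    object ExecutorTargetSketch {
      def targetExecutors(numRunningOrPendingTasks: Int, tasksPerExecutor: Int): Int = {
        // Ceiling division: just enough executors to hold every running or
        // backlogged task, given how many tasks each executor may pack.
        (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
      }
    }

Doubling tasksPerExecutorSlot doubles tasksPerExecutor and therefore halves the target: a trade of latency for fewer, better-utilized executors.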

core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
Lines changed: 33 additions & 0 deletions

@@ -145,6 +145,34 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("tasksPerExecutorSlot is correctly handled") {
+    val conf = new SparkConf()
+      .setMaster("myDummyLocalExternalClusterManager")
+      .setAppName("test-executor-allocation-manager")
+      .set("spark.dynamicAllocation.enabled", "true")
+      .set("spark.dynamicAllocation.testing", "true")
+
+    val sc0 = new SparkContext(conf)
+    contexts += sc0
+    var manager = sc0.executorAllocationManager.get
+    assert(tasksPerExecutor(manager) === 1)
+    sc0.stop()
+
+    val conf1 = conf.clone.set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")
+    val sc1 = new SparkContext(conf1)
+    contexts += sc1
+    manager = sc1.executorAllocationManager.get
+    assert(tasksPerExecutor(manager) === 2)
+    sc1.stop()
+
+    val conf2 = conf1.clone.set("spark.executor.cores", "2")
+    val sc2 = new SparkContext(conf2)
+    contexts += sc2
+    manager = sc2.executorAllocationManager.get
+    assert(tasksPerExecutor(manager) === 4)
+    sc2.stop()
+  }
+
   test("add executors capped by num pending tasks") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get

@@ -1193,6 +1221,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _onExecutorBusy = PrivateMethod[Unit]('onExecutorBusy)
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
+  private val _tasksPerExecutor = PrivateMethod[Int]('tasksPerExecutor)
   private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
   private val _totalRunningTasks = PrivateMethod[Int]('totalRunningTasks)

@@ -1285,6 +1314,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
     manager invokePrivate _hostToLocalTaskCount()
   }
+
+  private def tasksPerExecutor(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _tasksPerExecutor()
+  }
 }
 
 /**
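The test drives the new property through SparkConf, and an application would set it the same way. A hypothetical application-side example (app name and values are illustrative, not from the patch):

    import org.apache.spark.SparkConf

    // Hypothetical config: 4 cores per executor and 1 CPU per task give
    // 4 task slots; tasksPerExecutorSlot = 2 packs 8 tasks per executor,
    // so roughly half as many executors are requested for the same backlog.
    val conf = new SparkConf()
      .setAppName("backlog-packing-demo") // hypothetical name
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.executor.cores", "4")
      .set("spark.task.cpus", "1")
      .set("spark.dynamicAllocation.tasksPerExecutorSlot", "2")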

docs/configuration.md
Lines changed: 14 additions & 0 deletions

@@ -1767,6 +1767,7 @@ Apart from these, the following properties are also available, and may be useful
     <code>spark.dynamicAllocation.minExecutors</code>,
     <code>spark.dynamicAllocation.maxExecutors</code>, and
     <code>spark.dynamicAllocation.initialExecutors</code>
+    <code>spark.dynamicAllocation.tasksPerExecutorSlot</code>
   </td>
 </tr>
 <tr>

@@ -1811,6 +1812,19 @@ Apart from these, the following properties are also available, and may be useful
     Lower bound for the number of executors if dynamic allocation is enabled.
   </td>
 </tr>
+<tr>
+  <td><code>spark.dynamicAllocation.tasksPerExecutorSlot</code></td>
+  <td>1</td>
+  <td>
+    Each executor can process a certain number of tasks in parallel (task slots).
+    The number of task slots per executor is <code>spark.executor.cores</code> /
+    <code>spark.task.cpus</code>. The ExecutorAllocationManager sets the target number
+    of running executors to nbCurrentTasks / (taskSlots * tasksPerExecutorSlot), where
+    nbCurrentTasks is the total number of running and backlogged tasks. With the default
+    value of 1, each available task slot computes a single task on average, which gives
+    the best latency. For workloads with small tasks, however, this default wastes
+    resources because of executor-allocation overhead.
+  </td>
+</tr>
 <tr>
   <td><code>spark.dynamicAllocation.schedulerBacklogTimeout</code></td>
   <td>1s</td>
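A worked example with illustrative numbers, not taken from the patch: executor.cores = 4 and task.cpus = 1 give 4 task slots, and tasksPerExecutorSlot = 2 makes tasksPerExecutor 8, so a stage with 1000 running and backlogged tasks yields a target of ceil(1000 / 8) = 125 executors, versus 250 with the default of 1.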
