
Commit 345fdd9

wangyum authored and GitHub Enterprise committed
[CARMEL-7377][CARMEL-5063] Improve spark dynamic allocation mechanism (apache#121)
1 parent d11af64 commit 345fdd9

File tree

8 files changed (+342, -15 lines changed)


core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 220 additions & 12 deletions
@@ -28,10 +28,10 @@ import com.codahale.metrics.{Counter, Gauge, MetricRegistry}
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config._
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
-import org.apache.spark.internal.config.Tests.TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED
+import org.apache.spark.internal.config.Tests.{TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, TEST_SCHEDULE_INTERVAL}
 import org.apache.spark.metrics.source.Source
+import org.apache.spark.resource.{ResourceProfile, ResourceProfileManager}
 import org.apache.spark.resource.ResourceProfile.UNKNOWN_RESOURCE_PROFILE_ID
-import org.apache.spark.resource.ResourceProfileManager
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.dynalloc.ExecutorMonitor
 import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
@@ -104,7 +104,8 @@ private[spark] class ExecutorAllocationManager(
     cleaner: Option[ContextCleaner] = None,
     clock: Clock = new SystemClock(),
     resourceProfileManager: ResourceProfileManager,
-    reliableShuffleStorage: Boolean)
+    reliableShuffleStorage: Boolean,
+    taskScheduler: Option[TaskScheduler] = None)
   extends Logging {

   allocationManager =>
@@ -126,6 +127,10 @@ private[spark] class ExecutorAllocationManager(
   // During testing, the methods to actually kill and add executors are mocked out
   private val testing = conf.get(DYN_ALLOCATION_TESTING)

+  private val tasksPerExecutorForFullParallelism = resourceProfileManager
+    .resourceProfileFromId(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+    .maxTasksPerExecutor(conf)
+
   private val executorAllocationRatio =
     conf.get(DYN_ALLOCATION_EXECUTOR_ALLOCATION_RATIO)

@@ -151,7 +156,8 @@ private[spark] class ExecutorAllocationManager(
   private var addTime: Long = NOT_SET

   // Polling loop interval (ms)
-  private val intervalMillis: Long = 100
+  private val intervalMillis: Long =
+    if (Utils.isTesting) conf.get(TEST_SCHEDULE_INTERVAL) else 1000

   // Listener for Spark events that impact the allocation policy
   val listener = new ExecutorAllocationListener
@@ -180,6 +186,18 @@ private[spark] class ExecutorAllocationManager(
   // ResourceProfile id to Host to possible task running on it, used for executor placement.
   private var rpIdToHostToLocalTaskCount: Map[Int, Map[String, Int]] = Map.empty

+  private val removeExecutorsPerStats = conf.get(DYN_ALLOCATION_REMOVE_EXECUTOR_PER_STATS)
+  private val removeExecutorsPerStatsIntervalS =
+    conf.get(DYN_ALLOCATION_REMOVE_EXECUTOR_PER_STATS_INTERVAL)
+  // the executor number kept rate compared to system task stats
+  private val executorNumKeepRatio = 1.3
+  private[spark] val systemTaskStats: SystemTaskStats = conf.get(DYN_ALLOCATION_STATS_TYPE) match {
+    case "avg" => new SystemAvgTaskStats
+    case _ => new SystemMaxTaskStats
+  }
+
+  private var nextRemoveTime: Long = -1
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -288,11 +306,9 @@ private[spark] class ExecutorAllocationManager(
    * under the current load to satisfy all running and pending tasks, rounded up.
    */
   private[spark] def maxNumExecutorsNeededPerResourceProfile(rpId: Int): Int = {
-    val pendingTask = listener.pendingTasksPerResourceProfile(rpId)
     val pendingSpeculative = listener.pendingSpeculativeTasksPerResourceProfile(rpId)
     val unschedulableTaskSets = listener.pendingUnschedulableTaskSetsPerResourceProfile(rpId)
-    val running = listener.totalRunningTasksPerResourceProfile(rpId)
-    val numRunningOrPendingTasks = pendingTask + pendingSpeculative + running
+    val numRunningOrPendingTasks = currTotalTaskNum(Some(rpId))
     val rp = resourceProfileManager.resourceProfileFromId(rpId)
     val tasksPerExecutor = rp.maxTasksPerExecutor(conf)
     logDebug(s"max needed for rpId: $rpId numpending: $numRunningOrPendingTasks," +
@@ -341,13 +357,56 @@ private[spark] class ExecutorAllocationManager(
       initializing = false
     }

+    val now = clock.nanoTime()
     // Update executor target number only after initializing flag is unset
-    updateAndSyncNumExecutorsTarget(clock.nanoTime())
-    if (executorIdsToBeRemoved.nonEmpty) {
-      removeExecutors(executorIdsToBeRemoved)
+    updateAndSyncNumExecutorsTarget(now)
+    if (removeExecutorsPerStats) {
+      executorIdsToBeRemoved.head._2
+      val currTaskNum = systemTaskStats.updateCurrTaskNum(now, currTotalTaskNum)
+      if (currTaskNum > 0) {
+        removeExecutorsBaseOnStats(now, currTaskNum)
+      }
+    } else if (executorIdsToBeRemoved.nonEmpty) {
+      removeExecutors(executorIdsToBeRemoved, "idle timeout")
+    }
+  }
+
+  private def currTotalTaskNum(rpId: Option[Int] = None): Int = {
+    taskScheduler.map(_.totalTasks()).getOrElse {
+      // The total task number in listener may not be accurate
+      // because of the possible event loss in the event queue
+      rpId.map { id =>
+        listener.pendingTasksPerResourceProfile(id) +
+          listener.pendingSpeculativeTasksPerResourceProfile(id) +
+          listener.totalRunningTasksPerResourceProfile(id)
+      }.getOrElse(listener.totalTasks())
     }
   }

+  private def removeExecutorsBaseOnStats(now: Long, latestTotalTaskNum: Int): Unit = {
+    if (now < nextRemoveTime) {
+      return
+    }
+    nextRemoveTime = now + TimeUnit.SECONDS.toNanos(removeExecutorsPerStatsIntervalS)
+    val consideredTotalTaskNum = systemTaskStats.lastFiveMinStat
+    var expectedExecutorCnt = math.ceil(consideredTotalTaskNum *
+      executorNumKeepRatio / tasksPerExecutorForFullParallelism).toInt
+
+    val currExecutorCnt = executorMonitor.executorCount
+    if (expectedExecutorCnt < minNumExecutors) {
+      expectedExecutorCnt = minNumExecutors
+    }
+    if (currExecutorCnt <= expectedExecutorCnt) {
+      return
+    }
+
+    val removeCnt = currExecutorCnt - expectedExecutorCnt
+    logInfo(s"Current executor cnt:$currExecutorCnt, " +
+      s"expected executor cnt:$expectedExecutorCnt, will remove $removeCnt executors")
+    val removeExecutorIDs = executorMonitor.executorsWithLeastRunningTasks(removeCnt)
+    removeExecutors(removeExecutorIDs, "less executors needed based on stats")
+  }
+
   /**
    * Updates our target number of executors for each ResourceProfile and then syncs the result
    * with the cluster manager.
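For intuition, removeExecutorsBaseOnStats sizes the fleet as ceil(lastFiveMinStat * executorNumKeepRatio / tasksPerExecutorForFullParallelism). A quick worked example in Scala with made-up numbers (a five-minute stat of 480 tasks and 4 tasks per executor; these values are not from the commit):

// Hypothetical inputs chosen only for illustration.
val lastFiveMinStat = 480
val executorNumKeepRatio = 1.3
val tasksPerExecutor = 4
val expectedExecutorCnt = math.ceil(lastFiveMinStat * executorNumKeepRatio / tasksPerExecutor).toInt
// expectedExecutorCnt == 156; with 200 live executors the manager would remove 200 - 156 = 44,
// picked via executorsWithLeastRunningTasks, at most once per removeExecutorPerSystemStats interval.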
@@ -533,7 +592,8 @@ private[spark] class ExecutorAllocationManager(
    * Request the cluster manager to remove the given executors.
    * Returns the list of executors which are removed.
    */
-  private def removeExecutors(executors: Seq[(String, Int)]): Seq[String] = synchronized {
+  private def removeExecutors(
+      executors: Seq[(String, Int)], reason: String): Seq[String] = synchronized {
     val executorIdsToBeRemoved = new ArrayBuffer[String]
     logDebug(s"Request to remove executorIds: ${executors.mkString(", ")}")
     val numExecutorsTotalPerRpId = mutable.Map[Int, Int]()
@@ -603,7 +663,7 @@ private[spark] class ExecutorAllocationManager(
       } else {
         executorMonitor.executorsKilled(executorsRemoved.toSeq)
       }
-      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to idle timeout.")
+      logInfo(s"Executors ${executorsRemoved.mkString(",")} removed due to $reason.")
       executorsRemoved.toSeq
     } else {
       logWarning(s"Unable to reach the cluster manager to kill executor/s " +
@@ -936,6 +996,40 @@ private[spark] class ExecutorAllocationManager(
       }.sum
     }

+    /**
+     * An estimate of the total number of pending tasks remaining for currently running stages. Does
+     * not account for tasks which may have failed and been resubmitted.
+     *
+     * Note: This is not thread-safe without the caller owning the `allocationManager` lock.
+     */
+    def pendingTasks(): Int = {
+      stageAttemptToNumTasks.map { case (stageAttempt, numTasks) =>
+        numTasks - stageAttemptToTaskIndices.get(stageAttempt).map(_.size).getOrElse(0)
+      }.sum
+    }
+
+    def pendingSpeculativeTasks(): Int = {
+      stageAttemptToPendingSpeculativeTasks.map { case (sa, tasks) =>
+        tasks.size - stageAttemptToSpeculativeTaskIndices.get(sa).map(_.size).getOrElse(0)
+      }.sum
+    }
+
+    def totalPendingTasks(): Int = {
+      pendingTasks + pendingSpeculativeTasks
+    }
+
+    /**
+     * The number of tasks currently running across all stages.
+     * Include running-but-zombie stage attempts
+     */
+    def totalRunningTasks(): Int = {
+      stageAttemptToNumRunningTask.values.sum
+    }
+
+    def totalTasks(): Int = {
+      totalRunningTasks() + totalPendingTasks()
+    }
+
     /**
      * Update the Executor placement hints (the number of tasks with locality preferences,
      * a map where each pair is a node and the number of tasks that would like to be scheduled
@@ -1011,6 +1105,9 @@ private[spark] class ExecutorAllocationManagerSource(
       .map(executorAllocationManager.maxNumExecutorsNeededPerResourceProfile(_)).sum, 0)
   registerGauge("numberDecommissioningExecutors",
     executorAllocationManager.executorMonitor.decommissioningCount, 0)
+  registerGauge("lastOneMinStat", executorAllocationManager.systemTaskStats.lastOneMinStat, 0)
+  registerGauge("lastFiveMinStat", executorAllocationManager.systemTaskStats.lastFiveMinStat, 0)
+  registerGauge("currActiveCoresNum", executorAllocationManager.totalCores, 0)
 }

 private object ExecutorAllocationManager {
@@ -1020,3 +1117,114 @@ private object ExecutorAllocationManager {
   private[spark] case class TargetNumUpdates(delta: Int, oldNumExecutorsTarget: Int)

 }
+
+trait SystemTaskStats {
+  def updateCurrTaskNum(now: Long, taskNumCal: Option[Int] => Int): Int
+
+  def lastOneMinStat: Int
+
+  def lastFiveMinStat: Int
+
+  def lastFifteenMinStat: Int
+}
+
+class SystemMaxTaskStats extends SystemTaskStats {
+  val TICK_INTERVAL: Long = TimeUnit.SECONDS.toNanos(5)
+  private var nextTick: Long = ExecutorAllocationManager.NOT_SET
+
+  private final val SLOT_NUM = 180
+  private val taskNumArray = new Array[Int](SLOT_NUM)
+  private var currSlot: Int = 0
+
+  def updateCurrTaskNum(now: Long, taskNumCal: Option[Int] => Int): Int = {
+    if (nextTick == ExecutorAllocationManager.NOT_SET) {
+      nextTick = now
+    }
+    if (now < nextTick) return -1
+    val totalTaskNum = taskNumCal(None)
+    nextTick += TICK_INTERVAL
+    taskNumArray(currSlot) = totalTaskNum
+    currSlot += 1
+    if (currSlot > (SLOT_NUM - 1)) {
+      currSlot = 0
+    }
+    totalTaskNum
+  }
+
+  def lastOneMinStat: Int = {
+    calMaxTaskNumInPreviousSlots(12)
+  }
+
+  def lastFiveMinStat: Int = {
+    calMaxTaskNumInPreviousSlots(60)
+  }
+
+  def lastFifteenMinStat: Int = {
+    calMaxTaskNumInPreviousSlots(180)
+  }
+
+  def calMaxTaskNumInPreviousSlots(n: Int): Int = {
+    var maxTaskNum = 0
+    var nextSlot = currSlot
+    for (_ <- 1 to n) {
+      nextSlot = nextSlot - 1
+      if (nextSlot < 0) {
+        nextSlot = SLOT_NUM - 1
+      }
+      if (taskNumArray(nextSlot) > maxTaskNum) {
+        maxTaskNum = taskNumArray(nextSlot)
+      }
+    }
+    maxTaskNum
+  }
+}
+
+/**
+ * Moving average like unix load average
+ * @see <a href="
+ * https://www.helpsystems.com/resources/guides/unix-load-average-part-1-how-it-works">
+ * UNIX Load Average Part 1: How It Works</a>
+ */
+class SystemAvgTaskStats extends SystemTaskStats {
+  val EXP_1 = 1884 /* 1/exp(5sec/1min) as fixed-point */
+  val EXP_5 = 2014 /* 1/exp(5sec/5min) */
+  val EXP_15 = 2037 /* 1/exp(5sec/15min) */
+  val F_SHIFT = 11
+  val FIXED_1 = 1 << F_SHIFT
+
+  val TICK_INTERVAL: Long = TimeUnit.SECONDS.toNanos(5)
+
+  private var nextTick: Long = ExecutorAllocationManager.NOT_SET
+  private var _lastOneMinAvgTaskNum: Int = _
+  private var _lastFiveMinAvgTaskNum: Int = _
+  private var _lastFifteenMinAvgTaskNum: Int = _
+
+  def updateCurrTaskNum(now: Long, taskNumCal: Option[Int] => Int): Int = {
+    if (nextTick == ExecutorAllocationManager.NOT_SET) {
+      nextTick = now
+      val taskNum = taskNumCal(None)
+      _lastOneMinAvgTaskNum = taskNum
+      _lastFiveMinAvgTaskNum = taskNum
+      _lastFifteenMinAvgTaskNum = taskNum
+    }
+    if (now < nextTick) return -1
+    nextTick += TICK_INTERVAL
+    val taskNum = taskNumCal(None)
+    _lastOneMinAvgTaskNum = calNewLoad(_lastOneMinAvgTaskNum, EXP_1, taskNum)
+    _lastFiveMinAvgTaskNum = calNewLoad(_lastFiveMinAvgTaskNum, EXP_5, taskNum)
+    _lastFifteenMinAvgTaskNum = calNewLoad(_lastFifteenMinAvgTaskNum, EXP_15, taskNum)
+    taskNum
+  }
+
+  def calNewLoad(load: Int, exp: Int, newVal: Int): Int = {
+    var newLoad: Long = load
+    newLoad *= exp
+    newLoad += newVal * (FIXED_1 - exp)
+    newLoad = newLoad >> F_SHIFT
+    newLoad.toInt
+  }
+
+  def lastOneMinStat: Int = _lastOneMinAvgTaskNum
+  def lastFiveMinStat: Int = _lastFiveMinAvgTaskNum
+  def lastFifteenMinStat: Int = _lastFifteenMinAvgTaskNum
+}
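For the "avg" flavor, a self-contained Scala sketch of the fixed-point smoothing that calNewLoad performs; the constants mirror the diff, while the 5-second samples below are invented for illustration:

object LoadAvgSketch {
  // Fixed-point EMA in the style of the unix load average: FIXED_1 = 2048,
  // EXP_5 = 2014 ≈ FIXED_1 / exp(5 s / 5 min), so each tick keeps ~98.3% of the old value.
  val F_SHIFT = 11
  val FIXED_1: Int = 1 << F_SHIFT
  val EXP_5 = 2014

  def calNewLoad(load: Int, exp: Int, newVal: Int): Int =
    ((load.toLong * exp + newVal.toLong * (FIXED_1 - exp)) >> F_SHIFT).toInt

  def main(args: Array[String]): Unit = {
    // Invented cluster-wide task counts sampled every 5 seconds.
    val samples = Seq(400, 400, 400, 0, 0, 0)
    var fiveMinAvg = 0
    samples.foreach { s => fiveMinAvg = calNewLoad(fiveMinAvg, EXP_5, s) }
    println(fiveMinAvg) // a short burst of 400 tasks barely moves the 5-minute figure
  }
}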

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 1 deletion
@@ -689,7 +689,8 @@ class SparkContext(config: SparkConf) extends Logging {
           Some(new ExecutorAllocationManager(
             schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
             cleaner = cleaner, resourceProfileManager = resourceProfileManager,
-            reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage()))
+            reliableShuffleStorage = _shuffleDriverComponents.supportsReliableStorage(),
+            taskScheduler = Some(_taskScheduler)))
         case _ =>
           None
       }

core/src/main/scala/org/apache/spark/internal/config/Tests.scala

Lines changed: 6 additions & 0 deletions
@@ -26,6 +26,12 @@ private[spark] object Tests {
     .longConf
     .createWithDefault(Runtime.getRuntime.maxMemory)

+  val TEST_SCHEDULE_INTERVAL =
+    ConfigBuilder("spark.testing.dynamicAllocation.scheduleInterval")
+      .version("3.5.0")
+      .longConf
+      .createWithDefault(100)
+
   val TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED =
     ConfigBuilder("spark.testing.dynamicAllocation.schedule.enabled")
       .version("3.1.0")

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 18 additions & 0 deletions
@@ -662,6 +662,24 @@ package object config {
     .version("1.2.0")
     .timeConf(TimeUnit.SECONDS).createWithDefault(1)

+  private[spark] val DYN_ALLOCATION_REMOVE_EXECUTOR_PER_STATS =
+    ConfigBuilder("spark.dynamicAllocation.removeExecutorPerSystemStats")
+      .version("3.5.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  private[spark] val DYN_ALLOCATION_REMOVE_EXECUTOR_PER_STATS_INTERVAL =
+    ConfigBuilder("spark.dynamicAllocation.removeExecutorPerSystemStats.interval")
+      .version("3.5.0")
+      .timeConf(TimeUnit.SECONDS).createWithDefault(300)
+
+  private[spark] val DYN_ALLOCATION_STATS_TYPE =
+    ConfigBuilder("spark.dynamicAllocation.system.stats.type")
+      .version("3.5.0")
+      .stringConf
+      .checkValues(Set("max", "avg"))
+      .createWithDefault("max")
+
   private[spark] val DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT =
     ConfigBuilder("spark.dynamicAllocation.sustainedSchedulerBacklogTimeout")
       .version("1.2.0")
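A sketch of how the new knobs might be set on a driver, assuming the usual dynamic-allocation prerequisites (shuffle tracking or an external shuffle service) are already in place; only the keys come from this diff, the values are examples:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("stats-based-downscaling-sketch")
  .set("spark.dynamicAllocation.enabled", "true")
  // Switch executor removal from the idle-timeout path to the system-stats path.
  .set("spark.dynamicAllocation.removeExecutorPerSystemStats", "true")
  // Re-evaluate the expected executor count every 5 minutes (the default above).
  .set("spark.dynamicAllocation.removeExecutorPerSystemStats.interval", "300s")
  // "max" keeps the peak recent task count; "avg" uses the load-average smoothing.
  .set("spark.dynamicAllocation.system.stats.type", "max")
val sc = new SparkContext(conf)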

core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala

Lines changed: 10 additions & 0 deletions
@@ -1332,6 +1332,16 @@ private[spark] class TaskSchedulerImpl(
     }
   }

+  override def totalTasks(): Int = {
+    var totalTaskNum = 0
+    for (taskSet <- rootPool.getTaskSetQueue) {
+      if (!taskSet.isZombie) {
+        totalTaskNum += (taskSet.numTasks - taskSet.successfulTasks)
+      }
+    }
+    totalTaskNum
+  }
+
   override def taskSummary(): TaskSummary = {
     var totalTasks = 0
     var runningTasks = 0
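For intuition only, a toy Scala version of the same count over stand-in task-set snapshots (the case class below is hypothetical, not the real TaskSetManager API):

// Hypothetical stand-in for the fields totalTasks() reads off each task set.
case class TaskSetSnapshot(numTasks: Int, successfulTasks: Int, isZombie: Boolean)

val queue = Seq(
  TaskSetSnapshot(numTasks = 200, successfulTasks = 150, isZombie = false), // 50 remain
  TaskSetSnapshot(numTasks = 80, successfulTasks = 80, isZombie = true),    // zombie, skipped
  TaskSetSnapshot(numTasks = 40, successfulTasks = 0, isZombie = false))    // 40 remain

val totalTasks = queue.filterNot(_.isZombie).map(t => t.numTasks - t.successfulTasks).sum
// totalTasks == 90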
