Commit eadc487

Author: Marcelo Vanzin
Parent: c56a012

[SPARK-27468][core] Track correct storage level and mem/disk usage for RDDs.
Two things are being fixed here.

The first, explicitly explained in the referenced bug, is the storage level tracked for RDDs and partitions. Previously, the RDD's storage level would change depending on the status reported by executors for the blocks they were storing, and individual blocks would reflect that. That is wrong because different blocks may be stored differently on different executors. So now the RDD tracks the user-provided storage level, while the individual partitions reflect the current storage level of that particular block, including the current number of replicas.

The second fix is in the accounting of usage: block managers report the current memory and disk used by a block, not the change from before. So the status listener needs to track the previous usage of each block so that it can accurately calculate the changes (see the sketch below). This requires a bit more memory in the driver, but tests show it's not that big of a problem (a few MB for a 100k-partition RDD with all blocks cached).

Some internal accounting was changed to save memory, given the extra usage incurred by the tracking above. For reference, memory usage comparison (captured using jvisualvm) for 100k entries:

- Scala HashMap[String, LiveRDDBlock]: 17MB
- Scala HashMap[Int, LiveRDDBlock]: 11MB
- OpenHashMap[Int, LiveRDDBlock]: 6MB
- OpenHashMap[String, LiveRDDBlock]: 14MB

So using an OpenHashMap with primitive keys saves a lot of space (sketched below). With non-primitive keys the savings don't add up to much, so maps that need string keys were left untouched.

The unit tests were also changed to reflect the actual behavior of the block manager when sending update events to the driver.
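To make the delta bookkeeping of the second fix concrete, here is a minimal self-contained sketch of what the listener now has to do. This is not the actual LiveEntity code from this commit; `BlockUsage`, `DeltaTracker`, and `record` are illustrative stand-ins for the per-block state it keeps.

```scala
import scala.collection.mutable

// Illustrative stand-in for the per-block usage the listener remembers.
case class BlockUsage(memSize: Long, diskSize: Long)

class DeltaTracker {
  // Keyed by partition index; the commit favors primitive Int keys for maps
  // of this shape because they are much cheaper (see the next sketch).
  private val prev = mutable.HashMap.empty[Int, BlockUsage]

  /** Block managers report absolute usage, so derive the change to apply. */
  def record(splitIndex: Int, reported: BlockUsage): (Long, Long) = {
    val old = prev.getOrElse(splitIndex, BlockUsage(0L, 0L))
    prev(splitIndex) = reported
    (reported.memSize - old.memSize, reported.diskSize - old.diskSize)
  }
}

object DeltaTrackerDemo extends App {
  val tracker = new DeltaTracker
  println(tracker.record(0, BlockUsage(100L, 0L))) // first report: (100, 0)
  println(tracker.record(0, BlockUsage(40L, 0L)))  // block shrank: (-60, 0), not (+40, 0)
}
```

Without the remembered previous value, the second event above would be double-counted as another 40 bytes instead of a 60-byte decrease.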
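The OpenHashMap in the comparison is Spark's internal `org.apache.spark.util.collection.OpenHashMap`, which stores keys in an open-addressed, specialized `OpenHashSet`, so primitive keys avoid one boxed object per entry. A rough sketch of the two map shapes follows; the class is `private[spark]`, so this only compiles under an `org.apache.spark` package, and the package name, `LiveRDDBlock` stand-in, and fill loop are illustrative, not the jvisualvm methodology behind the numbers above.

```scala
// Sketch only: OpenHashMap is private[spark], so this file must live under
// the org.apache.spark package tree (this demo package name is made up).
package org.apache.spark.demo

import org.apache.spark.util.collection.OpenHashMap

object MapFootprintSketch {
  final case class LiveRDDBlock(memSize: Long, diskSize: Long)

  def main(args: Array[String]): Unit = {
    // Int keys are stored unboxed by the underlying specialized OpenHashSet.
    val byIndex = new OpenHashMap[Int, LiveRDDBlock]()
    // String keys still allocate one object per key, so the savings shrink.
    val byName = new OpenHashMap[String, LiveRDDBlock]()

    (0 until 100000).foreach { i =>
      byIndex(i) = LiveRDDBlock(0L, 0L)
      byName(s"rdd_1_$i") = LiveRDDBlock(0L, 0L)
    }
    // Per the commit message (measured with jvisualvm): ~6MB with Int keys
    // vs ~14MB with String keys for 100k entries.
  }
}
```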
File tree: 4 files changed, +283 −116 lines

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala (48 additions, 90 deletions)

```diff
@@ -48,6 +48,8 @@ private[spark] class AppStatusListener(
     appStatusSource: Option[AppStatusSource] = None,
     lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
 
+  import LiveEntityHelpers._
+
   private var sparkVersion = SPARK_VERSION
   private var appInfo: v1.ApplicationInfo = null
   private var appSummary = new AppSummary(0, 0)
@@ -222,11 +224,10 @@
       }
       // Remove all RDD partitions that reference the removed executor
       liveRDDs.values.foreach { rdd =>
-        rdd.getPartitions.values
-          .filter(_.executors.contains(event.executorId))
-          .foreach { partition =>
+        rdd.getPartitions().foreach { case (idx, partition) =>
+          if (partition != null && partition.executors.contains(event.executorId)) {
             if (partition.executors.length == 1) {
-              rdd.removePartition(partition.blockName)
+              rdd.removePartition(idx)
               rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, partition.memoryUsed * -1)
               rdd.diskUsed = addDeltaToValue(rdd.diskUsed, partition.diskUsed * -1)
             } else {
@@ -235,13 +236,14 @@
               rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
                 (partition.diskUsed / partition.executors.length) * -1)
               partition.update(partition.executors
-                .filter(!_.equals(event.executorId)), rdd.storageLevel,
+                .filter(!_.equals(event.executorId)),
                 addDeltaToValue(partition.memoryUsed,
                   (partition.memoryUsed / partition.executors.length) * -1),
                 addDeltaToValue(partition.diskUsed,
                   (partition.diskUsed / partition.executors.length) * -1))
             }
           }
+        }
         update(rdd, now)
       }
       if (isExecutorActiveForLiveStages(exec)) {
@@ -782,35 +784,14 @@
   override def onUnpersistRDD(event: SparkListenerUnpersistRDD): Unit = {
     liveRDDs.remove(event.rddId).foreach { liveRDD =>
       val storageLevel = liveRDD.info.storageLevel
-
-      // Use RDD partition info to update executor block info.
-      liveRDD.getPartitions().foreach { case (_, part) =>
-        part.executors.foreach { executorId =>
-          liveExecutors.get(executorId).foreach { exec =>
-            exec.rddBlocks = exec.rddBlocks - 1
-          }
-        }
-      }
-
       val now = System.nanoTime()
 
-      // Use RDD distribution to update executor memory and disk usage info.
-      liveRDD.getDistributions().foreach { case (executorId, rddDist) =>
-        liveExecutors.get(executorId).foreach { exec =>
-          if (exec.hasMemoryInfo) {
-            if (storageLevel.useOffHeap) {
-              exec.usedOffHeap = addDeltaToValue(exec.usedOffHeap, -rddDist.offHeapUsed)
-            } else {
-              exec.usedOnHeap = addDeltaToValue(exec.usedOnHeap, -rddDist.onHeapUsed)
-            }
-          }
-          exec.memoryUsed = addDeltaToValue(exec.memoryUsed, -rddDist.memoryUsed)
-          exec.diskUsed = addDeltaToValue(exec.diskUsed, -rddDist.diskUsed)
+      liveExecutors.values.foreach { exec =>
+        if (exec.cleanupRDD(event.rddId, storageLevel.useOffHeap)) {
           maybeUpdate(exec, now)
         }
       }
     }
-
     kvstore.delete(classOf[RDDStorageInfoWrapper], event.rddId)
   }
@@ -902,76 +883,35 @@
       .sortBy(_.stageId)
   }
 
-  /**
-   * Apply a delta to a value, but ensure that it doesn't go negative.
-   */
-  private def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)
-
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
     val now = System.nanoTime()
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
 
-    // Whether values are being added to or removed from the existing accounting.
-    val storageLevel = event.blockUpdatedInfo.storageLevel
-    val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
-    val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)
-
-    val updatedStorageLevel = if (storageLevel.isValid) {
-      Some(storageLevel.description)
-    } else {
-      None
-    }
-
-    // We need information about the executor to update some memory accounting values in the
-    // RDD info, so read that beforehand.
-    val maybeExec = liveExecutors.get(executorId)
-    var rddBlocksDelta = 0
-
-    // Update the executor stats first, since they are used to calculate the free memory
-    // on tracked RDD distributions.
-    maybeExec.foreach { exec =>
-      updateExecutorMemoryDiskInfo(exec, storageLevel, memoryDelta, diskDelta)
-    }
-
-    // Update the block entry in the RDD info, keeping track of the deltas above so that we
-    // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
-
-      val partition = rdd.partition(block.name)
-
-      val executors = if (updatedStorageLevel.isDefined) {
-        val current = partition.executors
-        if (current.contains(executorId)) {
-          current
+      val isBlockCached = event.blockUpdatedInfo.storageLevel.isValid
+      var diskDelta = event.blockUpdatedInfo.diskSize
+      var memoryDelta = event.blockUpdatedInfo.memSize
+
+      liveExecutors.get(executorId).foreach { exec =>
+        val oldBlock = if (isBlockCached) {
+          exec.addBlock(block, event.blockUpdatedInfo.diskSize, event.blockUpdatedInfo.memSize,
+            rdd.info.storageLevel.useOffHeap)
         } else {
-          rddBlocksDelta = 1
-          current :+ executorId
+          exec.removeBlock(block, rdd.info.storageLevel.useOffHeap)
         }
-      } else {
-        rddBlocksDelta = -1
-        partition.executors.filter(_ != executorId)
-      }
 
-      // Only update the partition if it's still stored in some executor, otherwise get rid of it.
-      if (executors.nonEmpty) {
-        partition.update(executors, rdd.storageLevel,
-          addDeltaToValue(partition.memoryUsed, memoryDelta),
-          addDeltaToValue(partition.diskUsed, diskDelta))
-      } else {
-        rdd.removePartition(block.name)
-      }
+        if (oldBlock != null) {
+          diskDelta -= oldBlock.diskSize
+          memoryDelta -= oldBlock.memSize
+        }
 
-      maybeExec.foreach { exec =>
-        if (exec.rddBlocks + rddBlocksDelta > 0) {
+        if (exec.hasRDDData(block.rddId)) {
           val dist = rdd.distribution(exec)
           dist.memoryUsed = addDeltaToValue(dist.memoryUsed, memoryDelta)
           dist.diskUsed = addDeltaToValue(dist.diskUsed, diskDelta)
 
           if (exec.hasMemoryInfo) {
-            if (storageLevel.useOffHeap) {
+            if (rdd.info.storageLevel.useOffHeap) {
               dist.offHeapUsed = addDeltaToValue(dist.offHeapUsed, memoryDelta)
             } else {
               dist.onHeapUsed = addDeltaToValue(dist.onHeapUsed, memoryDelta)
@@ -991,18 +931,36 @@
             }
           }
         }
+
+        maybeUpdate(exec, now)
+      }
+
+      val partition = rdd.partition(block, isBlockCached)
+
+      val executors = if (isBlockCached) {
+        val current = partition.executors
+        if (current.contains(executorId)) {
+          current
+        } else {
+          current :+ executorId
+        }
+      } else {
+        partition.executors.filter(_ != executorId)
+      }
+
+      // Only update the partition if it's still stored in some executor, otherwise get rid of it.
+      if (executors.nonEmpty) {
+        partition.update(executors,
+          addDeltaToValue(partition.memoryUsed, memoryDelta),
+          addDeltaToValue(partition.diskUsed, diskDelta))
+      } else {
+        rdd.removePartition(block.splitIndex)
       }
 
      rdd.memoryUsed = addDeltaToValue(rdd.memoryUsed, memoryDelta)
      rdd.diskUsed = addDeltaToValue(rdd.diskUsed, diskDelta)
      update(rdd, now)
    }
-
-    // Finish updating the executor now that we know the delta in the number of blocks.
-    maybeExec.foreach { exec =>
-      exec.rddBlocks += rddBlocksDelta
-      maybeUpdate(exec, now)
-    }
  }
 
  private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
```
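A side note on the deleted `addDeltaToValue` helper: its call sites remain, and the `import LiveEntityHelpers._` added at the top of the file indicates it is now provided by LiveEntityHelpers with the same clamping behavior. A small self-contained demonstration of why the clamp matters (the values here are illustrative):

```scala
object AddDeltaToValueDemo extends App {
  // Same body as the removed private helper: apply a delta to a running
  // total, but never let the total go negative.
  def addDeltaToValue(old: Long, delta: Long): Long = math.max(0, old + delta)

  val afterAdd = addDeltaToValue(0L, 512L)            // 512
  val afterRemove = addDeltaToValue(afterAdd, -512L)  // 0
  // A duplicate or out-of-order removal event clamps to 0 instead of -512.
  val duplicate = addDeltaToValue(afterRemove, -512L)
  println((afterAdd, afterRemove, duplicate))         // (512, 0, 0)
}
```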
