
Commit 0848fbf

Committed by Marcelo Vanzin
[SPARK-27468][core] Track correct storage level of RDDs and partitions.
Previously, the RDD's storage level would change depending on the status executors reported for the blocks they were storing, and individual blocks would reflect that. That is wrong because different blocks may be stored differently on different executors. Now the RDD tracks the user-provided storage level, while each individual partition reflects the current storage level of that particular block, including the current number of replicas.
1 parent e1ea806 commit 0848fbf
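
In effect, a partition's displayed storage level is now derived from the block's observed state plus the user-provided RDD level, with replication taken from the number of executors currently holding the block. A minimal sketch of that derivation as a standalone helper (hypothetical, not code from this patch; the real logic lives in LiveRDDPartition.update below):

    import org.apache.spark.storage.StorageLevel

    // Hypothetical helper mirroring the new derivation: the disk/memory flags
    // come from the block's current usage, off-heap and deserialized come from
    // the user-provided RDD level, and replication is the number of executors
    // that currently hold the block.
    def partitionLevel(
        rddLevel: StorageLevel,
        memoryUsed: Long,
        diskUsed: Long,
        executors: Seq[String]): StorageLevel = {
      StorageLevel(
        diskUsed > 0,                                         // useDisk
        memoryUsed > 0,                                       // useMemory
        rddLevel.useOffHeap,                                  // useOffHeap
        if (memoryUsed > 0) rddLevel.deserialized else false, // deserialized
        executors.size)                                       // replication
    }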

4 files changed: +50 −33 lines

core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
Lines changed: 5 additions & 15 deletions

@@ -234,8 +234,8 @@ private[spark] class AppStatusListener(
               (partition.memoryUsed / partition.executors.length) * -1)
             rdd.diskUsed = addDeltaToValue(rdd.diskUsed,
               (partition.diskUsed / partition.executors.length) * -1)
-            partition.update(partition.executors
-              .filter(!_.equals(event.executorId)), rdd.storageLevel,
+            partition.update(
+              partition.executors.filter(!_.equals(event.executorId)),
               addDeltaToValue(partition.memoryUsed,
                 (partition.memoryUsed / partition.executors.length) * -1),
               addDeltaToValue(partition.diskUsed,

@@ -495,7 +495,7 @@ private[spark] class AppStatusListener(

     event.stageInfo.rddInfos.foreach { info =>
       if (info.storageLevel.isValid) {
-        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info)), now)
+        liveUpdate(liveRDDs.getOrElseUpdate(info.id, new LiveRDD(info, info.storageLevel)), now)
       }
     }

@@ -916,12 +916,6 @@ private[spark] class AppStatusListener(
     val diskDelta = event.blockUpdatedInfo.diskSize * (if (storageLevel.useDisk) 1 else -1)
     val memoryDelta = event.blockUpdatedInfo.memSize * (if (storageLevel.useMemory) 1 else -1)

-    val updatedStorageLevel = if (storageLevel.isValid) {
-      Some(storageLevel.description)
-    } else {
-      None
-    }
-
     // We need information about the executor to update some memory accounting values in the
     // RDD info, so read that beforehand.
     val maybeExec = liveExecutors.get(executorId)

@@ -936,13 +930,9 @@ private[spark] class AppStatusListener(
     // Update the block entry in the RDD info, keeping track of the deltas above so that we
     // can update the executor information too.
     liveRDDs.get(block.rddId).foreach { rdd =>
-      if (updatedStorageLevel.isDefined) {
-        rdd.setStorageLevel(updatedStorageLevel.get)
-      }
-
       val partition = rdd.partition(block.name)

-      val executors = if (updatedStorageLevel.isDefined) {
+      val executors = if (storageLevel.isValid) {
         val current = partition.executors
         if (current.contains(executorId)) {
           current

@@ -957,7 +947,7 @@ private[spark] class AppStatusListener(

       // Only update the partition if it's still stored in some executor, otherwise get rid of it.
       if (executors.nonEmpty) {
-        partition.update(executors, rdd.storageLevel,
+        partition.update(executors,
           addDeltaToValue(partition.memoryUsed, memoryDelta),
           addDeltaToValue(partition.diskUsed, diskDelta))
       } else {
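
The executor-list bookkeeping in the hunks above boils down to: a valid storage level means the reporting executor still holds the block, so it must appear in the partition's executor list; an invalid level (StorageLevel.NONE) means the block was removed from that executor. A minimal sketch of that logic as a simplified standalone helper (the appending branch is truncated in the hunk above, so this is a hedged reconstruction, not the listener's actual code):

    // Hypothetical helper: maintain the list of executors holding a block
    // when a block update arrives.
    def updatedExecutors(
        current: Seq[String],
        executorId: String,
        levelIsValid: Boolean): Seq[String] = {
      if (levelIsValid) {
        // Block still stored on this executor: make sure it is listed.
        if (current.contains(executorId)) current else current :+ executorId
      } else {
        // StorageLevel.NONE: the block is gone from this executor.
        current.filterNot(_ == executorId)
      }
    }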

core/src/main/scala/org/apache/spark/status/LiveEntity.scala
Lines changed: 15 additions & 13 deletions

@@ -30,7 +30,7 @@ import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
 import org.apache.spark.resource.ResourceInformation
 import org.apache.spark.scheduler.{AccumulableInfo, StageInfo, TaskInfo}
 import org.apache.spark.status.api.v1
-import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.{RDDInfo, StorageLevel}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.AccumulatorContext
 import org.apache.spark.util.collection.OpenHashSet

@@ -458,7 +458,7 @@ private class LiveStage extends LiveEntity {

 }

-private class LiveRDDPartition(val blockName: String) {
+private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

   import LiveEntityHelpers._

@@ -476,12 +476,13 @@ private class LiveRDDPartition(val blockName: String) {

   def update(
       executors: Seq[String],
-      storageLevel: String,
       memoryUsed: Long,
       diskUsed: Long): Unit = {
+    val level = StorageLevel(diskUsed > 0, memoryUsed > 0, rddLevel.useOffHeap,
+      if (memoryUsed > 0) rddLevel.deserialized else false, executors.size)
     value = new v1.RDDPartitionInfo(
       blockName,
-      weakIntern(storageLevel),
+      weakIntern(level.description),
       memoryUsed,
       diskUsed,
       executors)

@@ -520,27 +521,28 @@ private class LiveRDDDistribution(exec: LiveExecutor) {

 }

-private class LiveRDD(val info: RDDInfo) extends LiveEntity {
+/**
+ * Tracker for data related to a persisted RDD. Note the storage level is kept immutable here,
+ * following the current behavior of `RDD.persist()`, even though it is mutable in the `RDDInfo`
+ * structure.
+ */
+private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {

   import LiveEntityHelpers._

-  var storageLevel: String = weakIntern(info.storageLevel.description)
   var memoryUsed = 0L
   var diskUsed = 0L

+  private val levelDescription = weakIntern(storageLevel.description)
   private val partitions = new HashMap[String, LiveRDDPartition]()
   private val partitionSeq = new RDDPartitionSeq()

   private val distributions = new HashMap[String, LiveRDDDistribution]()

-  def setStorageLevel(level: String): Unit = {
-    this.storageLevel = weakIntern(level)
-  }
-
   def partition(blockName: String): LiveRDDPartition = {
     partitions.getOrElseUpdate(blockName, {
-      val part = new LiveRDDPartition(blockName)
-      part.update(Nil, storageLevel, 0L, 0L)
+      val part = new LiveRDDPartition(blockName, storageLevel)
+      part.update(Nil, 0L, 0L)
       partitionSeq.addPartition(part)
       part
     })

@@ -578,7 +580,7 @@ private class LiveRDD(val info: RDDInfo) extends LiveEntity {
       info.name,
       info.numPartitions,
       partitions.size,
-      storageLevel,
+      levelDescription,
       memoryUsed,
       diskUsed,
       dists,
core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
Lines changed: 27 additions & 3 deletions

@@ -42,6 +42,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
     .set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
     .set(ASYNC_TRACKING_ENABLED, false)

+  private val twoReplicaMemAndDiskLevel = StorageLevel(true, true, false, true, 2)
+
   private var time: Long = _
   private var testDir: File = _
   private var store: ElementTrackingStore = _

@@ -763,6 +765,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(part.memoryUsed === rdd1b1.memSize * 2)
       assert(part.diskUsed === rdd1b1.diskSize * 2)
       assert(part.executors === Seq(bm1.executorId, bm2.executorId))
+      assert(part.storageLevel === twoReplicaMemAndDiskLevel.description)
     }

     check[ExecutorSummaryWrapper](bm2.executorId) { exec =>

@@ -800,9 +803,30 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
     }

-    // Remove block 1 from bm 1.
+    // Evict block 1 from memory in bm 1. Note that because of SPARK-29319, the disk size
+    // is reported as "0" here to avoid double-counting; the current behavior of the block
+    // manager is to provide the actual disk size of the block.
+    listener.onBlockUpdated(SparkListenerBlockUpdated(
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.DISK_ONLY,
+        rdd1b1.memSize, 0L)))
+
+    check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
+      assert(wrapper.info.numCachedPartitions === 2L)
+      assert(wrapper.info.memoryUsed === rdd1b1.memSize + rdd1b2.memSize)
+      assert(wrapper.info.diskUsed === 2 * rdd1b1.diskSize + rdd1b2.diskSize)
+      assert(wrapper.info.dataDistribution.get.size === 2L)
+      assert(wrapper.info.partitions.get.size === 2L)
+    }
+
+    check[ExecutorSummaryWrapper](bm1.executorId) { exec =>
+      assert(exec.info.rddBlocks === 2L)
+      assert(exec.info.memoryUsed === rdd1b2.memSize)
+      assert(exec.info.diskUsed === rdd1b1.diskSize + rdd1b2.diskSize)
+    }
+
+    // Remove block 1 from bm 1; note memSize = 0 due to the eviction above.
     listener.onBlockUpdated(SparkListenerBlockUpdated(
-      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, rdd1b1.memSize, rdd1b1.diskSize)))
+      BlockUpdatedInfo(bm1, rdd1b1.blockId, StorageLevel.NONE, 0, rdd1b1.diskSize)))

     check[RDDStorageInfoWrapper](rdd1b1.rddId) { wrapper =>
       assert(wrapper.info.numCachedPartitions === 2L)

@@ -1571,7 +1595,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(dist.memoryRemaining === maxMemory - dist.memoryUsed)

       val part1 = wrapper.info.partitions.get.find(_.blockName === rdd1b1.blockId.name).get
-      assert(part1.storageLevel === level.description)
+      assert(part1.storageLevel === twoReplicaMemAndDiskLevel.description)
       assert(part1.memoryUsed === 2 * rdd1b1.memSize)
       assert(part1.diskUsed === 2 * rdd1b1.diskSize)
       assert(part1.executors === Seq(bm1.executorId, bm2.executorId))
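
The eviction step in this test leans on the listener's signed-delta accounting, visible at the top of the third hunk in AppStatusListener.scala: a block update contributes its reported size positively when the new level uses that medium and negatively otherwise. A small worked sketch, assuming the same formula:

    import org.apache.spark.storage.StorageLevel

    // Sketch of the listener's delta computation (same formula as the patch).
    def deltas(level: StorageLevel, memSize: Long, diskSize: Long): (Long, Long) = {
      val memoryDelta = memSize * (if (level.useMemory) 1 else -1)
      val diskDelta = diskSize * (if (level.useDisk) 1 else -1)
      (memoryDelta, diskDelta)
    }

    // An eviction to DISK_ONLY reported with memSize = m and diskSize = 0 yields
    // (-m, 0): memory usage drops by m, disk usage is unchanged. Reporting the
    // real disk size here would add it a second time, which is the SPARK-29319
    // double-counting the test comment mentions.
    deltas(StorageLevel.DISK_ONLY, 100L, 0L)  // == (-100L, 0L)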

core/src/test/scala/org/apache/spark/status/LiveEntitySuite.scala
Lines changed: 3 additions & 2 deletions

@@ -18,6 +18,7 @@
 package org.apache.spark.status

 import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.StorageLevel

 class LiveEntitySuite extends SparkFunSuite {

@@ -59,8 +60,8 @@ class LiveEntitySuite extends SparkFunSuite {
   }

   private def newPartition(i: Int): LiveRDDPartition = {
-    val part = new LiveRDDPartition(i.toString)
-    part.update(Seq(i.toString), i.toString, i, i)
+    val part = new LiveRDDPartition(i.toString, StorageLevel.MEMORY_AND_DISK)
+    part.update(Seq(i.toString), i, i)
     part
   }
