-
Notifications
You must be signed in to change notification settings - Fork 29k
[WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener #25943
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP][SPARK-29261][SQL][CORE] Support recover live entities from KVStore for (SQL)AppStatusListener #25943
Changes from all commits
e2e6fb8
bee8325
51f7d31
585dc15
3a23f50
521ec7b
eb7a63a
136ace4
29972e4
2b8008e
8344d3a
cbd9a8c
d0be796
b56e3aa
fbce6ed
90601de
a7d352d
82cfb29
3512303
7caf9d1
c3ca273
46715b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -69,13 +69,14 @@ private[spark] class AppStatusListener( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Keep track of live entities, so that task metrics can be efficiently updated (without | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // causing too many writes to the underlying store, and other expensive operations). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val liveJobs = new HashMap[Int, LiveJob]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val liveExecutors = new HashMap[String, LiveExecutor]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val deadExecutors = new HashMap[String, LiveExecutor]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val liveTasks = new HashMap[Long, LiveTask]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val liveRDDs = new HashMap[Int, LiveRDD]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val pools = new HashMap[String, SchedulerPool]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // variables are visible for tests. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val liveJobs = new HashMap[Int, LiveJob]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val liveExecutors = new HashMap[String, LiveExecutor]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val deadExecutors = new HashMap[String, LiveExecutor]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val liveTasks = new HashMap[Long, LiveTask]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val liveRDDs = new HashMap[Int, LiveRDD]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] val pools = new HashMap[String, SchedulerPool]() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Keep the active executor count as a separate variable to avoid having to do synchronization | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -103,6 +104,87 @@ private[spark] class AppStatusListener( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // visible for tests | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] def recoverLiveEntities(): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!live) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[JobDataWrapper]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .asScala.filter(_.info.status == JobExecutionStatus.RUNNING) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Safer condition would be either
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah. I didn't see any place we set Job status to |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(!_.info.isActive) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(_.toLiveExecutor).foreach(exec => deadExecutors.put(exec.executorId, exec)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[StageDataWrapper]).asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .filter { stageData => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stageData.info.status == v1.StageStatus.PENDING || | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Logically, spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala Lines 605 to 614 in 5a512e8
spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala Lines 743 to 748 in 5a512e8
... except status is SKIPPED: spark/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala Lines 408 to 429 in 5a512e8
so the condition is actually much complicated than that.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Good catch here ! And I attached another possible condition for this:
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stageData.info.status == v1.StageStatus.ACTIVE || | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| (stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }.map { stageData => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val stageId = stageData.info.stageId | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stageData.toLiveStage(jobs) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }.foreach { stage => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: indentation of block for foreach looks to be off |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val stageId = stage.info.stageId | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val stageAttempt = stage.info.attemptNumber() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveStages.put((stageId, stageAttempt), stage) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[ExecutorStageSummaryWrapper]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .index("stage") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .first(Array(stageId, stageAttempt)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .last(Array(stageId, stageAttempt)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(_.toLiveExecutorStageSummary) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .foreach { esummary => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stage.executorSummaries.put(esummary.executorId, esummary) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (esummary.isBlacklisted) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stage.blackListedExecutors += esummary.executorId | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: unnecessary two empty lines - one empty line is enough |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[TaskDataWrapper]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .parent(Array(stageId, stageAttempt)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .index(TaskIndexNames.STATUS) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .first(TaskState.RUNNING.toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .last(TaskState.RUNNING.toString) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .map(_.toLiveTask) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .foreach { task => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveTasks.put(task.info.taskId, task) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stage.activeTasksPerExecutor(task.info.executorId) += 1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[RDDStorageInfoWrapper]).asScala | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: same here, but just a suggestion (as the indentation is not dramatically changed) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .foreach { rddWrapper => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val liveRdd = rddWrapper.toLiveRDD(liveExecutors) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveRDDs.put(liveRdd.info.id, liveRdd) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| kvstore.view(classOf[PoolData]).asScala.foreach { poolData => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: same here |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val schedulerPool = poolData.toSchedulerPool | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pools.put(schedulerPool.name, schedulerPool) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // used for tests only | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] def clearLiveEntities(): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveStages.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveJobs.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveExecutors.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| deadExecutors.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveTasks.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveRDDs.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| pools.clear() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| override def onOtherEvent(event: SparkListenerEvent): Unit = event match { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case SparkListenerLogStart(version) => sparkVersion = version | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| case _ => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -877,6 +959,12 @@ private[spark] class AppStatusListener( | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // used in tests only | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private[spark] def flush(): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| val now = System.nanoTime() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| flush(update(_, now)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private def flush(entityFlushFunc: LiveEntity => Unit): Unit = { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| liveStages.values.asScala.foreach { stage => | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,21 +59,23 @@ private[spark] abstract class LiveEntity { | |
|
|
||
| } | ||
|
|
||
| private class LiveJob( | ||
| private[spark] class LiveJob( | ||
| val jobId: Int, | ||
| name: String, | ||
| val name: String, | ||
| val submissionTime: Option[Date], | ||
| val stageIds: Seq[Int], | ||
| jobGroup: Option[String], | ||
| numTasks: Int, | ||
| sqlExecutionId: Option[Long]) extends LiveEntity { | ||
| val jobGroup: Option[String], | ||
| val numTasks: Int, | ||
| val sqlExecutionId: Option[Long]) extends LiveEntity { | ||
|
|
||
| var activeTasks = 0 | ||
| var completedTasks = 0 | ||
| var failedTasks = 0 | ||
|
|
||
| // Holds both the stage ID and the task index, packed into a single long value. | ||
| val completedIndices = new OpenHashSet[Long]() | ||
| // will only be set when recover LiveJob is needed. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we add
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't mind adding these to JobDataWrapper as well - it could bring problems on backward compatibility though. It sounds me as tradeoff between "possibly inaccurate" vs "no backward compatibility".
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think adding And, I think correctness is aways the first thing we need to take care of. |
||
| var numCompletedIndices = 0 | ||
|
|
||
| var killedTasks = 0 | ||
| var killedSummary: Map[String, Int] = Map() | ||
|
|
@@ -85,6 +87,8 @@ private class LiveJob( | |
| var completionTime: Option[Date] = None | ||
|
|
||
| var completedStages: Set[Int] = Set() | ||
| // will only be set when recover LiveJob is needed. | ||
| var numCompletedStages = 0 | ||
| var activeStages = 0 | ||
| var failedStages = 0 | ||
|
|
||
|
|
@@ -104,9 +108,9 @@ private class LiveJob( | |
| skippedTasks, | ||
| failedTasks, | ||
| killedTasks, | ||
| completedIndices.size, | ||
| completedIndices.size + numCompletedIndices, | ||
| activeStages, | ||
| completedStages.size, | ||
| completedStages.size + numCompletedStages, | ||
| skippedStages.size, | ||
| failedStages, | ||
| killedSummary) | ||
|
|
@@ -115,7 +119,7 @@ private class LiveJob( | |
|
|
||
| } | ||
|
|
||
| private class LiveTask( | ||
| private[spark] class LiveTask( | ||
| var info: TaskInfo, | ||
| stageId: Int, | ||
| stageAttemptId: Int, | ||
|
|
@@ -229,7 +233,7 @@ private class LiveTask( | |
|
|
||
| } | ||
|
|
||
| private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { | ||
| private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity { | ||
|
|
||
| var hostPort: String = null | ||
| var host: String = null | ||
|
|
@@ -272,7 +276,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE | |
| def hasMemoryInfo: Boolean = totalOnHeap >= 0L | ||
|
|
||
| // peak values for executor level metrics | ||
| val peakExecutorMetrics = new ExecutorMetrics() | ||
| var peakExecutorMetrics = new ExecutorMetrics() | ||
|
|
||
| def hostname: String = if (host != null) host else hostPort.split(":")(0) | ||
|
|
||
|
|
@@ -316,10 +320,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE | |
| } | ||
| } | ||
|
|
||
| private class LiveExecutorStageSummary( | ||
| private[spark] class LiveExecutorStageSummary( | ||
| stageId: Int, | ||
| attemptId: Int, | ||
| executorId: String) extends LiveEntity { | ||
| val executorId: String) extends LiveEntity { | ||
|
|
||
| import LiveEntityHelpers._ | ||
|
|
||
|
|
@@ -353,7 +357,7 @@ private class LiveExecutorStageSummary( | |
|
|
||
| } | ||
|
|
||
| private class LiveStage extends LiveEntity { | ||
| private[spark] class LiveStage extends LiveEntity { | ||
|
|
||
| import LiveEntityHelpers._ | ||
|
|
||
|
|
@@ -370,6 +374,8 @@ private class LiveStage extends LiveEntity { | |
| var completedTasks = 0 | ||
| var failedTasks = 0 | ||
| val completedIndices = new OpenHashSet[Int]() | ||
| // will only be set when recover LiveStage is needed. | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same here: Could we add |
||
| var numCompletedIndices = 0 | ||
|
|
||
| var killedTasks = 0 | ||
| var killedSummary: Map[String, Int] = Map() | ||
|
|
@@ -405,7 +411,7 @@ private class LiveStage extends LiveEntity { | |
| numCompleteTasks = completedTasks, | ||
| numFailedTasks = failedTasks, | ||
| numKilledTasks = killedTasks, | ||
| numCompletedIndices = completedIndices.size, | ||
| numCompletedIndices = completedIndices.size + numCompletedIndices, | ||
|
|
||
| submissionTime = info.submissionTime.map(new Date(_)), | ||
| firstTaskLaunchedTime = | ||
|
|
@@ -464,7 +470,7 @@ private class LiveStage extends LiveEntity { | |
| * used by the partition in the executors, and thus may differ from the storage level requested | ||
| * by the application. | ||
| */ | ||
| private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { | ||
| private[spark] class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { | ||
|
|
||
| import LiveEntityHelpers._ | ||
|
|
||
|
|
@@ -496,7 +502,7 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) { | |
|
|
||
| } | ||
|
|
||
| private class LiveRDDDistribution(exec: LiveExecutor) { | ||
| private[spark] class LiveRDDDistribution(exec: LiveExecutor) { | ||
|
|
||
| import LiveEntityHelpers._ | ||
|
|
||
|
|
@@ -513,6 +519,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) { | |
| def toApi(): v1.RDDDataDistribution = { | ||
| if (lastUpdate == null) { | ||
| lastUpdate = new v1.RDDDataDistribution( | ||
| executorId, | ||
| weakIntern(exec.hostPort), | ||
| memoryUsed, | ||
| exec.maxMemory - exec.memoryUsed, | ||
|
|
@@ -535,18 +542,18 @@ private class LiveRDDDistribution(exec: LiveExecutor) { | |
| * RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later stage | ||
| * it started after the RDD is marked for caching. | ||
| */ | ||
| private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { | ||
| private[spark] class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity { | ||
|
|
||
| import LiveEntityHelpers._ | ||
|
|
||
| var memoryUsed = 0L | ||
| var diskUsed = 0L | ||
|
|
||
| private val levelDescription = weakIntern(storageLevel.description) | ||
| private val partitions = new HashMap[String, LiveRDDPartition]() | ||
| private val partitionSeq = new RDDPartitionSeq() | ||
| private[spark] val partitions = new HashMap[String, LiveRDDPartition]() | ||
| private[spark] val partitionSeq = new RDDPartitionSeq() | ||
|
|
||
| private val distributions = new HashMap[String, LiveRDDDistribution]() | ||
| private[spark] val distributions = new HashMap[String, LiveRDDDistribution]() | ||
|
|
||
| def partition(blockName: String): LiveRDDPartition = { | ||
| partitions.getOrElseUpdate(blockName, { | ||
|
|
@@ -600,7 +607,7 @@ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends Liv | |
|
|
||
| } | ||
|
|
||
| private class SchedulerPool(name: String) extends LiveEntity { | ||
| private[spark] class SchedulerPool(val name: String) extends LiveEntity { | ||
|
|
||
| var stageIds = Set[Int]() | ||
|
|
||
|
|
@@ -750,7 +757,7 @@ private object LiveEntityHelpers { | |
| * Internally, the sequence is mutable, and elements can modify the data they expose. Additions and | ||
| * removals are O(1). It is not safe to do multiple writes concurrently. | ||
| */ | ||
| private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { | ||
| private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] { | ||
|
|
||
| @volatile private var _head: LiveRDDPartition = null | ||
| @volatile private var _tail: LiveRDDPartition = null | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,6 +31,8 @@ import org.apache.spark.JobExecutionStatus | |
| import org.apache.spark.executor.ExecutorMetrics | ||
| import org.apache.spark.metrics.ExecutorMetricType | ||
| import org.apache.spark.resource.ResourceInformation | ||
| import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition} | ||
| import org.apache.spark.storage.StorageLevel | ||
|
|
||
| case class ApplicationInfo private[spark]( | ||
| id: String, | ||
|
|
@@ -181,6 +183,7 @@ class RDDStorageInfo private[spark]( | |
| val partitions: Option[Seq[RDDPartitionInfo]]) | ||
|
|
||
| class RDDDataDistribution private[spark]( | ||
| val executorId: String, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are we changing model in api/v1? Seems like the change is needed, but we may also need to check where it doesn't break existing one.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the api versioning requirements are listed here: https://spark.apache.org/docs/latest/monitoring.html#api-versioning-policy
so its OK to add new fields. but, if we're adding things that really don't make sense as part of the api, then maybe we should just store a different object instead.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @squito, thanks for providing the api versioning requirements. That's really helpful. |
||
| val address: String, | ||
| val memoryUsed: Long, | ||
| val memoryRemaining: Long, | ||
|
|
@@ -192,14 +195,35 @@ class RDDDataDistribution private[spark]( | |
| @JsonDeserialize(contentAs = classOf[JLong]) | ||
| val onHeapMemoryRemaining: Option[Long], | ||
| @JsonDeserialize(contentAs = classOf[JLong]) | ||
| val offHeapMemoryRemaining: Option[Long]) | ||
| val offHeapMemoryRemaining: Option[Long]) { | ||
|
|
||
| private[spark] def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor]) | ||
| : LiveRDDDistribution = { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. these added methods should probably be
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this necessary ? Seems we've already has
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The classes themselves are public. The |
||
| val exec = executors.get(executorId).get | ||
| val liveRDDDistribution = new LiveRDDDistribution(exec) | ||
| liveRDDDistribution.memoryUsed = memoryUsed | ||
| liveRDDDistribution.diskUsed = diskUsed | ||
| liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0) | ||
| liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0) | ||
| liveRDDDistribution.lastUpdate = this | ||
| liveRDDDistribution | ||
| } | ||
| } | ||
|
|
||
| class RDDPartitionInfo private[spark]( | ||
| val blockName: String, | ||
| val storageLevel: String, | ||
| val memoryUsed: Long, | ||
| val diskUsed: Long, | ||
| val executors: Seq[String]) | ||
| val executors: Seq[String]) { | ||
|
|
||
| def toLiveRDDPartition: LiveRDDPartition = { | ||
| val liveRDDPartition = new LiveRDDPartition(blockName, | ||
| StorageLevel.fromDescription(storageLevel)) | ||
| liveRDDPartition.value = this | ||
| liveRDDPartition | ||
| } | ||
| } | ||
|
|
||
| class StageData private[spark]( | ||
| val status: StageStatus, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may need to cooperate with SPARK-28594's config.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I still think it should be run by default to guarantee AppStatusListener and KVStore is in sync, but let's wait for another voices.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... I've found we don't recover AppStatusSource as well since it will not be passed when
live == false. If possible I'd put assertion whether KVStore is empty whenlive == truethen, but it depends on the possibility. If not, we may have to live with.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think our current usage of a live(true)
AppStatusListenerguarantees the empty KVStore at the initialization step. So I don't understand well fordepends on the possibility? Do I miss something ?And yet, it seems that we don't have
isEmptyapi for KVStore. Otherwise, I could put an assertion to reach a compromises between us.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a "context" we might have a chance to break eventually and as a side-effect it will break here. I'm in favor of doing defensive programming: if there're preconditions it should be mentioned anywhere or asserted. But I agree we don't have isEmpty api for KVStore - let's leave it as it is.