102 changes: 95 additions & 7 deletions core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -69,13 +69,14 @@ private[spark] class AppStatusListener(

// Keep track of live entities, so that task metrics can be efficiently updated (without
// causing too many writes to the underlying store, and other expensive operations).
private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private val liveJobs = new HashMap[Int, LiveJob]()
private val liveExecutors = new HashMap[String, LiveExecutor]()
private val deadExecutors = new HashMap[String, LiveExecutor]()
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
// variables are visible for tests.
private[spark] val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
private[spark] val liveJobs = new HashMap[Int, LiveJob]()
private[spark] val liveExecutors = new HashMap[String, LiveExecutor]()
private[spark] val deadExecutors = new HashMap[String, LiveExecutor]()
private[spark] val liveTasks = new HashMap[Long, LiveTask]()
private[spark] val liveRDDs = new HashMap[Int, LiveRDD]()
private[spark] val pools = new HashMap[String, SchedulerPool]()

private val SQL_EXECUTION_ID_KEY = "spark.sql.execution.id"
// Keep the active executor count as a separate variable to avoid having to do synchronization
@@ -103,6 +104,87 @@ private[spark] class AppStatusListener(
}
}

// visible for tests
private[spark] def recoverLiveEntities(): Unit = {
if (!live) {
Member Author:
This may need to cooperate with SPARK-28594's config.

Contributor:
I still think it should run by default to guarantee that AppStatusListener and the KVStore stay in sync, but let's wait for other voices.

Contributor (@HeartSaVioR, Oct 7, 2019):
Hmm... I've found that we don't recover AppStatusSource either, since it is not passed when live == false. If possible I'd add an assertion that the KVStore is empty when live == true, but that depends on whether it's feasible. If not, we may have to live with it.

Member Author:
> If possible I'd add an assertion that the KVStore is empty when live == true, but that depends on whether it's feasible.

I think our current usage of a live (true) AppStatusListener guarantees an empty KVStore at the initialization step, so I don't understand what "depends on whether it's feasible" means. Am I missing something?

Also, it seems we don't have an isEmpty API for KVStore; otherwise I could add an assertion as a compromise between us.

Contributor:
> I think our current usage of a live (true) AppStatusListener guarantees an empty KVStore at the initialization step.

That's a "context" we might eventually break, and as a side effect it would break here too. I'm in favor of defensive programming: if there are preconditions, they should be documented somewhere or asserted. But I agree we don't have an isEmpty API for KVStore - let's leave it as it is.
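For illustration only, the precondition discussed above could be approximated at initialization without an isEmpty API by counting one of the tracked entity classes (a hypothetical sketch, not part of the PR):

// Sketch: approximate "the KVStore is empty" via count() on a representative
// entity class, since KVStore exposes no isEmpty. Hypothetical, not in this PR.
if (live) {
  assert(kvstore.count(classOf[JobDataWrapper]) == 0L,
    "A live AppStatusListener expects an empty KVStore at initialization")
}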

kvstore.view(classOf[JobDataWrapper])
.asScala.filter(_.info.status == JobExecutionStatus.RUNNING)
Contributor:
A safer condition would be either UNKNOWN or RUNNING, but I guess it shouldn't be UNKNOWN, so this seems OK.

Member Author:
Yeah, I didn't see any place where we set the job status to UNKNOWN.

.map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job))
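
For illustration, the more defensive variant floated in the thread above would also recover UNKNOWN jobs; this is a sketch only and was not adopted, since the status is never set to UNKNOWN:

// Hypothetical variant of the recovery filter that also picks up UNKNOWN jobs.
kvstore.view(classOf[JobDataWrapper]).asScala
  .filter { j =>
    j.info.status == JobExecutionStatus.RUNNING ||
      j.info.status == JobExecutionStatus.UNKNOWN
  }
  .map(_.toLiveJob).foreach(job => liveJobs.put(job.jobId, job))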

kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(_.info.isActive)
.map(_.toLiveExecutor).foreach(exec => liveExecutors.put(exec.executorId, exec))

kvstore.view(classOf[ExecutorSummaryWrapper]).asScala.filter(!_.info.isActive)
.map(_.toLiveExecutor).foreach(exec => deadExecutors.put(exec.executorId, exec))

kvstore.view(classOf[StageDataWrapper]).asScala
.filter { stageData =>
stageData.info.status == v1.StageStatus.PENDING ||
Contributor:
Logically, any stage with activeTasks > 0 is kept in liveStages regardless of its status:

// [SPARK-24415] Wait for all tasks to finish before removing stage from live list
val removeStage =
stage.activeTasks == 0 &&
(v1.StageStatus.COMPLETE.equals(stage.status) ||
v1.StageStatus.FAILED.equals(stage.status))
if (removeStage) {
update(stage, now, last = true)
} else {
maybeUpdate(stage, now)
}

// Remove stage only if there are no active tasks remaining
val removeStage = stage.activeTasks == 0
update(stage, now, last = removeStage)
if (removeStage) {
liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptNumber))
}

... except when the status is SKIPPED:

// Check if there are any pending stages that match this job; mark those as skipped.
val it = liveStages.entrySet.iterator()
while (it.hasNext()) {
val e = it.next()
if (job.stageIds.contains(e.getKey()._1)) {
val stage = e.getValue()
if (v1.StageStatus.PENDING.equals(stage.status)) {
stage.status = v1.StageStatus.SKIPPED
job.skippedStages += stage.info.stageId
job.skippedTasks += stage.info.numTasks
job.activeStages -= 1
pools.get(stage.schedulingPool).foreach { pool =>
pool.stageIds = pool.stageIds - stage.info.stageId
update(pool, now)
}
it.remove()
update(stage, now, last = true)
}
}
}

so the condition is actually more complicated than that.

Member Author:
Good catch! I've attached another possible condition for this:

(stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED)

stageData.info.status == v1.StageStatus.ACTIVE ||
(stageData.info.numActiveTasks > 0 && stageData.info.status != v1.StageStatus.SKIPPED)
}.map { stageData =>
val stageId = stageData.info.stageId
val jobs = liveJobs.values.filter(_.stageIds.contains(stageId)).toSeq
stageData.toLiveStage(jobs)
}.foreach { stage =>
Contributor:
nit: the indentation of the foreach block looks off.

val stageId = stage.info.stageId
val stageAttempt = stage.info.attemptNumber()
liveStages.put((stageId, stageAttempt), stage)

kvstore.view(classOf[ExecutorStageSummaryWrapper])
.index("stage")
.first(Array(stageId, stageAttempt))
.last(Array(stageId, stageAttempt))
.asScala
.map(_.toLiveExecutorStageSummary)
.foreach { esummary =>
stage.executorSummaries.put(esummary.executorId, esummary)
if (esummary.isBlacklisted) {
stage.blackListedExecutors += esummary.executorId
liveExecutors.get(esummary.executorId).foreach(_.isBlacklisted = true)
liveExecutors.get(esummary.executorId).foreach(_.blacklistedInStages += stageId)
}
}

Contributor:
nit: two consecutive empty lines are unnecessary - one is enough.

kvstore.view(classOf[TaskDataWrapper])
.parent(Array(stageId, stageAttempt))
.index(TaskIndexNames.STATUS)
.first(TaskState.RUNNING.toString)
.last(TaskState.RUNNING.toString)
.asScala
.map(_.toLiveTask)
.foreach { task =>
liveTasks.put(task.info.taskId, task)
stage.activeTasksPerExecutor(task.info.executorId) += 1
}

stage.savedTasks.addAndGet(kvstore.count(classOf[TaskDataWrapper]).intValue())
}

kvstore.view(classOf[RDDStorageInfoWrapper]).asScala
Contributor (@HeartSaVioR, Oct 4, 2019):
nit: same here, but just a suggestion (the indentation is not dramatically changed).

.foreach { rddWrapper =>
val liveRdd = rddWrapper.toLiveRDD(liveExecutors)
liveRDDs.put(liveRdd.info.id, liveRdd)
}

kvstore.view(classOf[PoolData]).asScala.foreach { poolData =>
Contributor:
nit: same here.

val schedulerPool = poolData.toSchedulerPool
pools.put(schedulerPool.name, schedulerPool)
}
}
}

// used for tests only
private[spark] def clearLiveEntities(): Unit = {
liveStages.clear()
liveJobs.clear()
liveExecutors.clear()
deadExecutors.clear()
liveTasks.clear()
liveRDDs.clear()
pools.clear()
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
case _ =>
@@ -877,6 +959,12 @@ private[spark] class AppStatusListener(
}
}

// used in tests only
private[spark] def flush(): Unit = {
val now = System.nanoTime()
flush(update(_, now))
}

/** Go through all `LiveEntity`s and use `entityFlushFunc(entity)` to flush them. */
private def flush(entityFlushFunc: LiveEntity => Unit): Unit = {
liveStages.values.asScala.foreach { stage =>
51 changes: 29 additions & 22 deletions core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -59,21 +59,23 @@ private[spark] abstract class LiveEntity {

}

private class LiveJob(
private[spark] class LiveJob(
val jobId: Int,
name: String,
val name: String,
val submissionTime: Option[Date],
val stageIds: Seq[Int],
jobGroup: Option[String],
numTasks: Int,
sqlExecutionId: Option[Long]) extends LiveEntity {
val jobGroup: Option[String],
val numTasks: Int,
val sqlExecutionId: Option[Long]) extends LiveEntity {

var activeTasks = 0
var completedTasks = 0
var failedTasks = 0

// Holds both the stage ID and the task index, packed into a single long value.
val completedIndices = new OpenHashSet[Long]()
// will only be set when recover LiveJob is needed.
Contributor:
Could we add completedIndices and completedStages to a new KVStore entity class, and restore LiveJob from both JobDataWrapper and the new class? I guess you would like to avoid modifying JobDataWrapper, but I'm worried that a restored instance's state would differ from the original's, so I'd like to explore that option here.

Contributor:
I don't mind adding these to JobDataWrapper as well - it could cause backward-compatibility problems, though. It sounds like a tradeoff between "possibly inaccurate" and "no backward compatibility".

Member Author:
I think adding completedIndices and completedStages to JobDataWrapper would be OK, since JobDataWrapper is not under the api.v1 package?

And I think correctness is always the first thing we need to take care of.

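For illustration, the companion entity suggested above might look roughly like the following; the class name, fields, and index are assumptions rather than part of the PR:

import com.fasterxml.jackson.annotation.JsonIgnore

import org.apache.spark.util.kvstore.KVIndex

// Hypothetical KVStore entity persisting the exact per-job recovery state, so a
// restored LiveJob would not need the numCompletedIndices/numCompletedStages offsets.
private[spark] class JobRecoveryDataWrapper(
    val jobId: Int,
    val completedIndices: Set[Long],
    val completedStages: Set[Int]) {

  @JsonIgnore @KVIndex
  private def id: Int = jobId
}
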
var numCompletedIndices = 0

var killedTasks = 0
var killedSummary: Map[String, Int] = Map()
@@ -85,6 +87,8 @@ private class LiveJob(
var completionTime: Option[Date] = None

var completedStages: Set[Int] = Set()
// will only be set when recover LiveJob is needed.
var numCompletedStages = 0
var activeStages = 0
var failedStages = 0

@@ -104,9 +108,9 @@ private class LiveJob(
skippedTasks,
failedTasks,
killedTasks,
completedIndices.size,
completedIndices.size + numCompletedIndices,
activeStages,
completedStages.size,
completedStages.size + numCompletedStages,
skippedStages.size,
failedStages,
killedSummary)
@@ -115,7 +119,7 @@

}

private class LiveTask(
private[spark] class LiveTask(
var info: TaskInfo,
stageId: Int,
stageAttemptId: Int,
@@ -229,7 +233,7 @@ private class LiveTask(

}

private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {
private[spark] class LiveExecutor(val executorId: String, _addTime: Long) extends LiveEntity {

var hostPort: String = null
var host: String = null
@@ -272,7 +276,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
def hasMemoryInfo: Boolean = totalOnHeap >= 0L

// peak values for executor level metrics
val peakExecutorMetrics = new ExecutorMetrics()
var peakExecutorMetrics = new ExecutorMetrics()

def hostname: String = if (host != null) host else hostPort.split(":")(0)

@@ -316,10 +320,10 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
}
}

private class LiveExecutorStageSummary(
private[spark] class LiveExecutorStageSummary(
stageId: Int,
attemptId: Int,
executorId: String) extends LiveEntity {
val executorId: String) extends LiveEntity {

import LiveEntityHelpers._

@@ -353,7 +357,7 @@ private class LiveExecutorStageSummary(

}

private class LiveStage extends LiveEntity {
private[spark] class LiveStage extends LiveEntity {

import LiveEntityHelpers._

@@ -370,6 +374,8 @@ private class LiveStage extends LiveEntity {
var completedTasks = 0
var failedTasks = 0
val completedIndices = new OpenHashSet[Int]()
// will only be set when recover LiveStage is needed.
Contributor:
Same here: could we add completedIndices to a new KVStore entity class and restore LiveStage from both StageDataWrapper and the new class? Adding it to StageDataWrapper is also fine with me.

var numCompletedIndices = 0

var killedTasks = 0
var killedSummary: Map[String, Int] = Map()
@@ -405,7 +411,7 @@ private class LiveStage extends LiveEntity {
numCompleteTasks = completedTasks,
numFailedTasks = failedTasks,
numKilledTasks = killedTasks,
numCompletedIndices = completedIndices.size,
numCompletedIndices = completedIndices.size + numCompletedIndices,

submissionTime = info.submissionTime.map(new Date(_)),
firstTaskLaunchedTime =
@@ -464,7 +470,7 @@
* used by the partition in the executors, and thus may differ from the storage level requested
* by the application.
*/
private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {
private[spark] class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

import LiveEntityHelpers._

@@ -496,7 +502,7 @@ private class LiveRDDPartition(val blockName: String, rddLevel: StorageLevel) {

}

private class LiveRDDDistribution(exec: LiveExecutor) {
private[spark] class LiveRDDDistribution(exec: LiveExecutor) {

import LiveEntityHelpers._

@@ -513,6 +519,7 @@ private class LiveRDDDistribution(exec: LiveExecutor) {
def toApi(): v1.RDDDataDistribution = {
if (lastUpdate == null) {
lastUpdate = new v1.RDDDataDistribution(
executorId,
weakIntern(exec.hostPort),
memoryUsed,
exec.maxMemory - exec.memoryUsed,
@@ -535,18 +542,18 @@
* RDDs, this covers the case where an early stage is run on the unpersisted RDD, and a later stage
* it started after the RDD is marked for caching.
*/
private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {
private[spark] class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends LiveEntity {

import LiveEntityHelpers._

var memoryUsed = 0L
var diskUsed = 0L

private val levelDescription = weakIntern(storageLevel.description)
private val partitions = new HashMap[String, LiveRDDPartition]()
private val partitionSeq = new RDDPartitionSeq()
private[spark] val partitions = new HashMap[String, LiveRDDPartition]()
private[spark] val partitionSeq = new RDDPartitionSeq()

private val distributions = new HashMap[String, LiveRDDDistribution]()
private[spark] val distributions = new HashMap[String, LiveRDDDistribution]()

def partition(blockName: String): LiveRDDPartition = {
partitions.getOrElseUpdate(blockName, {
@@ -600,7 +607,7 @@ private class LiveRDD(val info: RDDInfo, storageLevel: StorageLevel) extends Liv

}

private class SchedulerPool(name: String) extends LiveEntity {
private[spark] class SchedulerPool(val name: String) extends LiveEntity {

var stageIds = Set[Int]()

@@ -750,7 +757,7 @@ private object LiveEntityHelpers {
* Internally, the sequence is mutable, and elements can modify the data they expose. Additions and
* removals are O(1). It is not safe to do multiple writes concurrently.
*/
private class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {
private[spark] class RDDPartitionSeq extends Seq[v1.RDDPartitionInfo] {

@volatile private var _head: LiveRDDPartition = null
@volatile private var _tail: LiveRDDPartition = null
28 changes: 26 additions & 2 deletions core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -31,6 +31,8 @@ import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.metrics.ExecutorMetricType
import org.apache.spark.resource.ResourceInformation
import org.apache.spark.status.{LiveExecutor, LiveRDDDistribution, LiveRDDPartition}
import org.apache.spark.storage.StorageLevel

case class ApplicationInfo private[spark](
id: String,
@@ -181,6 +183,7 @@ class RDDStorageInfo private[spark](
val partitions: Option[Seq[RDDPartitionInfo]])

class RDDDataDistribution private[spark](
val executorId: String,
Contributor:
Are we changing the model in api/v1? The change seems needed, but we also need to check that it doesn't break the existing one.

Contributor:
The API versioning requirements are listed here: https://spark.apache.org/docs/latest/monitoring.html#api-versioning-policy

> New fields may be added to existing endpoints

so it's OK to add new fields.

But if we're adding things that really don't make sense as part of the API, then maybe we should just store a different object instead.

Member Author:
Hi @squito, thanks for providing the API versioning requirements. That's really helpful.

val address: String,
val memoryUsed: Long,
val memoryRemaining: Long,
@@ -192,14 +195,35 @@ class RDDDataDistribution private[spark](
@JsonDeserialize(contentAs = classOf[JLong])
val onHeapMemoryRemaining: Option[Long],
@JsonDeserialize(contentAs = classOf[JLong])
val offHeapMemoryRemaining: Option[Long])
val offHeapMemoryRemaining: Option[Long]) {

private[spark] def toLiveRDDDistribution(executors: scala.collection.Map[String, LiveExecutor])
: LiveRDDDistribution = {
Contributor:
These added methods should probably be private[spark].

Member Author:
Is this necessary? It seems we already have the private[spark] scope on RDDDataDistribution and the other objects in storeTypes that have toLiveXXX methods, so those objects can't be constructed outside Spark and the toLiveXXX methods aren't accessible either.

Contributor:
The classes themselves are public. The private[spark] is applied only to the constructor (that's why it sits in an unusual position; that's the Scala way of scoping the constructor).

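A minimal, self-contained illustration of the constructor-scoping point above (hypothetical example class, not Spark code):

package org.apache.spark.example  // any package under org.apache.spark, for illustration

// The class itself is public; only the constructor is restricted to org.apache.spark,
// which is why the modifier sits in that unusual position before the parameter list.
class PublicModel private[spark](val field: String) {
  def visibleEverywhere: String = field                  // public member
  private[spark] def visibleOnlyInSpark: String = field  // spark-private member
}
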
val exec = executors.get(executorId).get
val liveRDDDistribution = new LiveRDDDistribution(exec)
liveRDDDistribution.memoryUsed = memoryUsed
liveRDDDistribution.diskUsed = diskUsed
liveRDDDistribution.onHeapUsed = onHeapMemoryUsed.getOrElse(0)
liveRDDDistribution.offHeapUsed = offHeapMemoryUsed.getOrElse(0)
liveRDDDistribution.lastUpdate = this
liveRDDDistribution
}
}

class RDDPartitionInfo private[spark](
val blockName: String,
val storageLevel: String,
val memoryUsed: Long,
val diskUsed: Long,
val executors: Seq[String])
val executors: Seq[String]) {

def toLiveRDDPartition: LiveRDDPartition = {
val liveRDDPartition = new LiveRDDPartition(blockName,
StorageLevel.fromDescription(storageLevel))
liveRDDPartition.value = this
liveRDDPartition
}
}

class StageData private[spark](
val status: StageStatus,