Skip to content

Commit

Permalink
add showProgress with jobInfo Unit Test
Browse files Browse the repository at this point in the history
  • Loading branch information
davidyuan1223 committed Oct 17, 2023
1 parent d12046d commit 10a56b1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class SQLOperationListener(
override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageIds = jobStart.stageInfos.map(_.stageId)
val stageIds = jobStart.stageInfos.map(_.stageId).toSet
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
Expand Down Expand Up @@ -144,11 +144,14 @@ class SQLOperationListener(
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
activeJobs.asScala.foreach(item => {
if (item._2.stageIds.contains(stageId)) {
item._2.numCompleteStages.getAndIncrement()
}
})
stageInfo.getStatusString match {
case "succeeded" =>
activeJobs.asScala.foreach(item => {
if (item._2.stageIds.contains(stageId)) {
item._2.numCompleteStages.getAndIncrement()
}
})
}
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,10 @@ class SparkConsoleProgressBar(
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
val result = findJobId(s.stageId)
var jobHeader = s"[There is no job about this stage]"
if (result.isDefined) {
val jobId = result.get
jobHeader = s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
s"/ ${liveJobs.get(jobId).numStages}) Stages] "
}
val jobHeader = findJobId(s.stageId).map(jobId =>
s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse(
"[There is no job about this stage] ")
val header = jobHeader + s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
val w = width - header.length - tailer.length
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
}

class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
val numActiveTasks = new AtomicInteger(0)
val numCompleteTasks = new AtomicInteger(0)
}

class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) {
var numCompleteStages = new AtomicInteger(0)
class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) {
val numCompleteStages = new AtomicInteger(0)
}

0 comments on commit 10a56b1

Please sign in to comment.