diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
index 4e4a940d295..c362af72870 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
@@ -44,7 +44,7 @@ class SQLOperationListener(
     spark: SparkSession) extends StatsReportListener with Logging {
 
   private val operationId: String = operation.getHandle.identifier.toString
-  private lazy val activeJobs = new java.util.HashSet[Int]()
+  private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]()
   private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
 
   private var executionId: Option[Long] = None
@@ -53,6 +53,7 @@ class SQLOperationListener(
     if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
       Some(new SparkConsoleProgressBar(
         operation,
+        activeJobs,
         activeStages,
         conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
         conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
@@ -79,37 +80,43 @@ class SQLOperationListener(
     }
   }
 
-  override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
-    if (sameGroupId(jobStart.properties)) {
-      val jobId = jobStart.jobId
-      val stageSize = jobStart.stageInfos.size
-      if (executionId.isEmpty) {
-        executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
-          .map(_.toLong)
-        consoleProgressBar
-        operation match {
-          case executeStatement: ExecuteStatement =>
-            executeStatement.setCompiledStateIfNeeded()
-          case _ =>
-        }
-      }
-      withOperationLog {
-        activeJobs.add(jobId)
-        info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
-          s" ${activeJobs.size()} active jobs running")
-      }
-    }
-  }
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
+    activeJobs.synchronized {
+      if (sameGroupId(jobStart.properties)) {
+        val jobId = jobStart.jobId
+        val stageIds = jobStart.stageInfos.map(_.stageId)
+        val stageSize = jobStart.stageInfos.size
+        if (executionId.isEmpty) {
+          executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
+            .map(_.toLong)
+          consoleProgressBar
+          operation match {
+            case executeStatement: ExecuteStatement =>
+              executeStatement.setCompiledStateIfNeeded()
+            case _ =>
+          }
+        }
+        activeJobs.put(
+          jobId,
+          new SparkJobInfo(stageSize, stageIds)
+        )
+        withOperationLog {
+          info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
+            s" ${activeJobs.size()} active jobs running")
+        }
+      }
+    }
+  }
 
   override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
     val jobId = jobEnd.jobId
-    if (activeJobs.remove(jobId)) {
+    if (activeJobs.remove(jobId) != null) {
       val hint = jobEnd.jobResult match {
         case JobSucceeded => "succeeded"
         case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
       }
       withOperationLog {
         info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
       }
     }
   }
@@ -134,9 +141,17 @@ class SQLOperationListener(
 
   override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
     val stageInfo = stageCompleted.stageInfo
+    val stageId = stageInfo.stageId
     val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
     activeStages.synchronized {
       if (activeStages.remove(stageAttempt) != null) {
+        activeJobs.synchronized {
+          activeJobs.forEach((jobId, sparkJobInfo) => {
+            if (sparkJobInfo.stageIds.contains(stageId)) {
+              sparkJobInfo.numCompleteStages.getAndIncrement()
+            }
+          })
+        }
         withOperationLog(super.onStageCompleted(stageCompleted))
       }
     }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
index dc8b493cc04..27b41c2b8b4 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkConsoleProgressBar.scala
@@ -29,6 +29,7 @@ import org.apache.kyuubi.operation.Operation
 
 class SparkConsoleProgressBar(
     operation: Operation,
+    liveJobs: ConcurrentHashMap[Int, SparkJobInfo],
    liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
     updatePeriodMSec: Long,
     timeFormat: String)
@@ -71,7 +72,14 @@ class SparkConsoleProgressBar(
       show(now, stages.take(3)) // display at most 3 stages in same time
     }
   }
-
+
+  /** Finds the id of the live job that owns the given stage, or -1 if none matches. */
+  private def findJobId(stageId: Int): Int = {
+    var foundJobId = -1
+    liveJobs.forEach((jobId, jobInfo) =>
+      if (foundJobId == -1 && jobInfo.stageIds.contains(stageId)) foundJobId = jobId)
+    foundJobId
+  }
   /**
    * Show progress bar in console. The progress bar is displayed in the next line
    * after your last output, keeps overwriting itself to hold in one line.
 The logging will follow
@@ -81,7 +89,13 @@ class SparkConsoleProgressBar(
       val width = TerminalWidth / stages.size
       val bar = stages.map { s =>
         val total = s.numTasks
-        val header = s"[Stage ${s.stageId}:"
+        val jobId = findJobId(s.stageId)
+        val jobHeader = Option(liveJobs.get(jobId)) match {
+          case Some(jobInfo) =>
+            s"[Job $jobId (${jobInfo.numCompleteStages} / ${jobInfo.numStages}) Stages] "
+          case None => "[There is no job about this stage]"
+        }
+        val header = jobHeader + s"[Stage ${s.stageId}:"
         val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
         val w = width - header.length - tailer.length
         val bar =
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
index 2ea9c3fdae6..cd745ea1921 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
@@ -27,3 +27,7 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
   var numActiveTasks = new AtomicInteger(0)
   var numCompleteTasks = new AtomicInteger(0)
 }
+
+class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) {
+  var numCompleteStages = new AtomicInteger(0)
+}