[KYUUBI #4186] Spark showProgress with JobInfo
### _Why are the changes needed?_

In the current version, when `kyuubi.session.engine.spark.showProgress=true` is set, the progress info only shows stage-level detail. This change adds job-level info as well, changing output like
```
[Stage 1:>                                   (0 + 1) / 2]
```
to
```
[Job 1 (0 / 1) Stages] [Stage 1:>                                   (0 + 1) / 2]
```
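The prefix format above can be sketched as a plain string template. This is an illustrative sketch only; `JobProgress` and `renderLine` are hypothetical names, not the patch's actual API, and the real progress bar also pads the stage bar to the terminal width:

```scala
// Hypothetical sketch of the new "[Job ...] [Stage ...]" line format.
case class JobProgress(jobId: Int, completeStages: Int, numStages: Int)

def renderLine(
    job: JobProgress,
    stageId: Int,
    completeTasks: Int,
    activeTasks: Int,
    totalTasks: Int): String = {
  // "[Job <id> (<complete> / <total>) Stages] " prefix, then the stage tail
  val jobHeader = s"[Job ${job.jobId} (${job.completeStages} / ${job.numStages}) Stages] "
  jobHeader + s"[Stage $stageId:> ($completeTasks + $activeTasks) / $totalTasks]"
}
```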
**This update is useful when users want to know the execution details of their SQL.**

closes #4186

### _How was this patch tested?_
- [x] Add screenshots for manual tests if appropriate
**The screenshot shows the matching log output**
![image](https://github.com/apache/kyuubi/assets/51512358/73e0e2c0-f223-4d36-a3a7-638fb7691437)

- [x] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before make a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5410 from davidyuan1223/improvement_add_job_log.

Closes #4186

d8d03c4 [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SQLOperationListener.scala
a06e9a1 [david yuan] Update SparkConsoleProgressBar.scala
8544084 [david yuan] Merge branch 'apache:master' into improvement_add_job_log
963ff18 [david yuan] Update SparkConsoleProgressBar.scala
9e46356 [david yuan] Update SparkConsoleProgressBar.scala
8c04dca [david yuan] Update SQLOperationListener.scala
39751bf [davidyuan] fix
4f657e7 [davidyuan] fix deleted files
86756eb [david yuan] Merge branch 'apache:master' into improvement_add_job_log
0c9ac27 [davidyuan] add showProgress with jobInfo Unit Test
d4434a0 [davidyuan] Revert "add showProgress with jobInfo Unit Test"
84b1aa0 [davidyuan] Revert "improvement_add_job_log fix"
66126f9 [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log
228fd9c [davidyuan] add showProgress with jobInfo Unit Test
055e0ac [davidyuan] add showProgress with jobInfo Unit Test
e4aac34 [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log
b226ada [davidyuan] Merge remote-tracking branch 'origin/improvement_add_job_log' into improvement_add_job_log
a08799c [david yuan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
a991b68 [david yuan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/StageStatus.scala
d12046d [davidyuan] add showProgress with jobInfo Unit Test
10a56b1 [davidyuan] add showProgress with jobInfo Unit Test
a973cdd [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
e8a5108 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
7b9e874 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
5b4aaa8 [davidyuan] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId
780f9d1 [davidyuan] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key
59340b7 [davidyuan] add showProgress with jobInfo Unit Test
af05089 [davidyuan] add showProgress with jobInfo Unit Test
c07535a [davidyuan] [Improvement] spark showProgress can briefly output info of the job #4186
d4bdec7 [yuanfuyuan] fix_4186
9fa8e73 [davidyuan] add showProgress with jobInfo Unit Test
49debfb [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
5cf8714 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
249a422 [davidyuan] improvement_add_job_log fix 1. provide Option[Int] with JobId
e15fc71 [davidyuan] improvement_add_job_log fix 1. fix new end line 2. provide Option[Int] with JobId
4564ef9 [davidyuan] improvement_add_job_log fix 1. remove duplicate synchronized 2. because the activeJobs is ConcurrentHashMap, so reduce synchronized 3. fix scala code style 4. change forEach to asScala code style 5. change conf str to KyuubiConf.XXX.key
32ad075 [davidyuan] add showProgress with jobInfo Unit Test
d30492e [davidyuan] add showProgress with jobInfo Unit Test
6209c34 [davidyuan] [Improvement] spark showProgress can briefly output info of the job #4186
56b91a3 [yuanfuyuan] fix_4186

Lead-authored-by: davidyuan <yuanfuyuan@mafengwo.com>
Co-authored-by: davidyuan <davidyuan1223@gmail.com>
Co-authored-by: david yuan <51512358+davidyuan1223@users.noreply.github.com>
Co-authored-by: yuanfuyuan <1406957364@qq.com>
Co-authored-by: Cheng Pan <pan3793@gmail.com>
Signed-off-by: Cheng Pan <chengpan@apache.org>
(cherry picked from commit ed0d997)
Signed-off-by: Cheng Pan <chengpan@apache.org>
5 people committed Oct 25, 2023
1 parent c0bfeed commit 5e57071
Showing 4 changed files with 66 additions and 10 deletions.
@@ -20,6 +20,8 @@
import java.util.Properties
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._

import org.apache.spark.scheduler._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
@@ -44,7 +46,7 @@
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString
- private lazy val activeJobs = new java.util.HashSet[Int]()
+ private lazy val activeJobs = new ConcurrentHashMap[Int, SparkJobInfo]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

@@ -53,6 +55,7 @@
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
@@ -79,9 +82,10 @@
}
}

- override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
+ override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageIds = jobStart.stageInfos.map(_.stageId).toSet
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
@@ -93,17 +97,19 @@
case _ =>
}
}
+ activeJobs.put(
+   jobId,
+   new SparkJobInfo(stageSize, stageIds))
  withOperationLog {
- activeJobs.add(jobId)
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
}
}

- override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
+ override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
val jobId = jobEnd.jobId
- if (activeJobs.remove(jobId)) {
+ if (activeJobs.remove(jobId) != null) {
val hint = jobEnd.jobResult match {
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
@@ -134,9 +140,18 @@

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
val stageInfo = stageCompleted.stageInfo
val stageId = stageInfo.stageId
val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
activeStages.synchronized {
if (activeStages.remove(stageAttempt) != null) {
stageInfo.getStatusString match {
case "succeeded" =>
activeJobs.asScala.foreach { case (_, jobInfo) =>
if (jobInfo.stageIds.contains(stageId)) {
jobInfo.numCompleteStages.getAndIncrement()
}
}
}
withOperationLog(super.onStageCompleted(stageCompleted))
}
}
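The listener above drops the `synchronized` blocks by relying on `ConcurrentHashMap`'s atomic operations; in particular, `remove` returns the previous value, or `null` when the key was absent, which is exactly what the new `onJobEnd` check uses. A minimal standalone sketch of that idiom (the `String` values are placeholders for `SparkJobInfo`):

```scala
import java.util.concurrent.ConcurrentHashMap

// remove returns the mapped value, or null when the key is absent,
// so the presence check and the removal happen in one atomic step.
val activeJobs = new ConcurrentHashMap[Int, String]()
activeJobs.put(1, "job-1")

val first = activeJobs.remove(1)   // previous value
val second = activeJobs.remove(1)  // null: key already gone
```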
@@ -29,6 +29,7 @@

class SparkConsoleProgressBar(
operation: Operation,
liveJobs: ConcurrentHashMap[Int, SparkJobInfo],
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
updatePeriodMSec: Long,
timeFormat: String)
@@ -72,6 +73,17 @@
}
}

/**
* Use stageId to find stage's jobId
* @param stageId
* @return jobId (Optional)
*/
private def findJobId(stageId: Int): Option[Int] = {
liveJobs.asScala.collectFirst {
case (jobId, jobInfo) if jobInfo.stageIds.contains(stageId) => jobId
}
}

/**
* Show progress bar in console. The progress bar is displayed in the next line
* after your last output, keeps overwriting itself to hold in one line. The logging will follow
@@ -81,9 +93,13 @@
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
val total = s.numTasks
- val header = s"[Stage ${s.stageId}:"
+ val jobHeader = findJobId(s.stageId).map(jobId =>
+   s"[Job $jobId (${liveJobs.get(jobId).numCompleteStages} " +
+   s"/ ${liveJobs.get(jobId).numStages}) Stages] ").getOrElse(
+   "[There is no job about this stage] ")
+ val header = jobHeader + s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
- val w = width - header.length - tailer.length
+ val w = width + jobHeader.length - header.length - tailer.length
val bar =
if (w > 0) {
val percent = w * s.numCompleteTasks.get / total
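The new `findJobId` helper resolves a stage back to its owning job with `collectFirst` over the live-job map. A self-contained sketch of the same lookup, with an illustrative `JobInfo` standing in for `SparkJobInfo`:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

// Illustrative stand-in for SparkJobInfo: a job knows its stage ids.
class JobInfo(val stageIds: Set[Int])

val liveJobs = new ConcurrentHashMap[Int, JobInfo]()
liveJobs.put(1, new JobInfo(Set(0, 1)))
liveJobs.put(2, new JobInfo(Set(2, 3)))

// First job whose stage set contains the given stage, if any.
def findJobId(stageId: Int): Option[Int] =
  liveJobs.asScala.collectFirst {
    case (jobId, info) if info.stageIds.contains(stageId) => jobId
  }
```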
@@ -24,6 +24,10 @@
}

class SparkStageInfo(val stageId: Int, val numTasks: Int) {
- var numActiveTasks = new AtomicInteger(0)
- var numCompleteTasks = new AtomicInteger(0)
+ val numActiveTasks = new AtomicInteger(0)
+ val numCompleteTasks = new AtomicInteger(0)
}

class SparkJobInfo(val numStages: Int, val stageIds: Set[Int]) {
val numCompleteStages = new AtomicInteger(0)
}
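The `var` → `val` change on the `AtomicInteger` fields is safe because the mutation happens inside the `AtomicInteger` itself; the reference never needs to change. A small sketch of the pattern (`Counters` is an illustrative name):

```scala
import java.util.concurrent.atomic.AtomicInteger

// The val holds a fixed reference; getAndIncrement mutates the counter
// atomically, so no var (and no external locking) is required.
class Counters(val numStages: Int) {
  val numCompleteStages = new AtomicInteger(0)
}

val c = new Counters(3)
c.numCompleteStages.getAndIncrement()
c.numCompleteStages.getAndIncrement()
```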
@@ -22,13 +22,16 @@
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle}
import org.scalatest.time.SpanSugar._

import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED
import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
import org.apache.kyuubi.operation.HiveJDBCTestHelper

class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {

- override def withKyuubiConf: Map[String, String] = Map.empty
+ override def withKyuubiConf: Map[String, String] = Map(
+   KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
+   KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")

override protected def jdbcUrl: String = getJdbcUrl

@@ -54,6 +57,24 @@
}
}

test("operation listener with progress job info") {
val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
withSessionHandle { (client, handle) =>
val req = new TExecuteStatementReq()
req.setSessionHandle(handle)
req.setStatement(sql)
val tExecuteStatementResp = client.ExecuteStatement(req)
val opHandle = tExecuteStatementResp.getOperationHandle
val fetchResultsReq = new TFetchResultsReq(opHandle, TFetchOrientation.FETCH_NEXT, 1000)
fetchResultsReq.setFetchType(1.toShort)
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val resultsResp = client.FetchResults(fetchResultsReq)
val logs = resultsResp.getResults.getColumns.get(0).getStringVal.getValues.asScala
assert(logs.exists(_.matches(".*\\[Job .* Stages\\] \\[Stage .*\\]")))
}
}
}

test("SQLOperationListener configurable") {
val sql = "select /*+ REPARTITION(3, a) */ a from values(1) t(a);"
withSessionHandle { (client, handle) =>
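The new test asserts the combined format with a regex over the fetched operation log. Checking that same pattern against a sample rendered line (the sample line here is illustrative, not captured from a real run):

```scala
// The pattern from the test, matched against an example progress line.
val pattern = ".*\\[Job .* Stages\\] \\[Stage .*\\]"
val sample = "[Job 1 (0 / 1) Stages] [Stage 1:>          (0 + 1) / 2]"
val ok = sample.matches(pattern)
```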
