improvement_add_job_log fix
1. Remove the duplicate synchronized block in onJobEnd
2. Drop the remaining synchronized wrappers, since activeJobs is a ConcurrentHashMap (see the sketch below)
3. Fix Scala code style
4. Replace Java-style forEach with asScala iteration
5. Change conf strings to KyuubiConf.XXX.key
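
Items 1 and 2 rest on ConcurrentHashMap's per-key atomicity. A minimal standalone sketch of that guarantee (plain Scala against java.util.concurrent, not Kyuubi code):

import java.util.concurrent.ConcurrentHashMap

object ConcurrentRemoveSketch {
  def main(args: Array[String]): Unit = {
    val activeJobs = new ConcurrentHashMap[Int, String]()
    activeJobs.put(1, "job-1")
    // remove(key) atomically deletes the mapping and returns the previous
    // value (or null), so this check-then-act needs no surrounding lock:
    // at most one thread can ever observe the non-null previous value.
    if (activeJobs.remove(1) != null) {
      println(s"removed; ${activeJobs.size()} jobs still active")
    }
  }
}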
davidyuan1223 committed Oct 17, 2023
1 parent 59340b7 commit 780f9d1
Showing 4 changed files with 46 additions and 51 deletions.
@@ -20,6 +20,8 @@ package org.apache.spark.kyuubi
 import java.util.Properties
 import java.util.concurrent.ConcurrentHashMap
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
@@ -81,44 +83,39 @@ class SQLOperationListener(
   }
 
   override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
-    activeJobs.synchronized {
-      if (sameGroupId(jobStart.properties)) {
-        val jobId = jobStart.jobId
-        val stageIds = jobStart.stageInfos.map(_.stageId)
-        val stageSize = jobStart.stageInfos.size
-        if (executionId.isEmpty) {
-          executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
-            .map(_.toLong)
-          consoleProgressBar
-          operation match {
-            case executeStatement: ExecuteStatement =>
-              executeStatement.setCompiledStateIfNeeded()
-            case _ =>
-          }
-        }
-        activeJobs.put(
-          jobId,
-          new SparkJobInfo(stageSize, stageIds)
-        )
-        withOperationLog {
-          info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
-            s" ${activeJobs.size()} active jobs running")
-        }
-      }
-    }
-  }
+    if (sameGroupId(jobStart.properties)) {
+      val jobId = jobStart.jobId
+      val stageIds = jobStart.stageInfos.map(_.stageId)
+      val stageSize = jobStart.stageInfos.size
+      if (executionId.isEmpty) {
+        executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
+          .map(_.toLong)
+        consoleProgressBar
+        operation match {
+          case executeStatement: ExecuteStatement =>
+            executeStatement.setCompiledStateIfNeeded()
+          case _ =>
+        }
+      }
+      activeJobs.put(
+        jobId,
+        new SparkJobInfo(stageSize, stageIds))
+      withOperationLog {
+        info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
+          s" ${activeJobs.size()} active jobs running")
+      }
+    }
+  }
 
-  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
+  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
     val jobId = jobEnd.jobId
-    activeJobs.synchronized {
-      if (activeJobs.remove(jobId) != null) {
-        val hint = jobEnd.jobResult match {
-          case JobSucceeded => "succeeded"
-          case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
-        }
-        withOperationLog {
-          info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
-        }
-      }
-    }
-  }
+    if (activeJobs.remove(jobId) != null) {
+      val hint = jobEnd.jobResult match {
+        case JobSucceeded => "succeeded"
+        case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
+      }
+      withOperationLog {
+        info(s"Query [$operationId]: Job $jobId $hint, ${activeJobs.size()} active jobs running")
+      }
+    }
+  }
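
One subtlety survives the rewrite above: the log line still reads activeJobs.size() outside any lock, and ConcurrentHashMap.size() is only a weakly consistent estimate while writers race. That is acceptable for a log message. A small illustration (assumed names, not Kyuubi code):

import java.util.concurrent.ConcurrentHashMap

object WeaklyConsistentSizeSketch {
  def main(args: Array[String]): Unit = {
    val jobs = new ConcurrentHashMap[Int, String]()
    (1 to 4).foreach(i => jobs.put(i, s"job-$i"))
    jobs.remove(2)
    // size() walks the table without locking it; concurrent writers can make
    // the count stale by the time it prints, which a log line tolerates.
    println(s"${jobs.size()} active jobs running")
  }
}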
@@ -147,13 +144,11 @@ class SQLOperationListener(
     val stageAttempt = SparkStageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
     activeStages.synchronized {
       if (activeStages.remove(stageAttempt) != null) {
-        activeJobs.synchronized {
-          activeJobs.forEach((jobId, sparkJobInfo) => {
-            if (sparkJobInfo.stageIds.contains(stageId)) {
-              sparkJobInfo.numCompleteStages.getAndIncrement()
-            }
-          })
-        }
+        activeJobs.asScala.foreach(item => {
+          if (item._2.stageIds.contains(stageId)) {
+            item._2.numCompleteStages.getAndIncrement()
+          }
+        })
         withOperationLog(super.onStageCompleted(stageCompleted))
       }
     }
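The hunk above is the forEach-to-asScala change from the commit message. A standalone sketch of the idiom; the case pattern names the key and value, which reads better than the committed item._1/item._2 (assumed data, not Kyuubi code):

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object AsScalaSketch {
  def main(args: Array[String]): Unit = {
    val activeJobs = new ConcurrentHashMap[Int, Seq[Int]]() // jobId -> stageIds
    activeJobs.put(7, Seq(1, 2, 3))
    // asScala wraps the Java map in place (no copy), so iteration sees the
    // live, concurrently updated contents.
    activeJobs.asScala.foreach { case (jobId, stageIds) =>
      if (stageIds.contains(2)) println(s"job $jobId owns stage 2")
    }
  }
}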
@@ -79,12 +79,15 @@ class SparkConsoleProgressBar(
    * @return jobId,if not exists, return -1
    */
   private def findJobId(stageId: Int): Int = {
-    liveJobs.forEach((jobId, sparkJobInfo) => {
-      if (sparkJobInfo.stageIds.contains(stageId)) {
-        return jobId
-      }
-    })
-    -1
+    val result: Option[Int] = liveJobs.asScala.find(item => {
+      item._2.stageIds.contains(stageId)
+    }).map(_._1)
+    result match {
+      case Some(value) =>
+        value
+      case None =>
+        -1
+    }
   }
   /**
    * Show progress bar in console. The progress bar is displayed in the next line
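The old findJobId deserved replacing for more than style: a return inside a Scala lambda compiles to a non-local return driven by a control-flow exception, which is easy to break. The committed find(...).map(_._1) plus match is sound; collectFirst could compress it further. A hypothetical refactor, not part of this commit, with SparkJobInfo simplified to a stage-id list:

import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

object FindJobIdSketch {
  private val liveJobs = new ConcurrentHashMap[Int, Seq[Int]]() // jobId -> stageIds

  // collectFirst stops at the first matching entry; getOrElse supplies the
  // -1 sentinel callers expect when no job owns the stage.
  def findJobId(stageId: Int): Int =
    liveJobs.asScala
      .collectFirst { case (jobId, stageIds) if stageIds.contains(stageId) => jobId }
      .getOrElse(-1)

  def main(args: Array[String]): Unit = {
    liveJobs.put(42, Seq(1, 2, 3))
    println(findJobId(2)) // 42
    println(findJobId(9)) // -1
  }
}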
@@ -28,7 +28,6 @@ class SparkStageInfo(val stageId: Int, val numTasks: Int) {
   var numCompleteTasks = new AtomicInteger(0)
 }
 
-
 class SparkJobInfo(val numStages: Int, val stageIds: Seq[Int]) {
   var numCompleteStages = new AtomicInteger(0)
 }
@@ -22,21 +22,19 @@ import scala.collection.JavaConverters.asScalaBufferConverter
 import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TFetchOrientation, TFetchResultsReq, TOperationHandle}
 import org.scalatest.time.SpanSugar._
 
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.OPERATION_SPARK_LISTENER_ENABLED
 import org.apache.kyuubi.engine.spark.WithSparkSQLEngine
 import org.apache.kyuubi.operation.HiveJDBCTestHelper
 
 class SQLOperationListenerSuite extends WithSparkSQLEngine with HiveJDBCTestHelper {
 
   override def withKyuubiConf: Map[String, String] = Map(
-    "kyuubi.session.engine.spark.showProgress" -> "true",
-    "kyuubi.session.engine.spark.progress.update.interval" -> "200")
-
+    KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
+    KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
 
   override protected def jdbcUrl: String = getJdbcUrl
 
-
-
   test("operation listener with progress job info") {
     val sql = "SELECT java_method('java.lang.Thread', 'sleep', 10000l) FROM range(1, 3, 1, 2);"
     withSessionHandle { (client, handle) =>
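The test-suite hunk is item 5 from the commit message: the typed entries resolve to the same keys the removed literals spelled out (the diff pairs "kyuubi.session.engine.spark.showProgress" with ENGINE_SPARK_SHOW_PROGRESS.key), but a typo or a renamed config now fails at compile time instead of silently configuring nothing. The pattern, restated from the diff (needs kyuubi-common on the classpath):

import org.apache.kyuubi.config.KyuubiConf

object TypedConfSketch {
  // Same values as before, now checked by the compiler; a misspelled entry
  // name is a compile error rather than an ignored setting.
  val conf: Map[String, String] = Map(
    KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS.key -> "true",
    KyuubiConf.ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL.key -> "200")
}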
