Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[KYUUBI #6411] Grpc common support #6412

Open
wants to merge 37 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
dd39efd
fix-#4057 info:
davidyuan1223 Jan 13, 2023
86e4e1c
fix-#4057 info: modify the shellcheck errors file in ./bin
davidyuan1223 Jan 13, 2023
cb11935
Merge remote-tracking branch 'origin/fix-#4057' into fix-#4057
davidyuan1223 Jan 13, 2023
55a0a43
Merge pull request #10 from xiaoyuandajian/fix-#4057
davidyuan1223 Jan 13, 2023
c48ad38
remove the unused blank lines
davidyuan1223 Mar 5, 2023
16237c2
Merge branch 'apache:master' into master
davidyuan1223 Apr 2, 2023
bfa6cbf
Merge branch 'apache:master' into master
davidyuan1223 Sep 22, 2023
e244029
Merge branch 'apache:master' into master
davidyuan1223 Sep 26, 2023
b616044
fix_4186
davidyuan1223 Sep 26, 2023
360d183
Merge branch 'master' into fix_4186
davidyuan1223 Sep 26, 2023
c83836b
Merge pull request #11 from davidyuan1223/fix_4186
davidyuan1223 Sep 26, 2023
40e80d9
Revert "fix_4186"
davidyuan1223 Sep 27, 2023
0925a4b
Merge pull request #12 from davidyuan1223/revert-11-fix_4186
davidyuan1223 Sep 27, 2023
2beccb6
Merge branch 'apache:master' into master
davidyuan1223 Sep 28, 2023
c8eb9a2
Merge branch 'apache:master' into master
davidyuan1223 Oct 10, 2023
56b91a3
fix_4186
davidyuan1223 Oct 10, 2023
57ec746
Merge pull request #13 from davidyuan1223/fix
davidyuan1223 Oct 10, 2023
72e7aea
Merge branch 'apache:master' into master
davidyuan1223 Oct 20, 2023
bcb0cf3
Merge remote-tracking branch 'origin/master'
davidyuan1223 Oct 26, 2023
8b51840
add common method to get session level config
davidyuan1223 Oct 26, 2023
6376466
Merge branch 'apache:master' into master
davidyuan1223 Dec 23, 2023
46001d4
Merge branch 'apache:master' into master
davidyuan1223 Jan 26, 2024
3d6c53b
add new module common-grpc
davidyuan1223 May 23, 2024
e16fd7f
fix_4186
davidyuan1223 Oct 10, 2023
a3a4ebd
add common method to get session level config
davidyuan1223 Oct 26, 2023
631ac9a
add new module common-grpc
davidyuan1223 May 23, 2024
0a52c2a
fork master new version
davidyuan1223 May 23, 2024
d92fdb6
Merge remote-tracking branch 'origin/grpc-common-support' into grpc-c…
davidyuan1223 May 23, 2024
4c5a4ae
fix format error
davidyuan1223 May 23, 2024
3e302aa
fix format error
davidyuan1223 May 23, 2024
c6af8ed
update settings.md
davidyuan1223 May 23, 2024
a1e0447
update
davidyuan1223 May 28, 2024
5bf1814
update
davidyuan1223 Jun 4, 2024
e031c8c
update
davidyuan1223 Jun 4, 2024
55cbae3
update
davidyuan1223 Jul 10, 2024
0c17121
update
davidyuan1223 Jul 15, 2024
2b959f9
add new module kyuubi-grpc
davidyuan1223 Jul 23, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,17 @@ class SQLOperationListener(
spark: SparkSession) extends StatsReportListener with Logging {

private val operationId: String = operation.getHandle.identifier.toString
private lazy val activeJobs = new java.util.HashSet[Int]()
private lazy val activeStages = new ConcurrentHashMap[SparkStageAttempt, SparkStageInfo]()
private var executionId: Option[Long] = None

private lazy val activeJobs = new ConcurrentHashMap[Int, JobInfo]()
private lazy val activeStages = new ConcurrentHashMap[StageAttempt, StageInfo]()

private var executionId: Option[Long] = None
private val conf: KyuubiConf = operation.getSession.sessionManager.getConf
private lazy val consoleProgressBar =
if (conf.get(ENGINE_SPARK_SHOW_PROGRESS)) {
Some(new SparkConsoleProgressBar(
operation,
activeJobs,
activeStages,
conf.get(ENGINE_SPARK_SHOW_PROGRESS_UPDATE_INTERVAL),
conf.get(ENGINE_SPARK_SHOW_PROGRESS_TIME_FORMAT)))
Expand Down Expand Up @@ -82,6 +84,7 @@ class SQLOperationListener(
override def onJobStart(jobStart: SparkListenerJobStart): Unit = activeJobs.synchronized {
if (sameGroupId(jobStart.properties)) {
val jobId = jobStart.jobId
val stageIds = jobStart.stageInfos.map(_.stageId).toList
val stageSize = jobStart.stageInfos.size
if (executionId.isEmpty) {
executionId = Option(jobStart.properties.getProperty(SPARK_SQL_EXECUTION_ID_KEY))
Expand All @@ -94,7 +97,10 @@ class SQLOperationListener(
}
}
withOperationLog {
activeJobs.add(jobId)
activeJobs.put(
jobId,
new JobInfo(stageSize, stageIds)
)
info(s"Query [$operationId]: Job $jobId started with $stageSize stages," +
s" ${activeJobs.size()} active jobs running")
}
Expand All @@ -103,7 +109,7 @@ class SQLOperationListener(

override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = activeJobs.synchronized {
val jobId = jobEnd.jobId
if (activeJobs.remove(jobId)) {
if (activeJobs.remove(jobId) != null ) {
val hint = jobEnd.jobResult match {
case JobSucceeded => "succeeded"
case _ => "failed" // TODO: Handle JobFailed(exception: Exception)
Expand Down Expand Up @@ -134,14 +140,21 @@ class SQLOperationListener(

override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
  val stageInfo = stageCompleted.stageInfo
  val stageAttempt = StageAttempt(stageInfo.stageId, stageInfo.attemptNumber())
  activeStages.synchronized {
    // Only account for this completion if the stage attempt is one we are
    // actually tracking. The previous code bumped the per-job counters before
    // (and regardless of) this check, so a retried stage attempt — or a stage
    // belonging to another operation whose id happened to match — could push
    // numCompleteStages past numStages.
    if (activeStages.remove(stageAttempt) != null) {
      activeJobs.forEach((_, jobInfo) => {
        if (jobInfo.stageIds.contains(stageInfo.stageId)) {
          // NOTE(review): assumes Spark delivers listener events from a single
          // dispatcher thread, so this read-modify-write is not racy — confirm.
          jobInfo.numCompleteStages += 1
        }
      })
      withOperationLog(super.onStageCompleted(stageCompleted))
    }
  }
}


override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = activeStages.synchronized {
val stageAttempt = SparkStageAttempt(taskStart.stageId, taskStart.stageAttemptId)
if (activeStages.containsKey(stageAttempt)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ import org.apache.kyuubi.operation.Operation

class SparkConsoleProgressBar(
operation: Operation,
liveStages: ConcurrentHashMap[SparkStageAttempt, SparkStageInfo],
liveJobs: ConcurrentHashMap[Int, JobInfo],
liveStages: ConcurrentHashMap[StageAttempt, StageInfo],
updatePeriodMSec: Long,
timeFormat: String)
extends Logging {
Expand Down Expand Up @@ -80,6 +81,13 @@ class SparkConsoleProgressBar(
private def show(now: Long, stages: Seq[SparkStageInfo]): Unit = {
val width = TerminalWidth / stages.size
val bar = stages.map { s =>
// build job log info
val jobId: Option[Int] = liveJobs.asScala.find {
case (jobId, jobInfo) => jobInfo.stageIds.contains(s.stageId)
}.map(_._1)
val jobInfoHeader = s"[Job ${jobId} " +
s"(${liveJobs.get(jobId).numCompleteStages} / ${liveJobs.get(jobId).numStages}) Stages] "
// build stage log info
val total = s.numTasks
val header = s"[Stage ${s.stageId}:"
val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
Expand All @@ -93,7 +101,7 @@ class SparkConsoleProgressBar(
} else {
""
}
header + bar + tailer
jobInfoHeader + header + bar + tailer
}.mkString("")

// only refresh if it's changed OR after 1 minute (or the ssh connection will be closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ case class SparkStageAttempt(stageId: Int, stageAttemptId: Int) {
override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)"
}

class SparkStageInfo(val stageId: Int, val numTasks: Int) {
var numActiveTasks = new AtomicInteger(0)
var numCompleteTasks = new AtomicInteger(0)
/**
 * Progress bookkeeping for a single Spark job submitted by this operation.
 *
 * @param numStages total number of stages the job was submitted with
 * @param stageIds  ids of the stages belonging to the job, used to attribute
 *                  stage-completion events back to this job
 */
class JobInfo(val numStages: Int, val stageIds: Seq[Int]) {
  // Written by the Spark listener and read by the console-progress-bar timer
  // thread; @volatile guarantees cross-thread visibility (the class this
  // replaces used AtomicInteger counters). Increments are still not atomic —
  // assumes a single writer thread (Spark's listener bus); TODO confirm.
  @volatile var numCompleteStages = 0
}

/**
 * Task-level progress bookkeeping for a single stage attempt.
 *
 * @param stageId  id of the stage being tracked
 * @param numTasks total number of tasks in the stage
 */
class StageInfo(val stageId: Int, val numTasks: Int) {
  // These counters are updated from listener callbacks and read by the
  // progress-bar timer thread. The SparkStageInfo class this replaces kept
  // them in AtomicIntegers; mark them @volatile so cross-thread reads stay
  // visible. Increments are not atomic — assumes a single writer thread
  // (Spark's listener bus); TODO confirm.
  @volatile var numActiveTasks = 0
  @volatile var numCompleteTasks = 0
}