Skip to content

Commit d1daeb9

Browse files
author
Sundeep Narravula
committed
Incorporating review comments.
1 parent 8d97923 commit d1daeb9

File tree

9 files changed

+41
-28
lines changed

9 files changed

+41
-28
lines changed

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1077,12 +1077,12 @@ class SparkContext(
10771077
}
10781078

10791079
/** Cancel a given job if it's scheduled or running */
1080-
def cancelJob(jobId: Int) {
1080+
private[spark] def cancelJob(jobId: Int) {
10811081
dagScheduler.cancelJob(jobId)
10821082
}
10831083

10841084
/** Cancel a given stage and all jobs associated with it */
1085-
def cancelStage(stageId: Int) {
1085+
private[spark] def cancelStage(stageId: Int) {
10861086
dagScheduler.cancelStage(stageId)
10871087
}
10881088

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,7 @@ class DAGScheduler(
494494
/**
495495
* Cancel a job that is running or waiting in the queue.
496496
*/
497-
private[spark] def cancelJob(jobId: Int) {
497+
def cancelJob(jobId: Int) {
498498
logInfo("Asked to cancel job " + jobId)
499499
eventProcessActor ! JobCancelled(jobId)
500500
}
@@ -570,11 +570,13 @@ class DAGScheduler(
570570
val activeInGroup = activeJobs.filter(activeJob =>
571571
groupId == activeJob.properties.get(SparkContext.SPARK_JOB_GROUP_ID))
572572
val jobIds = activeInGroup.map(_.jobId)
573-
jobIds.foreach(jobId => handleJobCancellation(jobId, "as part of cancelled job group %s".format(groupId)))
573+
jobIds.foreach(jobId => handleJobCancellation(jobId,
574+
"as part of cancelled job group %s".format(groupId)))
574575

575576
case AllJobsCancelled =>
576577
// Cancel all running jobs.
577-
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId, "as part of cancellation of all jobs"))
578+
runningStages.map(_.jobId).foreach(jobId => handleJobCancellation(jobId,
579+
"as part of cancellation of all jobs"))
578580
activeJobs.clear() // These should already be empty by this point,
579581
jobIdToActiveJob.clear() // but just in case we lost track of some jobs...
580582

@@ -1003,7 +1005,7 @@ class DAGScheduler(
10031005

10041006
private def handleStageCancellation(stageId: Int) {
10051007
if (stageIdToJobIds.contains(stageId)) {
1006-
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray.sorted
1008+
val jobsThatUseStage: Array[Int] = stageIdToJobIds(stageId).toArray
10071009
jobsThatUseStage.foreach(jobId => {
10081010
handleJobCancellation(jobId, "because Stage %s was cancelled".format(stageId))
10091011
})
@@ -1016,7 +1018,7 @@ class DAGScheduler(
10161018
if (!jobIdToStageIds.contains(jobId)) {
10171019
logDebug("Trying to cancel unregistered job " + jobId)
10181020
} else {
1019-
failJobAndIndependentStages(jobIdToActiveJob(jobId),
1021+
failJobAndIndependentStages(jobIdToActiveJob(jobId),
10201022
"Job %d cancelled %s".format(jobId, reason), None)
10211023
}
10221024
}

core/src/main/scala/org/apache/spark/ui/SparkUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ private[spark] class SparkUI(
4646
val live = sc != null
4747

4848
val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf)
49-
val killEnabled = conf.get("spark.ui.killEnabled", "false").toBoolean
49+
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
5050

5151
private val bindHost = Utils.localHostName()
5252
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)

core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ private[ui] class IndexPage(parent: JobProgressUI) {
3333
private val sc = parent.sc
3434
private lazy val listener = parent.listener
3535
private lazy val isFairScheduler = parent.isFairScheduler
36+
private val killEnabled = parent.killEnabled
3637

3738
def render(request: HttpServletRequest): Seq[Node] = {
3839
listener.synchronized {
@@ -41,6 +42,16 @@ private[ui] class IndexPage(parent: JobProgressUI) {
4142
val failedStages = listener.failedStages.reverse.toSeq
4243
val now = System.currentTimeMillis()
4344

45+
if (killEnabled) {
46+
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
47+
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
48+
49+
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
50+
sc.cancelStage(stageId)
51+
}
52+
}
53+
54+
4455
val activeStagesTable =
4556
new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent, parent.killEnabled)
4657
val completedStagesTable =

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ private[ui] class JobProgressListener(conf: SparkConf) extends SparkListener {
6161
val stageIdToPool = HashMap[Int, String]()
6262
val stageIdToDescription = HashMap[Int, String]()
6363
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
64-
val jobIdToStageIds = HashMap[Int, Seq[Int]]()
6564

6665
val executorIdToBlockManagerId = HashMap[String, BlockManagerId]()
6766

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,5 +57,5 @@ private[ui] class JobProgressUI(parent: SparkUI) {
5757
(request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
5858
createServletHandler("/stages",
5959
(request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
60-
)
60+
)
6161
}

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,11 @@ private[ui] class StagePage(parent: JobProgressUI) {
3232
private val basePath = parent.basePath
3333
private lazy val listener = parent.listener
3434
private lazy val sc = parent.sc
35-
private val killEnabled = parent.killEnabled
3635

3736
def render(request: HttpServletRequest): Seq[Node] = {
3837
listener.synchronized {
3938
val stageId = request.getParameter("id").toInt
4039

41-
if (killEnabled) {
42-
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
43-
44-
if (killFlag && listener.activeStages.contains(stageId)) {
45-
sc.cancelStage(stageId)
46-
}
47-
}
48-
4940
if (!listener.stageIdToTaskData.contains(stageId)) {
5041
val content =
5142
<div>

core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,11 @@ import org.apache.spark.ui.{WebUI, UIUtils}
2727
import org.apache.spark.util.Utils
2828

2929
/** Page showing list of all ongoing and recently finished stages */
30-
private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, killEnabled: Boolean = false) {
30+
private[ui] class StageTable(
31+
stages: Seq[StageInfo],
32+
parent: JobProgressUI,
33+
killEnabled: Boolean = false) {
34+
3135
private val basePath = parent.basePath
3236
private lazy val listener = parent.listener
3337
private lazy val isFairScheduler = parent.isFairScheduler
@@ -76,16 +80,15 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI, kill
7680
<a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
7781
{s.name}
7882
</a>
79-
val killButton = if (killEnabled) {
80-
<form action={"%s/stages/stage/".format(UIUtils.prependBaseUri(basePath))}>
81-
<input type="hidden" value={"true"} name="terminate" />
82-
<input type="hidden" value={"" + s.stageId} name="id" />
83-
<input type="submit" value="Terminate Job"/>
84-
</form>
83+
val killLink = if (killEnabled) {
84+
<div>[<a href={"%s/stages?id=%s&terminate=true".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
85+
Kill
86+
</a>]</div>
87+
8588
}
8689
val description = listener.stageIdToDescription.get(s.stageId)
87-
.map(d => <div><em>{d}</em></div><div>{nameLink} {killButton}</div>)
88-
.getOrElse(<div>{nameLink} {killButton}</div>)
90+
.map(d => <div><em>{d}</em></div><div>{nameLink} {killLink}</div>)
91+
.getOrElse(<div>{nameLink} {killLink}</div>)
8992

9093
return description
9194
}

docs/configuration.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ Apart from these, the following properties are also available, and may be useful
190190
user that started the Spark job has view access.
191191
</td>
192192
</tr>
193+
<tr>
194+
<td>spark.ui.killEnabled</td>
195+
<td>true</td>
196+
<td>
197+
Whether to allow stages and their corresponding jobs to be killed from the web UI.
198+
</td>
199+
</tr>
193200
<tr>
194201
<td>spark.shuffle.compress</td>
195202
<td>true</td>

0 commit comments

Comments
 (0)