
Commit 74f82c7

SPARK-2380: Support displaying accumulator values in the web UI
This patch adds support for giving accumulators user-visible names and displaying accumulator values in the web UI. This allows users to create custom counters that are displayed in the UI. The current approach displays both the accumulator deltas caused by each task and a "current" value of the accumulator totals for each stage, which gets updated as tasks finish.

Currently, Spark developers have been extending the `TaskMetrics` functionality to provide custom instrumentation for RDDs. This provides a potentially nicer alternative of going through the existing accumulator framework (in fact, `TaskMetrics` and accumulators are on an awkward collision course as we add more features to the former). The current patch demonstrates how we can use the feature to provide instrumentation for RDD input sizes. The nice thing about going through accumulators is that users can read the current value of the data being tracked in their programs. This could be useful, e.g., to decide to short-circuit a Spark stage depending on how things are going.

![counters](https://cloud.githubusercontent.com/assets/320616/3488815/6ee7bc34-0505-11e4-84ce-e36d9886e2cf.png)

Author: Patrick Wendell <pwendell@gmail.com>

Closes #1309 from pwendell/metrics and squashes the following commits:

- 8815308 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into HEAD
- 93fbe0f [Patrick Wendell] Other minor fixes
- cc43f68 [Patrick Wendell] Updating unit tests
- c991b1b [Patrick Wendell] Moving some code into the Accumulators class
- 9a9ba3c [Patrick Wendell] More merge fixes
- c5ace9e [Patrick Wendell] More merge conflicts
- 1da15e3 [Patrick Wendell] Merge remote-tracking branch 'apache/master' into metrics
- 9860c55 [Patrick Wendell] Potential solution to posting listener events
- 0bb0e33 [Patrick Wendell] Remove "display" variable and assume display = name.isDefined
- 0ec4ac7 [Patrick Wendell] Java API's
- e95bf69 [Patrick Wendell] Stash
- be97261 [Patrick Wendell] Style fix
- 8407308 [Patrick Wendell] Removing examples in Hadoop and RDD class
- 64d405f [Patrick Wendell] Adding missing file
- 5d8b156 [Patrick Wendell] Changes based on Kay's review.
- 9f18bad [Patrick Wendell] Minor style changes and tests
- 7a63abc [Patrick Wendell] Adding Json serialization and responding to Reynold's feedback
- ad85076 [Patrick Wendell] Example of using named accumulators for custom RDD metrics.
- 0b72660 [Patrick Wendell] Initial WIP example of supporing globally named accumulators.
1 parent ac3440f commit 74f82c7
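Before the file-by-file diff, here is a minimal sketch of the user-facing API this commit introduces. The accumulator name and the data are invented for illustration, not taken from the patch:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NamedAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("named-accumulators").setMaster("local"))

    // The second argument is the new user-visible name; it makes the
    // accumulator show up on the stage page of the web UI.
    val recordsProcessed = sc.accumulator(0, "records processed")

    sc.parallelize(1 to 1000).foreach { _ =>
      recordsProcessed += 1 // each task contributes a delta, shown per-task in the UI
    }

    // Only the driver can read the value; as the commit message notes, a program
    // could inspect this mid-job to short-circuit later stages.
    println(s"records processed so far: ${recordsProcessed.value}")
    sc.stop()
  }
}
```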

File tree: 13 files changed (+294, −27 lines)


core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 15 additions & 4 deletions
```diff
@@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
+ * @param name human-readable name for use in Spark's web UI
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
 class Accumulable[R, T] (
     @transient initialValue: R,
-    param: AccumulableParam[R, T])
+    param: AccumulableParam[R, T],
+    val name: Option[String])
   extends Serializable {
 
-  val id = Accumulators.newId
+  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
+    this(initialValue, param, None)
+
+  val id: Long = Accumulators.newId
+
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue) // Zero value to be passed to workers
   private var deserialized = false
@@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
-  extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
+  extends Accumulable[T,T](initialValue, param, name) {
+  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
+}
 
 /**
  * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
@@ -281,4 +289,7 @@ private object Accumulators {
       }
     }
   }
+
+  def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
+  def stringifyValue(value: Any) = "%s".format(value)
 }
```
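The auxiliary constructors above keep the old two-argument API compiling unchanged; naming is opt-in via `Option[String]`. A hedged driver-side sketch (the accumulator names are invented):

```scala
import org.apache.spark.Accumulator
import org.apache.spark.SparkContext.IntAccumulatorParam

// Old two-argument form still works and yields name = None (hidden from the UI).
val unnamed = new Accumulator(0, IntAccumulatorParam)

// The new three-argument form attaches a display name.
val named = new Accumulator(0, IntAccumulatorParam, Some("bytes read"))

assert(unnamed.name.isEmpty && named.name == Some("bytes read"))
```

The `stringifyPartialValue`/`stringifyValue` helpers are just `toString` formatting for the UI; centralizing them in `Accumulators` leaves room for smarter, type-aware rendering later.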

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -760,6 +760,15 @@ class SparkContext(config: SparkConf) extends Logging {
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
+   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
+   * driver can access the accumulator's `value`.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+    new Accumulator(initialValue, param, Some(name))
+  }
+
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumuable's `value`.
@@ -769,6 +778,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
     new Accumulable(initialValue, param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
+   * Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
+   * access the accumuable's `value`.
+   * @tparam T accumulator type
+   * @tparam R type that can be added to the accumulator
+   */
+  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+    new Accumulable(initialValue, param, Some(name))
+
   /**
    * Create an accumulator from a "mutable collection" type.
    *
```
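The named `accumulable` overload pairs naturally with a custom `AccumulableParam`. A hedged sketch, where `MaxParam` and the accumulable name are invented for illustration and `sc` is an existing `SparkContext`:

```scala
import org.apache.spark.AccumulableParam

// Tracks the maximum value any task has added.
object MaxParam extends AccumulableParam[Int, Int] {
  def addAccumulator(acc: Int, elem: Int): Int = math.max(acc, elem)
  def addInPlace(r1: Int, r2: Int): Int = math.max(r1, r2)
  def zero(initialValue: Int): Int = initialValue
}

val maxSeen = sc.accumulable(0, "max value seen")(MaxParam)
sc.parallelize(Seq(3, 1, 41, 5)).foreach(x => maxSeen += x)
// "max value seen" now appears on the stage page, alongside per-task deltas.
```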

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 59 additions & 0 deletions
```diff
@@ -429,40 +429,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+    sc.accumulator(initialValue, name)(IntAccumulatorParam)
+      .asInstanceOf[Accumulator[java.lang.Integer]]
+
   /**
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+    sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
+      .asInstanceOf[Accumulator[java.lang.Double]]
+
   /**
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+    intAccumulator(initialValue, name)
+
   /**
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue)
 
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+    doubleAccumulator(initialValue, name)
+
   /**
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
     sc.accumulator(initialValue)(accumulatorParam)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
+      : Accumulator[T] =
+    sc.accumulator(initialValue, name)(accumulatorParam)
+
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
    * can "add" values with `add`. Only the master can access the accumuable's `value`.
    */
   def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
     sc.accumulable(initialValue)(param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
+   * can "add" values with `add`. Only the master can access the accumuable's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
+      : Accumulable[T, R] =
+    sc.accumulable(initialValue, name)(param)
+
   /**
    * Broadcast a read-only variable to the cluster, returning a
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
```
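These overloads exist because Java callers cannot supply the implicit `AccumulatorParam`; the name is threaded through as a plain argument instead. A hedged sketch, calling the Java API from Scala for consistency with the rest of this page (`sc` is an existing `SparkContext`, and the names are invented):

```scala
import org.apache.spark.api.java.JavaSparkContext

val jsc = new JavaSparkContext(sc)

// Each named variant forwards to the Scala API with Some(name).
val intCounter: org.apache.spark.Accumulator[java.lang.Integer] =
  jsc.intAccumulator(0, "records seen")
val doubleCounter = jsc.doubleAccumulator(0.0, "bytes read")
```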
core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala (new file; the name was missing from the page and is inferred from the package and class below)

Lines changed: 46 additions & 0 deletions

```diff
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ */
+@DeveloperApi
+class AccumulableInfo (
+    val id: Long,
+    val name: String,
+    val update: Option[String], // represents a partial update within a task
+    val value: String) {
+
+  override def equals(other: Any): Boolean = other match {
+    case acc: AccumulableInfo =>
+      this.id == acc.id && this.name == acc.name &&
+        this.update == acc.update && this.value == acc.value
+    case _ => false
+  }
+}
+
+object AccumulableInfo {
+  def apply(id: Long, name: String, update: Option[String], value: String) =
+    new AccumulableInfo(id, name, update, value)
+
+  def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
+}
```
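The two `apply` overloads distinguish per-task records, which carry a partial `update`, from per-stage records, which carry only the running `value`. A small hedged sketch with invented ids and values:

```scala
import org.apache.spark.scheduler.AccumulableInfo

// Per-task: the task's delta plus the running total at the time of the event.
val perTask = AccumulableInfo(1L, "bytes read", Some("512"), "8192")

// Per-stage: only the running total; update defaults to None.
val perStage = AccumulableInfo(1L, "bytes read", "8192")

// equals is value-based, which keeps JSON round-trip tests straightforward.
assert(perTask != perStage && perStage.update.isEmpty)
```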

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 2 deletions
```diff
@@ -883,8 +883,14 @@ class DAGScheduler(
     val task = event.task
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
-    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-      event.taskMetrics))
+
+    // The success case is dealt with separately below, since we need to compute accumulator
+    // updates before posting.
+    if (event.reason != Success) {
+      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+        event.taskMetrics))
+    }
+
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
@@ -906,12 +912,26 @@ class DAGScheduler(
         if (event.accumUpdates != null) {
           try {
             Accumulators.add(event.accumUpdates)
+            event.accumUpdates.foreach { case (id, partialValue) =>
+              val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+              // To avoid UI cruft, ignore cases where value wasn't updated
+              if (acc.name.isDefined && partialValue != acc.zero) {
+                val name = acc.name.get
+                val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+                val stringValue = Accumulators.stringifyValue(acc.value)
+                stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
+                event.taskInfo.accumulables +=
+                  AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+              }
+            }
           } catch {
             // If we see an exception during accumulator update, just log the error and move on.
             case e: Exception =>
               logError(s"Failed to update accumulators for $task", e)
           }
         }
+        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+          event.taskMetrics))
         stage.pendingTasks -= task
         task match {
           case rt: ResultTask[_, _] =>
```
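The reordering above matters for listeners: for successful tasks, `SparkListenerTaskEnd` is now posted only after accumulator updates have been folded into `StageInfo` and `TaskInfo`, so a listener can rely on seeing them. A hedged sketch of such a consumer (the listener class is invented):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class AccumulableLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // For successful tasks these records are populated before the event is posted.
    for (acc <- taskEnd.taskInfo.accumulables) {
      println(s"${acc.name}: +${acc.update.getOrElse("?")} (total ${acc.value})")
    }
  }
}

// Registered on the driver with: sc.addSparkListener(new AccumulableLogger)
```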

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashMap
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.RDDInfo
 
@@ -37,6 +39,8 @@ class StageInfo(
   var completionTime: Option[Long] = None
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
+  /** Terminal values of accumulables updated during this stage. */
+  val accumulables = HashMap[Long, AccumulableInfo]()
 
   def stageFailed(reason: String) {
     failureReason = Some(reason)
```

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -41,6 +43,13 @@ class TaskInfo(
    */
   var gettingResultTime: Long = 0
 
+  /**
+   * Intermediate updates to accumulables during this task. Note that it is valid for the same
+   * accumulable to be updated multiple times in a single task or for two accumulables with the
+   * same name but different IDs to exist in a task.
+   */
+  val accumulables = ListBuffer[AccumulableInfo]()
+
   /**
    * The time when the task has completed successfully (including the time to remotely fetch
    * results, if necessary).
```
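Because `accumulables` is a `ListBuffer` rather than a map, the semantics described in the comment fall out directly: repeated updates append rather than overwrite. A hedged illustration with invented values:

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.scheduler.AccumulableInfo

val accumulables = ListBuffer[AccumulableInfo]()
accumulables += AccumulableInfo(7L, "bytes read", Some("128"), "128")
accumulables += AccumulableInfo(7L, "bytes read", Some("256"), "384") // same id, kept twice
assert(accumulables.size == 2)
```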

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +65,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       new StageUIData
     })
 
+    for ((id, info) <- stageCompleted.stageInfo.accumulables) {
+      stageData.accumulables(id) = info
+    }
+
     poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
@@ -130,6 +134,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       new StageUIData
     })
 
+    for (accumulableInfo <- info.accumulables) {
+      stageData.accumulables(accumulableInfo.id) = accumulableInfo
+    }
+
     val execSummaryMap = stageData.executorSummary
     val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)
```