Skip to content

SPARK-2380: Support displaying accumulator values in the web UI #1309

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
param: AccumulableParam[R, T],
val name: Option[String])
extends Serializable {

val id = Accumulators.newId
def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)

val id: Long = Accumulators.newId

@transient private var value_ = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
private var deserialized = false
Expand Down Expand Up @@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
extends Accumulable[T,T](initialValue, param)
class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
extends Accumulable[T,T](initialValue, param, name) {
def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
}

/**
* A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
Expand Down Expand Up @@ -281,4 +289,7 @@ private object Accumulators {
}
}
}

def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
def stringifyValue(value: Any) = "%s".format(value)
}
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -760,6 +760,15 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
new Accumulator(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
* in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
* driver can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
new Accumulator(initialValue, param, Some(name))
}

/**
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be (maybe you were planning to add it later since I know this is still WIP) a similar new accumulable method?

* Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
* with `+=`. Only the driver can access the accumuable's `value`.
Expand All @@ -769,6 +778,16 @@ class SparkContext(config: SparkConf) extends Logging {
def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
* Spark UI. Tasks can add values to the accumuable using the `+=` operator. Only the driver can
* access the accumuable's `value`.
* @tparam T accumulator type
* @tparam R type that can be added to the accumulator
*/
def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
new Accumulable(initialValue, param, Some(name))

/**
* Create an accumulator from a "mutable collection" type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -429,40 +429,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
sc.accumulator(initialValue, name)(IntAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Integer]]

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
.asInstanceOf[Accumulator[java.lang.Double]]

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)

/**
* Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
intAccumulator(initialValue, name)

/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue)


/**
* Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
* to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
doubleAccumulator(initialValue, name)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*/
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)

/**
* Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
* values to using the `add` method. Only the master can access the accumulator's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
: Accumulator[T] =
sc.accumulator(initialValue, name)(accumulatorParam)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*/
def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
sc.accumulable(initialValue)(param)

/**
* Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
* can "add" values with `add`. Only the master can access the accumuable's `value`.
*
* This version supports naming the accumulator for display in Spark's web UI.
*/
def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
: Accumulable[T, R] =
sc.accumulable(initialValue, name)(param)

/**
* Broadcast a read-only variable to the cluster, returning a
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.scheduler

import org.apache.spark.annotation.DeveloperApi

/**
* :: DeveloperApi ::
* Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
*/
@DeveloperApi
class AccumulableInfo (
val id: Long,
val name: String,
val update: Option[String], // represents a partial update within a task
val value: String) {

override def equals(other: Any): Boolean = other match {
case acc: AccumulableInfo =>
this.id == acc.id && this.name == acc.name &&
this.update == acc.update && this.value == acc.value
case _ => false
}
}

object AccumulableInfo {
def apply(id: Long, name: String, update: Option[String], value: String) =
new AccumulableInfo(id, name, update, value)

def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
}
24 changes: 22 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,14 @@ class DAGScheduler(
val task = event.task
val stageId = task.stageId
val taskType = Utils.getFormattedClassName(task)
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))

// The success case is dealt with separately below, since we need to compute accumulator
// updates before posting.
if (event.reason != Success) {
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
}

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
return
Expand All @@ -906,12 +912,26 @@ class DAGScheduler(
if (event.accumUpdates != null) {
try {
Accumulators.add(event.accumUpdates)
event.accumUpdates.foreach { case (id, partialValue) =>
val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
// To avoid UI cruft, ignore cases where value wasn't updated
if (acc.name.isDefined && partialValue != acc.zero) {
val name = acc.name.get
val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
val stringValue = Accumulators.stringifyValue(acc.value)
stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
event.taskInfo.accumulables +=
AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
}
}
} catch {
// If we see an exception during accumulator update, just log the error and move on.
case e: Exception =>
logError(s"Failed to update accumulators for $task", e)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this be moved to a method on the Accumulators companion object or something? These details about AccumulableInfo, prettyPartialValues, etc. aren't things that need to appear in the DAGScheduler.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@markhamstra yeah we can move these elsewhere, good idea.

}
listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
event.taskMetrics))
stage.pendingTasks -= task
task match {
case rt: ResultTask[_, _] =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.HashMap

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.storage.RDDInfo

Expand All @@ -37,6 +39,8 @@ class StageInfo(
var completionTime: Option[Long] = None
/** If the stage failed, the reason why. */
var failureReason: Option[String] = None
/** Terminal values of accumulables updated during this stage. */
val accumulables = HashMap[Long, AccumulableInfo]()

def stageFailed(reason: String) {
failureReason = Some(reason)
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import scala.collection.mutable.ListBuffer

import org.apache.spark.annotation.DeveloperApi

/**
Expand All @@ -41,6 +43,13 @@ class TaskInfo(
*/
var gettingResultTime: Long = 0

/**
* Intermediate updates to accumulables during this task. Note that it is valid for the same
* accumulable to be updated multiple times in a single task or for two accumulables with the
* same name but different IDs to exist in a task.
*/
val accumulables = ListBuffer[AccumulableInfo]()

/**
* The time when the task has completed successfully (including the time to remotely fetch
* results, if necessary).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.ui.jobs

import scala.collection.mutable.{HashMap, ListBuffer}
import scala.collection.mutable.{HashMap, ListBuffer, Map}

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
Expand Down Expand Up @@ -65,6 +65,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})

for ((id, info) <- stageCompleted.stageInfo.accumulables) {
stageData.accumulables(id) = info
}

poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
activeStages.remove(stageId)
if (stage.failureReason.isEmpty) {
Expand Down Expand Up @@ -130,6 +134,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
new StageUIData
})

for (accumulableInfo <- info.accumulables) {
stageData.accumulables(accumulableInfo.id) = accumulableInfo
}

val execSummaryMap = stageData.executorSummary
val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)

Expand Down
Loading