[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators #10835
File: core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
  * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
  * accumulating a set. You will add items to the set, and you will union two sets together.
  *
+ * All accumulators created on the driver to be used on the executors must be registered with
+ * [[Accumulators]]. This is already done automatically for accumulators created by the user.
+ * Internal accumulators must be explicitly registered by the caller.
+ *
+ * Operations are not thread-safe.
+ *
+ * @param id ID of this accumulator; for internal use only.
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
  * @param name human-readable name for use in Spark's web UI
  * @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
  *                 to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
  *                 thread safe so that they can be reported correctly.
+ * @param countFailedValues whether to accumulate values from failed tasks. This is set to true
+ *                          for system and time metrics like serialization time or bytes spilled,
+ *                          and false for things with absolute values like number of input rows.
+ *                          This should be used for internal metrics only.

Review comment: Whoops, overlooked this. I guess we'll revisit this if / when we decide to make this into a public-facing semantic.
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
-class Accumulable[R, T] private[spark] (
-    initialValue: R,
+class Accumulable[R, T] private (
+    val id: Long,
+    @transient initialValue: R,
     param: AccumulableParam[R, T],
     val name: Option[String],
-    internal: Boolean)
+    internal: Boolean,
+    private[spark] val countFailedValues: Boolean)
   extends Serializable {

-  private[spark] def this(
-      @transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
-    this(initialValue, param, None, internal)
+  private[spark] def this(
+      initialValue: R,
+      param: AccumulableParam[R, T],
+      name: Option[String],
+      internal: Boolean,
+      countFailedValues: Boolean) = {
+    this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
   }

-  def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
-    this(initialValue, param, name, false)
+  private[spark] def this(
+      initialValue: R,
+      param: AccumulableParam[R, T],
+      name: Option[String],
+      internal: Boolean) = {
+    this(initialValue, param, name, internal, false /* countFailedValues */)
Review comment: You can't write
Reply: it won't let me
+  }

-  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
-    this(initialValue, param, None)
+  def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
+    this(initialValue, param, name, false /* internal */)

-  val id: Long = Accumulators.newId
+  def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)

-  @volatile @transient private var value_ : R = initialValue // Current value on master
-  val zero = param.zero(initialValue)  // Zero value to be passed to workers
+  @volatile @transient private var value_ : R = initialValue // Current value on driver
+  val zero = param.zero(initialValue)  // Zero value to be passed to executors
   private var deserialized = false

-  Accumulators.register(this)
+  // In many places we create internal accumulators without access to the active context cleaner,
+  // so if we register them here then we may never unregister these accumulators. To avoid memory
+  // leaks, we require the caller to explicitly register internal accumulators elsewhere.
+  if (!internal) {
+    Accumulators.register(this)
+  }

   /**
    * If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
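The class doc above keeps the set example as the canonical case where the result type `R` differs from the added type `T`. For orientation, a hedged sketch of such a user-defined param (my own illustration, not part of this patch; `SetAccumulableParam` is a hypothetical name):

```scala
import org.apache.spark.AccumulableParam

// Hypothetical param that accumulates items of type T into a Set[T].
class SetAccumulableParam[T] extends AccumulableParam[Set[T], T] {
  // Add a single item to a partial result.
  def addAccumulator(r: Set[T], t: T): Set[T] = r + t
  // Union two partial results produced by different tasks.
  def addInPlace(r1: Set[T], r2: Set[T]): Set[T] = r1 ++ r2
  // The empty accumulation, derived from an initial value.
  def zero(initialValue: Set[T]): Set[T] = Set.empty[T]
}
```

On the driver this would be used via `sc.accumulable(Set.empty[Int])(new SetAccumulableParam[Int])`.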
@@ -77,6 +104,17 @@ class Accumulable[R, T] private[spark] (
    */
   private[spark] def isInternal: Boolean = internal

+  /**
+   * Return a copy of this [[Accumulable]].
+   *
+   * The copy will have the same ID as the original and will not be registered with
+   * [[Accumulators]] again. This method exists so that the caller can avoid passing the
+   * same mutable instance around.
+   */
+  private[spark] def copy(): Accumulable[R, T] = {
+    new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
+  }
+
   /**
    * Add more data to this accumulator / accumulable
    * @param term the data to add
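To make the copy semantics concrete, a brief sketch (my own, not from the patch; it only compiles inside the `org.apache.spark` package because the constructor and `copy()` are `private[spark]`):

```scala
// Assumes code living in the org.apache.spark package.
val acc = new Accumulable[Int, Int](
  0, AccumulatorParam.IntAccumulatorParam, Some("n"), false /* internal */)
val snapshot = acc.copy()
// Same ID, so updates carried by the copy are still attributed to the
// original; the copy is deliberately not re-registered with Accumulators.
assert(snapshot.id == acc.id)
```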
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
   def merge(term: R) { value_ = param.addInPlace(value_, term)}

   /**
-   * Access the accumulator's current value; only allowed on master.
+   * Access the accumulator's current value; only allowed on driver.
    */
   def value: R = {
     if (!deserialized) {

@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
   def localValue: R = value_

   /**
-   * Set the accumulator's value; only allowed on master.
+   * Set the accumulator's value; only allowed on driver.
    */
   def value_= (newValue: R) {
     if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
   }

   /**
-   * Set the accumulator's value; only allowed on master
+   * Set the accumulator's value. For internal use only.
    */
-  def setValue(newValue: R) {
-    this.value = newValue
-  }
+  def setValue(newValue: R): Unit = { value_ = newValue }
+
+  /**
+   * Set the accumulator's value. For internal use only.
+   */
+  private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

   // Called by Java when deserializing an object
   private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
     in.defaultReadObject()
     value_ = zero
     deserialized = true

     // Automatically register the accumulator when it is deserialized with the task closure.
     //
-    // Note internal accumulators sent with task are deserialized before the TaskContext is created
-    // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
-    // metrics, still need to register here.
+    // This is for external accumulators and internal ones that do not represent task level
+    // metrics, e.g. internal SQL metrics, which are per-operator.
     val taskContext = TaskContext.get()
     if (taskContext != null) {
       taskContext.registerAccumulator(this)
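The deserialize-reset-register cycle above is what backs the familiar user-facing pattern. A hedged sketch, assuming an existing `SparkContext` named `sc`:

```scala
// Assumes an existing SparkContext `sc`.
val sum = sc.accumulator(0, "sum")  // created and registered on the driver
sc.parallelize(1 to 100).foreach { x =>
  // On each executor the accumulator arrives deserialized with its value
  // reset to zero; task-side updates flow back to the driver keyed by ID.
  sum += x
}
println(sum.value)  // 5050; the value is only readable on the driver
```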
File: core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@

 package org.apache.spark

-import scala.collection.{mutable, Map}
+import java.util.concurrent.atomic.AtomicLong
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
 import scala.ref.WeakReference

+import org.apache.spark.storage.{BlockId, BlockStatus}
+

 /**
  * A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `T`
+ * @param name human-readable name associated with this accumulator
+ * @param internal whether this accumulator is used internally within Spark only
+ * @param countFailedValues whether to accumulate values from failed tasks
  * @tparam T result type
  */
 class Accumulator[T] private[spark] (
     @transient private[spark] val initialValue: T,
     param: AccumulatorParam[T],
     name: Option[String],
-    internal: Boolean)
-  extends Accumulable[T, T](initialValue, param, name, internal) {
+    internal: Boolean,
+    override val countFailedValues: Boolean = false)
+  extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

   def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
     this(initialValue, param, name, false)
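To make the new flag concrete, a sketch of constructing an internal metric that keeps updates from failed tasks (my own example, in-package because the constructor is `private[spark]`):

```scala
// Assumes code in the org.apache.spark package.
// countFailedValues = true suits time/size metrics, where partial work from a
// failed attempt really happened, but not absolute counts like input rows.
val bytesSpilled = new Accumulator[Long](
  0L, AccumulatorParam.LongAccumulatorParam, Some("bytesSpilled"),
  true /* internal */, true /* countFailedValues */)
```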
@@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging {
   * This global map holds the original accumulator objects that are created on the driver.
   * It keeps weak references to these objects so that accumulators can be garbage-collected
   * once the RDDs and user-code that reference them are cleaned up.
+  * TODO: Don't use a global map; these should be tied to a SparkContext at the very least.

Review comment: Yep, this is one of the many blockers to multiple active SparkContexts and a source of leaked state in tests, so we should try to have this be scoped within SparkEnv in the future. Feel free to create a followup JIRA for this if you think it's a small task that someone should pick up.

   */
+  @GuardedBy("Accumulators")

Review comment: +1 on this style of explicit documentation of locking / synchronization requirements. This is a good practice and we should do this more often.

   val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()
-  private var lastId: Long = 0
+  private val nextId = new AtomicLong(0L)

-  def newId(): Long = synchronized {
-    lastId += 1
-    lastId
-  }
+  /**
+   * Return a globally unique ID for a new [[Accumulable]].
+   * Note: Once you copy the [[Accumulable]] the ID is no longer unique.
+   */
+  def newId(): Long = nextId.getAndIncrement
+  /**
+   * Register an [[Accumulable]] created on the driver such that it can be used on the executors.
+   *
+   * All accumulators registered here can later be used as a container for accumulating partial
+   * values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
+   * Note: if an accumulator is registered here, it should also be registered with the active
+   * context cleaner for cleanup so as to avoid memory leaks.
+   *
+   * If an [[Accumulable]] with the same ID was already registered, this does nothing instead
+   * of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
+   * [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
+   */
   def register(a: Accumulable[_, _]): Unit = synchronized {
-    originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+    if (!originals.contains(a.id)) {
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
+    }
   }

-  def remove(accId: Long) {
-    synchronized {
-      originals.remove(accId)
-    }
+  /**
+   * Unregister the [[Accumulable]] with the given ID, if any.
+   */
+  def remove(accId: Long): Unit = synchronized {
+    originals.remove(accId)
   }

-  // Add values to the original accumulators with some given IDs
-  def add(values: Map[Long, Any]): Unit = synchronized {
-    for ((id, value) <- values) {
-      if (originals.contains(id)) {
-        // Since we are now storing weak references, we must check whether the underlying data
-        // is valid.
-        originals(id).get match {
-          case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
-          case None =>
-            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
-        }
-      } else {
-        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
+  /**
+   * Return the [[Accumulable]] registered with the given ID, if any.
+   */
+  def get(id: Long): Option[Accumulable[_, _]] = synchronized {
+    originals.get(id).map { weakRef =>
+      // Since we are storing weak references, we must check whether the underlying data is valid.
+      weakRef.get.getOrElse {
+        throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
       }
     }
   }
+
+  /**
+   * Clear all registered [[Accumulable]]s. For testing only.
+   */
+  def clear(): Unit = synchronized {
+    originals.clear()
+  }
 }
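As a standalone illustration of the registry pattern above (my own sketch; `WeakRegistry` is hypothetical, not a Spark API):

```scala
import scala.collection.mutable
import scala.ref.WeakReference

// Minimal analogue of the Accumulators registry: weak references let
// registered objects be garbage-collected once user code drops them.
object WeakRegistry {
  private val originals = mutable.Map[Long, WeakReference[AnyRef]]()

  def register(id: Long, obj: AnyRef): Unit = synchronized {
    // First registration wins, mirroring the no-overwrite semantics above.
    if (!originals.contains(id)) {
      originals(id) = new WeakReference(obj)
    }
  }

  def get(id: Long): Option[AnyRef] = synchronized {
    originals.get(id).map { ref =>
      ref.get.getOrElse {
        throw new IllegalAccessError(s"Attempted to access garbage collected object $id")
      }
    }
  }
}
```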
@@ -156,5 +185,23 @@ object AccumulatorParam {
     def zero(initialValue: Float): Float = 0f
   }

-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
+  // Note: when merging values, this param just adopts the newer value. This is used only
+  // internally for things that shouldn't really be accumulated across tasks, like input
+  // read method, which should be the same across all tasks in the same stage.
+  private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
+    def addInPlace(t1: String, t2: String): String = t2
+    def zero(initialValue: String): String = ""
+  }

Review comment: This is a little confusing and I maybe wonder whether it'd be worth having a class named something like
Reply: let's do that later

+  // Note: this is expensive as it makes a copy of the list every time the caller adds an item.
+  // A better way to use this is to first accumulate the values yourself then add them all at once.
+  private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
+    def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
+    def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
+  }
+
+  // For the internal metric that records what blocks are updated in a particular task
+  private[spark] object UpdatedBlockStatusesAccumulatorParam
+    extends ListAccumulatorParam[(BlockId, BlockStatus)]

 }
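Given the cost note on `ListAccumulatorParam`, a hedged sketch of the batch-then-add pattern it recommends (my example, in-package since the param is `private[spark]`):

```scala
// Assumes code in the org.apache.spark package.
val ids = new Accumulable[Seq[Int], Seq[Int]](
  Seq.empty[Int], new ListAccumulatorParam[Int], Some("ids"), false /* internal */)
// Each add copies the accumulated list, so collect values locally first
// and add them in one shot: one copy instead of one per element.
val localBatch = (1 to 1000).toSeq
ids += localBatch
```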
Review comment: This reminds me: I believe that there's a paragraph in the Spark Programming Guide which describes the current accumulator semantics (sorta); it would be good to add new documentation for these metric-style accumulators, since I imagine that a number of users would be interested in taking advantage of this new opt-in semantic in their own code.

We shouldn't block merging this patch on those doc updates (since I'm worried about merge-conflict potential and want to get this in sooner than later), but let's file a followup ticket so we don't forget.

Reply: Yup, later