
[SPARK-12895][SPARK-12896] Migrate TaskMetrics to accumulators #10835

Closed · wants to merge 45 commits

Changes from all commits (45 commits):
42ca72d - Change types of some signatures (Jan 18, 2016)
2c62000 - Boiler plate for all the new internal accums (Jan 18, 2016)
1167722 - Squashed commit of the following: (Jan 19, 2016)
144df46 - Implement TaskMetrics using Accumulators (Jan 19, 2016)
5ec17c1 - Fix accums not set on the driver (Jan 19, 2016)
362cde5 - Fix test compile (Jan 19, 2016)
e43e8be - Make accum updates read from all registered accums (Jan 19, 2016)
2330a37 - Fix metrics being double counted on driver (Jan 19, 2016)
cca87cc - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 19, 2016)
4ead1ba - Miscellaneous updates; make diff smaller (Jan 19, 2016)
0f40753 - Fix JsonProtocolSuite (Jan 19, 2016)
76b605c - Fix SQLQuerySuite (Jan 19, 2016)
7b5d840 - Fix style (Jan 19, 2016)
2069a78 - Fix MiMa (Jan 19, 2016)
d9813b1 - Add test on accum values being zero'ed out + cleanups (Jan 19, 2016)
40fd853 - Add tests for TaskMetrics, which uncovered a bug (Jan 19, 2016)
cdb3279 - Minor test changes (Jan 19, 2016)
8e46ee3 - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 19, 2016)
628076d - Fix style (Jan 19, 2016)
2a3cd27 - Fix MiMa (Jan 19, 2016)
67a1bee - Fix InputOutputMetricsSuite (Jan 20, 2016)
ec6ea44 - Fix TaskContextSuite (Jan 20, 2016)
6355dbd - Fix ReplayListenerSuite (Jan 20, 2016)
ed81584 - Fix SparkListenerSuite (Jan 20, 2016)
641f736 - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 20, 2016)
4ca7328 - Minor comment correction (Jan 20, 2016)
17db1c9 - Add test to verify internal accums are cleaned up (Jan 20, 2016)
308db4c - Add deprecated matching methods for Input/OutputMetrics (Jan 20, 2016)
b28fcd3 - Address review comments (Jan 25, 2016)
13bdb5c - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 25, 2016)
8130ae1 - Implement SPARK-12896 (squashed) (Jan 20, 2016)
ae15313 - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 25, 2016)
63be6b8 - Add test for setting result size (Jan 26, 2016)
947bc99 - Fix JsonProtocolSuite (Jan 26, 2016)
7e7c2f4 - Address review comments (not quite done yet) (Jan 26, 2016)
943a6b8 - Address review comments (round 3) (Jan 26, 2016)
f15c244 - Make AccumulableInfo#name an Option[String] (Jan 26, 2016)
9ea0ceb - Fix style (Jan 26, 2016)
a21375c - Address review comments (still round 3) (Jan 26, 2016)
6893419 - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 26, 2016)
d08a98e - Add more tests for accums JSON de/serialization (Jan 26, 2016)
7677ba0 - Fix SparkListenerSuite (Jan 26, 2016)
4f1d823 - Fix some weird indentations (Jan 26, 2016)
dcf1b5b - Fix InternalAccumulatorSuite (Jan 27, 2016)
9f964f2 - Merge branch 'master' of github.com:apache/spark into task-metrics-us… (Jan 27, 2016)

@@ -444,13 +444,7 @@ private long[] mergeSpillsWithTransferTo(SpillInfo[] spills, File outputFile) th
@Override
public Option<MapStatus> stop(boolean success) {
try {
// Update task metrics from accumulators (null in UnsafeShuffleWriterSuite)
Map<String, Accumulator<Object>> internalAccumulators =
taskContext.internalMetricsToAccumulators();
if (internalAccumulators != null) {
internalAccumulators.apply(InternalAccumulator.PEAK_EXECUTION_MEMORY())
.add(getPeakMemoryUsedBytes());
}
taskContext.taskMetrics().incPeakExecutionMemory(getPeakMemoryUsedBytes());

if (stopping) {
return Option.apply(null);
86 changes: 63 additions & 23 deletions core/src/main/scala/org/apache/spark/Accumulable.scala
@@ -35,40 +35,67 @@ import org.apache.spark.util.Utils
* [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
* accumulating a set. You will add items to the set, and you will union two sets together.
*
* All accumulators created on the driver to be used on the executors must be registered with
* [[Accumulators]]. This is already done automatically for accumulators created by the user.
* Internal accumulators must be explicitly registered by the caller.
*
* Operations are not thread-safe.
*
* @param id ID of this accumulator; for internal use only.
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `R` and `T`
* @param name human-readable name for use in Spark's web UI
* @param internal if this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported
* to the driver via heartbeats. For internal [[Accumulable]]s, `R` must be
* thread safe so that they can be reported correctly.
* @param countFailedValues whether to accumulate values from failed tasks. This is set to true
Review comment (Contributor):
This reminds me: I believe that there's a paragraph in the Spark Programming Guide which describes the current accumulator semantics (sorta); it would be good to add new documentation for these metric-style accumulators, since I imagine that a number of users would be interested in taking advantage of this new opt-in semantic in their own code.

We shouldn't block merging this patch on those doc updates (since I'm worried about merge-conflict potential and want to get this in sooner than later), but let's file a followup ticket so we don't forget.

Reply (Contributor Author):
Yup, later

* for system and time metrics like serialization time or bytes spilled,
* and false for things with absolute values like number of input rows.
* This should be used for internal metrics only.
Review comment (Contributor):
Whoops, overlooked this. I guess we'll revisit this if / when we decide to make this into a public-facing semantic.

* @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] private[spark] (
initialValue: R,
class Accumulable[R, T] private (
val id: Long,
@transient initialValue: R,
param: AccumulableParam[R, T],
val name: Option[String],
internal: Boolean)
internal: Boolean,
private[spark] val countFailedValues: Boolean)
extends Serializable {

private[spark] def this(
@transient initialValue: R, param: AccumulableParam[R, T], internal: Boolean) = {
this(initialValue, param, None, internal)
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean,
countFailedValues: Boolean) = {
this(Accumulators.newId(), initialValue, param, name, internal, countFailedValues)
}

def this(@transient initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false)
private[spark] def this(
initialValue: R,
param: AccumulableParam[R, T],
name: Option[String],
internal: Boolean) = {
this(initialValue, param, name, internal, false /* countFailedValues */)
Review comment (Contributor):
You can't write countFailedValues = false?

Reply (Contributor Author):
it won't let me

}

def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
this(initialValue, param, None)
def this(initialValue: R, param: AccumulableParam[R, T], name: Option[String]) =
this(initialValue, param, name, false /* internal */)
Review comment (Contributor):
internal = False?


val id: Long = Accumulators.newId
def this(initialValue: R, param: AccumulableParam[R, T]) = this(initialValue, param, None)

@volatile @transient private var value_ : R = initialValue // Current value on master
val zero = param.zero(initialValue) // Zero value to be passed to workers
@volatile @transient private var value_ : R = initialValue // Current value on driver
val zero = param.zero(initialValue) // Zero value to be passed to executors
private var deserialized = false

Accumulators.register(this)
// In many places we create internal accumulators without access to the active context cleaner,
// so if we register them here then we may never unregister these accumulators. To avoid memory
// leaks, we require the caller to explicitly register internal accumulators elsewhere.
if (!internal) {
Accumulators.register(this)
}

/**
* If this [[Accumulable]] is internal. Internal [[Accumulable]]s will be reported to the driver
@@ -77,6 +104,17 @@ class Accumulable[R, T] private[spark] (
*/
private[spark] def isInternal: Boolean = internal

/**
* Return a copy of this [[Accumulable]].
*
* The copy will have the same ID as the original and will not be registered with
* [[Accumulators]] again. This method exists so that the caller can avoid passing the
* same mutable instance around.
*/
private[spark] def copy(): Accumulable[R, T] = {
new Accumulable[R, T](id, initialValue, param, name, internal, countFailedValues)
}

/**
* Add more data to this accumulator / accumulable
* @param term the data to add
@@ -106,7 +144,7 @@ class Accumulable[R, T] private[spark] (
def merge(term: R) { value_ = param.addInPlace(value_, term)}

/**
* Access the accumulator's current value; only allowed on master.
* Access the accumulator's current value; only allowed on driver.
*/
def value: R = {
if (!deserialized) {
@@ -128,7 +166,7 @@ class Accumulable[R, T] private[spark] (
def localValue: R = value_

/**
* Set the accumulator's value; only allowed on master.
* Set the accumulator's value; only allowed on driver.
*/
def value_= (newValue: R) {
if (!deserialized) {
@@ -139,22 +177,24 @@ class Accumulable[R, T] private[spark] (
}

/**
* Set the accumulator's value; only allowed on master
* Set the accumulator's value. For internal use only.
*/
def setValue(newValue: R) {
this.value = newValue
}
def setValue(newValue: R): Unit = { value_ = newValue }

/**
* Set the accumulator's value. For internal use only.
*/
private[spark] def setValueAny(newValue: Any): Unit = { setValue(newValue.asInstanceOf[R]) }

// Called by Java when deserializing an object
private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
in.defaultReadObject()
value_ = zero
deserialized = true

// Automatically register the accumulator when it is deserialized with the task closure.
//
// Note internal accumulators sent with task are deserialized before the TaskContext is created
// and are registered in the TaskContext constructor. Other internal accumulators, such SQL
// metrics, still need to register here.
// This is for external accumulators and internal ones that do not represent task level
// metrics, e.g. internal SQL metrics, which are per-operator.
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.registerAccumulator(this)
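
The class doc above uses accumulating a set as its example of the result type `R` differing from the added type `T`. Below is a minimal sketch of what such a param and its driver-side use could look like; `SetAccumulableParam`, `seenKeys`, and `rdd` are illustrative names, not part of this patch.

import org.apache.spark.AccumulableParam

// R = Set[String] (the accumulated result), T = String (what each task adds in).
object SetAccumulableParam extends AccumulableParam[Set[String], String] {
  // Add a single element produced by a task.
  def addAccumulator(r: Set[String], t: String): Set[String] = r + t
  // Merge two partial sets, e.g. when combining results from different tasks.
  def addInPlace(r1: Set[String], r2: Set[String]): Set[String] = r1 ++ r2
  // The empty value each executor starts from.
  def zero(initialValue: Set[String]): Set[String] = Set.empty[String]
}

// Driver-side usage, assuming an existing SparkContext `sc`:
// val seenKeys = sc.accumulable(Set.empty[String], "seen keys")(SetAccumulableParam)
// rdd.foreach { record => seenKeys += record }
// seenKeys.value  // reading the value is only allowed on the driver

A user-defined param like this keeps the default countFailedValues = false; per the new parameter doc, only internal system and time metrics (serialization time, bytes spilled, and so on) count values from failed tasks.
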
101 changes: 74 additions & 27 deletions core/src/main/scala/org/apache/spark/Accumulator.scala
@@ -17,9 +17,14 @@

package org.apache.spark

import scala.collection.{mutable, Map}
import java.util.concurrent.atomic.AtomicLong
import javax.annotation.concurrent.GuardedBy

import scala.collection.mutable
import scala.ref.WeakReference

import org.apache.spark.storage.{BlockId, BlockStatus}


/**
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
@@ -49,14 +54,18 @@ import scala.ref.WeakReference
*
* @param initialValue initial value of accumulator
* @param param helper object defining how to add elements of type `T`
* @param name human-readable name associated with this accumulator
* @param internal whether this accumulator is used internally within Spark only
* @param countFailedValues whether to accumulate values from failed tasks
* @tparam T result type
*/
class Accumulator[T] private[spark] (
@transient private[spark] val initialValue: T,
param: AccumulatorParam[T],
name: Option[String],
internal: Boolean)
extends Accumulable[T, T](initialValue, param, name, internal) {
internal: Boolean,
override val countFailedValues: Boolean = false)
extends Accumulable[T, T](initialValue, param, name, internal, countFailedValues) {

def this(initialValue: T, param: AccumulatorParam[T], name: Option[String]) = {
this(initialValue, param, name, false)
@@ -75,43 +84,63 @@ private[spark] object Accumulators extends Logging {
* This global map holds the original accumulator objects that are created on the driver.
* It keeps weak references to these objects so that accumulators can be garbage-collected
* once the RDDs and user-code that reference them are cleaned up.
* TODO: Don't use a global map; these should be tied to a SparkContext at the very least.
Review comment (Contributor):
Yep, this is one of the many blockers to multiple active SparkContexts and a source of leaked state in tests, so we should try to have this be scoped within SparkEnv in the future. Feel free to create a followup JIRA for this if you think it's a small task that someone should pick up.
*/
@GuardedBy("Accumulators")
Review comment (Contributor):
+1 on this style of explicit documentation of locking / synchronization requirements. This is a good practice and we should do this more often.

val originals = mutable.Map[Long, WeakReference[Accumulable[_, _]]]()

private var lastId: Long = 0
private val nextId = new AtomicLong(0L)

def newId(): Long = synchronized {
lastId += 1
lastId
}
/**
* Return a globally unique ID for a new [[Accumulable]].
* Note: Once you copy the [[Accumulable]] the ID is no longer unique.
*/
def newId(): Long = nextId.getAndIncrement

/**
* Register an [[Accumulable]] created on the driver such that it can be used on the executors.
*
* All accumulators registered here can later be used as a container for accumulating partial
* values across multiple tasks. This is what [[org.apache.spark.scheduler.DAGScheduler]] does.
* Note: if an accumulator is registered here, it should also be registered with the active
* context cleaner for cleanup so as to avoid memory leaks.
*
* If an [[Accumulable]] with the same ID was already registered, this does nothing instead
* of overwriting it. This happens when we copy accumulators, e.g. when we reconstruct
* [[org.apache.spark.executor.TaskMetrics]] from accumulator updates.
*/
def register(a: Accumulable[_, _]): Unit = synchronized {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
if (!originals.contains(a.id)) {
originals(a.id) = new WeakReference[Accumulable[_, _]](a)
}
}

def remove(accId: Long) {
synchronized {
originals.remove(accId)
}
/**
* Unregister the [[Accumulable]] with the given ID, if any.
*/
def remove(accId: Long): Unit = synchronized {
originals.remove(accId)
}

// Add values to the original accumulators with some given IDs
def add(values: Map[Long, Any]): Unit = synchronized {
for ((id, value) <- values) {
if (originals.contains(id)) {
// Since we are now storing weak references, we must check whether the underlying data
// is valid.
originals(id).get match {
case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
case None =>
throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
}
} else {
logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
/**
* Return the [[Accumulable]] registered with the given ID, if any.
*/
def get(id: Long): Option[Accumulable[_, _]] = synchronized {
originals.get(id).map { weakRef =>
// Since we are storing weak references, we must check whether the underlying data is valid.
weakRef.get.getOrElse {
throw new IllegalAccessError(s"Attempted to access garbage collected accumulator $id")
}
}
}

/**
* Clear all registered [[Accumulable]]s. For testing only.
*/
def clear(): Unit = synchronized {
originals.clear()
}

}
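
A rough sketch of the driver-side lifecycle these methods implement: constructing a non-internal accumulator registers it, register is idempotent per ID, get resolves a weak reference, and remove drops it. Accumulators (and copy()) are private[spark], so the lines below are assumed to live inside the org.apache.spark package; the accumulator itself is hypothetical.

// A normal (non-internal) accumulator registers itself when constructed.
val acc = new Accumulator(0L, AccumulatorParam.LongAccumulatorParam, Some("my counter"))

Accumulators.get(acc.id)          // Some(acc), resolved through a WeakReference
Accumulators.register(acc)        // no-op: an Accumulable with this ID is already registered
Accumulators.register(acc.copy()) // also a no-op, since the copy keeps the same ID

Accumulators.remove(acc.id)       // drop the weak reference
Accumulators.get(acc.id)          // None
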


@@ -156,5 +185,23 @@ object AccumulatorParam {
def zero(initialValue: Float): Float = 0f
}

// TODO: Add AccumulatorParams for other types, e.g. lists and strings
// Note: when merging values, this param just adopts the newer value. This is used only
Review comment (Contributor):
This is a little confusing and I maybe wonder whether it'd be worth having a class named something like LatestValueAccumulableParam[T] to handle this in a generic way, but for now this is okay.

Reply (Contributor Author):
let's do that later

// internally for things that shouldn't really be accumulated across tasks, like input
// read method, which should be the same across all tasks in the same stage.
private[spark] object StringAccumulatorParam extends AccumulatorParam[String] {
def addInPlace(t1: String, t2: String): String = t2
def zero(initialValue: String): String = ""
}

// Note: this is expensive as it makes a copy of the list every time the caller adds an item.
// A better way to use this is to first accumulate the values yourself, then add them all at once.
private[spark] class ListAccumulatorParam[T] extends AccumulatorParam[Seq[T]] {
def addInPlace(t1: Seq[T], t2: Seq[T]): Seq[T] = t1 ++ t2
def zero(initialValue: Seq[T]): Seq[T] = Seq.empty[T]
}

// For the internal metric that records what blocks are updated in a particular task
private[spark] object UpdatedBlockStatusesAccumulatorParam
extends ListAccumulatorParam[(BlockId, BlockStatus)]

}
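
A quick illustration of the merge behavior described in the comments above; both params are private[spark], so this is a sketch of their semantics rather than user-facing API.

// Last write wins: suitable for per-stage constants such as the input read method.
StringAccumulatorParam.addInPlace("Hadoop", "Memory")   // returns "Memory"
StringAccumulatorParam.zero("ignored")                  // returns ""

// Concatenation: each add copies the list, hence the advice to batch up additions.
val listParam = new ListAccumulatorParam[Int]
listParam.addInPlace(Seq(1, 2), Seq(3))                 // returns Seq(1, 2, 3)
listParam.zero(Seq.empty[Int])                          // returns Seq()
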
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -57,8 +57,7 @@ case class Aggregator[K, V, C] (
Option(context).foreach { c =>
c.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
c.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
c.internalMetricsToAccumulators(
InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
c.taskMetrics().incPeakExecutionMemory(map.peakMemoryUsedBytes)
}
}
}
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -35,7 +35,7 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
*/
private[spark] case class Heartbeat(
executorId: String,
taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
accumUpdates: Array[(Long, Seq[AccumulableInfo])], // taskId -> accum updates
blockManagerId: BlockManagerId)

/**
@@ -119,14 +119,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock)
context.reply(true)

// Messages received from executors
case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
case heartbeat @ Heartbeat(executorId, accumUpdates, blockManagerId) =>
if (scheduler != null) {
if (executorLastSeen.contains(executorId)) {
executorLastSeen(executorId) = clock.getTimeMillis()
eventLoopThread.submit(new Runnable {
override def run(): Unit = Utils.tryLogNonFatalError {
val unknownExecutor = !scheduler.executorHeartbeatReceived(
executorId, taskMetrics, blockManagerId)
executorId, accumUpdates, blockManagerId)
val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
context.reply(response)
}