
Commit 511ec85

Merge remote-tracking branch 'upstream/master' into dt-timing
2 parents: a95bc22 + 41e0a21

21 files changed: +375 additions, −59 deletions

core/src/main/scala/org/apache/spark/Accumulators.scala

Lines changed: 15 additions & 4 deletions
```diff
@@ -36,15 +36,21 @@ import org.apache.spark.serializer.JavaSerializer
  *
  * @param initialValue initial value of accumulator
  * @param param helper object defining how to add elements of type `R` and `T`
+ * @param name human-readable name for use in Spark's web UI
  * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
 class Accumulable[R, T] (
     @transient initialValue: R,
-    param: AccumulableParam[R, T])
+    param: AccumulableParam[R, T],
+    val name: Option[String])
   extends Serializable {
 
-  val id = Accumulators.newId
+  def this(@transient initialValue: R, param: AccumulableParam[R, T]) =
+    this(initialValue, param, None)
+
+  val id: Long = Accumulators.newId
+
   @transient private var value_ = initialValue // Current value on master
   val zero = param.zero(initialValue) // Zero value to be passed to workers
   private var deserialized = false
@@ -219,8 +225,10 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
-  extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String])
+  extends Accumulable[T,T](initialValue, param, name) {
+  def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None)
+}
 
 /**
  * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add
@@ -281,4 +289,7 @@ private object Accumulators {
       }
     }
   }
+
+  def stringifyPartialValue(partialValue: Any) = "%s".format(partialValue)
+  def stringifyValue(value: Any) = "%s".format(value)
 }
```
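Net effect: `Accumulable` (and therefore `Accumulator`) now carries an optional display name, while the auxiliary `def this(...)` keeps the old call sites compiling. A standalone sketch of the same pattern, with invented class and member names rather than the patch's own:

```scala
// Illustrative only: mirrors the optional-name constructor pattern used in Accumulable above.
class Metric(val initial: Long, val name: Option[String]) extends Serializable {
  // Auxiliary constructor: pre-existing call sites land here and get no name.
  def this(initial: Long) = this(initial, None)
}

object MetricDemo extends App {
  val unnamed = new Metric(0L)                  // old call style still compiles
  val named   = new Metric(0L, Some("records")) // new, named form
  println(named.name.getOrElse("<unnamed>"))    // prints: records
}
```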

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 19 additions & 0 deletions
```diff
@@ -760,6 +760,15 @@ class SparkContext(config: SparkConf) extends Logging {
   def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
     new Accumulator(initialValue, param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, with a name for display
+   * in the Spark UI. Tasks can "add" values to the accumulator using the `+=` method. Only the
+   * driver can access the accumulator's `value`.
+   */
+  def accumulator[T](initialValue: T, name: String)(implicit param: AccumulatorParam[T]) = {
+    new Accumulator(initialValue, param, Some(name))
+  }
+
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values
    * with `+=`. Only the driver can access the accumulable's `value`.
@@ -769,6 +778,16 @@ class SparkContext(config: SparkConf) extends Logging {
   def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
     new Accumulable(initialValue, param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable, with a name for display in the
+   * Spark UI. Tasks can add values to the accumulable using the `+=` operator. Only the driver
+   * can access the accumulable's `value`.
+   * @tparam T accumulator type
+   * @tparam R type that can be added to the accumulator
+   */
+  def accumulable[T, R](initialValue: T, name: String)(implicit param: AccumulableParam[T, R]) =
+    new Accumulable(initialValue, param, Some(name))
+
   /**
    * Create an accumulator from a "mutable collection" type.
    *
```
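For context, a minimal driver program exercising the new named-accumulator API (master URL, app name, and the accumulator name are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object NamedAccumulatorExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("named-acc").setMaster("local[2]"))

    // The name is what the web UI displays on the stage page for this accumulator.
    val parseErrors = sc.accumulator(0, "parse errors")

    sc.parallelize(Seq("1", "2", "x", "3")).foreach { s =>
      if (scala.util.Try(s.toInt).isFailure) parseErrors += 1
    }

    println(parseErrors.value) // 1 -- `value` is only readable on the driver
    sc.stop()
  }
}
```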

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 59 additions & 0 deletions
```diff
@@ -429,40 +429,99 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def intAccumulator(initialValue: Int): Accumulator[java.lang.Integer] =
     sc.accumulator(initialValue)(IntAccumulatorParam).asInstanceOf[Accumulator[java.lang.Integer]]
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def intAccumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+    sc.accumulator(initialValue, name)(IntAccumulatorParam)
+      .asInstanceOf[Accumulator[java.lang.Integer]]
+
   /**
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def doubleAccumulator(initialValue: Double): Accumulator[java.lang.Double] =
     sc.accumulator(initialValue)(DoubleAccumulatorParam).asInstanceOf[Accumulator[java.lang.Double]]
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def doubleAccumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+    sc.accumulator(initialValue, name)(DoubleAccumulatorParam)
+      .asInstanceOf[Accumulator[java.lang.Double]]
+
   /**
    * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator(initialValue: Int): Accumulator[java.lang.Integer] = intAccumulator(initialValue)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] integer variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator(initialValue: Int, name: String): Accumulator[java.lang.Integer] =
+    intAccumulator(initialValue, name)
+
   /**
    * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
    * to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator(initialValue: Double): Accumulator[java.lang.Double] =
     doubleAccumulator(initialValue)
 
+
+  /**
+   * Create an [[org.apache.spark.Accumulator]] double variable, which tasks can "add" values
+   * to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator(initialValue: Double, name: String): Accumulator[java.lang.Double] =
+    doubleAccumulator(initialValue, name)
+
   /**
    * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
    * values to using the `add` method. Only the master can access the accumulator's `value`.
    */
   def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
     sc.accumulator(initialValue)(accumulatorParam)
 
+  /**
+   * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add"
+   * values to using the `add` method. Only the master can access the accumulator's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulator[T](initialValue: T, name: String, accumulatorParam: AccumulatorParam[T])
+      : Accumulator[T] =
+    sc.accumulator(initialValue, name)(accumulatorParam)
+
   /**
    * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
    * can "add" values with `add`. Only the master can access the accumulable's `value`.
    */
   def accumulable[T, R](initialValue: T, param: AccumulableParam[T, R]): Accumulable[T, R] =
     sc.accumulable(initialValue)(param)
 
+  /**
+   * Create an [[org.apache.spark.Accumulable]] shared variable of the given type, to which tasks
+   * can "add" values with `add`. Only the master can access the accumulable's `value`.
+   *
+   * This version supports naming the accumulator for display in Spark's web UI.
+   */
+  def accumulable[T, R](initialValue: T, name: String, param: AccumulableParam[T, R])
+      : Accumulable[T, R] =
+    sc.accumulable(initialValue, name)(param)
+
   /**
    * Broadcast a read-only variable to the cluster, returning a
    * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
```
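The Java-facing overloads delegate straight to the `SparkContext` methods above. A minimal sketch of calling them (written in Scala for consistency with the rest of this commit; master URL and names are placeholders):

```scala
import org.apache.spark.api.java.JavaSparkContext

object JavaNamedAccumulatorExample extends App {
  val jsc = new JavaSparkContext("local[2]", "java-named-accumulators")

  // Named, so it appears in the web UI; `add` is the Java-friendly spelling of `+=`.
  val badRecords = jsc.intAccumulator(0, "bad records")
  badRecords.add(1)

  println(badRecords.value) // 1
  jsc.stop()
}
```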

core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala (new file)

Lines changed: 46 additions & 0 deletions

```diff
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * Information about an [[org.apache.spark.Accumulable]] modified during a task or stage.
+ */
+@DeveloperApi
+class AccumulableInfo (
+    val id: Long,
+    val name: String,
+    val update: Option[String], // represents a partial update within a task
+    val value: String) {
+
+  override def equals(other: Any): Boolean = other match {
+    case acc: AccumulableInfo =>
+      this.id == acc.id && this.name == acc.name &&
+        this.update == acc.update && this.value == acc.value
+    case _ => false
+  }
+}
+
+object AccumulableInfo {
+  def apply(id: Long, name: String, update: Option[String], value: String) =
+    new AccumulableInfo(id, name, update, value)
+
+  def apply(id: Long, name: String, value: String) = new AccumulableInfo(id, name, None, value)
+}
```
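A quick sketch of the two `apply` overloads (ids, names, and values here are invented):

```scala
import org.apache.spark.scheduler.AccumulableInfo

object AccumulableInfoDemo extends App {
  // Task-level record: carries the per-task partial update plus the running total.
  val taskLevel = AccumulableInfo(1L, "records read", Some("120"), "3420")

  // Stage-level record: the three-argument overload leaves `update` as None.
  val stageLevel = AccumulableInfo(1L, "records read", "3420")

  println(taskLevel == stageLevel) // false -- equals compares id, name, update, and value
}
```

Note that the class overrides `equals` but not `hashCode`, so instances are best compared directly rather than used as hash-map keys.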

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 22 additions & 2 deletions
```diff
@@ -883,8 +883,14 @@ class DAGScheduler(
     val task = event.task
     val stageId = task.stageId
     val taskType = Utils.getFormattedClassName(task)
-    listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
-      event.taskMetrics))
+
+    // The success case is dealt with separately below, since we need to compute accumulator
+    // updates before posting.
+    if (event.reason != Success) {
+      listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+        event.taskMetrics))
+    }
+
     if (!stageIdToStage.contains(task.stageId)) {
       // Skip all the actions if the stage has been cancelled.
       return
@@ -906,12 +912,26 @@ class DAGScheduler(
         if (event.accumUpdates != null) {
           try {
             Accumulators.add(event.accumUpdates)
+            event.accumUpdates.foreach { case (id, partialValue) =>
+              val acc = Accumulators.originals(id).asInstanceOf[Accumulable[Any, Any]]
+              // To avoid UI cruft, ignore cases where value wasn't updated
+              if (acc.name.isDefined && partialValue != acc.zero) {
+                val name = acc.name.get
+                val stringPartialValue = Accumulators.stringifyPartialValue(partialValue)
+                val stringValue = Accumulators.stringifyValue(acc.value)
+                stage.info.accumulables(id) = AccumulableInfo(id, name, stringValue)
+                event.taskInfo.accumulables +=
+                  AccumulableInfo(id, name, Some(stringPartialValue), stringValue)
+              }
+            }
           } catch {
            // If we see an exception during accumulator update, just log the error and move on.
            case e: Exception =>
              logError(s"Failed to update accumulators for $task", e)
          }
        }
+        listenerBus.post(SparkListenerTaskEnd(stageId, taskType, event.reason, event.taskInfo,
+          event.taskMetrics))
        stage.pendingTasks -= task
        task match {
          case rt: ResultTask[_, _] =>
```
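The `acc.name.isDefined && partialValue != acc.zero` guard is what keeps unnamed or never-incremented accumulators off the UI. A standalone sketch of that filtering step (the case class and values are simplified stand-ins, not DAGScheduler internals):

```scala
object ZeroFilterDemo extends App {
  // Simplified stand-in for the driver's registry of accumulators.
  case class AccStub(id: Long, name: Option[String], zero: Any)

  val known = Map(
    1L -> AccStub(1L, Some("records"), 0),
    2L -> AccStub(2L, None, 0),
    3L -> AccStub(3L, Some("bytes"), 0))

  val partialUpdates: Map[Long, Any] = Map(1L -> 0, 2L -> 7, 3L -> 4096)

  // Keep only named accumulators whose per-task value actually moved off zero.
  val displayable = partialUpdates.filter { case (id, partial) =>
    known.get(id).exists(a => a.name.isDefined && partial != a.zero)
  }

  println(displayable) // Map(3 -> 4096): id 1 never moved, id 2 has no name
}
```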

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala

Lines changed: 4 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.HashMap
+
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.storage.RDDInfo
 
@@ -37,6 +39,8 @@ class StageInfo(
   var completionTime: Option[Long] = None
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
+  /** Terminal values of accumulables updated during this stage. */
+  val accumulables = HashMap[Long, AccumulableInfo]()
 
   def stageFailed(reason: String) {
     failureReason = Some(reason)
```

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala

Lines changed: 9 additions & 0 deletions
```diff
@@ -17,6 +17,8 @@
 
 package org.apache.spark.scheduler
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.annotation.DeveloperApi
 
 /**
@@ -41,6 +43,13 @@ class TaskInfo(
   */
  var gettingResultTime: Long = 0
 
+  /**
+   * Intermediate updates to accumulables during this task. Note that it is valid for the same
+   * accumulable to be updated multiple times in a single task or for two accumulables with the
+   * same name but different IDs to exist in a task.
+   */
+  val accumulables = ListBuffer[AccumulableInfo]()
+
  /**
   * The time when the task has completed successfully (including the time to remotely fetch
   * results, if necessary).
```
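As the new doc comment says, the per-task list is append-only: repeated updates to one accumulable, or two accumulables sharing a name, simply coexist. A minimal illustration (ids and names invented):

```scala
import scala.collection.mutable.ListBuffer
import org.apache.spark.scheduler.AccumulableInfo

object TaskAccumulablesDemo extends App {
  val accumulables = ListBuffer[AccumulableInfo]()
  accumulables += AccumulableInfo(7L, "bytes read", Some("512"), "512")
  accumulables += AccumulableInfo(7L, "bytes read", Some("256"), "768") // same id again: valid
  accumulables += AccumulableInfo(9L, "bytes read", Some("64"), "64")   // same name, new id: valid

  println(accumulables.size) // 3 -- nothing is deduplicated at the task level
}
```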

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala

Lines changed: 9 additions & 1 deletion
```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.ui.jobs
 
-import scala.collection.mutable.{HashMap, ListBuffer}
+import scala.collection.mutable.{HashMap, ListBuffer, Map}
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
@@ -65,6 +65,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       new StageUIData
     })
 
+    for ((id, info) <- stageCompleted.stageInfo.accumulables) {
+      stageData.accumulables(id) = info
+    }
+
     poolToActiveStages.get(stageData.schedulingPool).foreach(_.remove(stageId))
     activeStages.remove(stageId)
     if (stage.failureReason.isEmpty) {
@@ -130,6 +134,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
       new StageUIData
     })
 
+    for (accumulableInfo <- info.accumulables) {
+      stageData.accumulables(accumulableInfo.id) = accumulableInfo
+    }
+
     val execSummaryMap = stageData.executorSummary
     val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)
```
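Both hooks perform a last-write-wins merge keyed by accumulable id, so the stage page always shows the freshest value. A toy version of that bookkeeping, assuming a bare mutable map in place of `StageUIData.accumulables` (whose definition is not part of this excerpt):

```scala
import scala.collection.mutable
import org.apache.spark.scheduler.AccumulableInfo

object ListenerMergeDemo extends App {
  val stageAccumulables = mutable.Map[Long, AccumulableInfo]()

  // Task-end events keep overwriting the entry for a given id...
  stageAccumulables(1L) = AccumulableInfo(1L, "records", Some("10"), "10")
  stageAccumulables(1L) = AccumulableInfo(1L, "records", Some("25"), "35")

  // ...and the stage-completed event installs the terminal value.
  stageAccumulables(1L) = AccumulableInfo(1L, "records", "35")

  println(stageAccumulables(1L).value) // 35
}
```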
