
Commit 7a63abc

Adding Json serialization and responding to Reynold's feedback

1 parent ad85076  commit 7a63abc
9 files changed: +58 −22 lines changed

core/src/main/scala/org/apache/spark/Accumulators.scala
Lines changed: 2 additions & 2 deletions

@@ -226,8 +226,8 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], _name: String, _display: Boolean)
-  extends Accumulable[T,T](initialValue, param) {
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], _name: String,
+    _display: Boolean) extends Accumulable[T,T](initialValue, param) {
   override def name = if (_name.eq(null)) s"accumulator_$id" else _name
   override def display = _display
   def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, null, true)
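
A quick usage sketch (illustration only, not part of the commit): with the constructor above, an accumulator created with a name is reported under that name in the web UI and the JSON protocol, while unnamed accumulators fall back to "accumulator_<id>". The sc.accumulator(initialValue, name)(param) overload is the one used in the HadoopRDD hunk below.

import org.apache.spark.{SparkConf, SparkContext}

object NamedAccumulatorSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("named-accumulator-sketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Name the accumulator explicitly; the name is what listeners and the stage page display.
    val processed = sc.accumulator(0L, "records.processed")(SparkContext.LongAccumulatorParam)
    sc.parallelize(1 to 1000).foreach(_ => processed += 1L)

    println(s"records.processed = ${processed.value}")  // prints 1000 after the action completes
    sc.stop()
  }
}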

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Lines changed: 2 additions & 1 deletion

@@ -119,7 +119,8 @@ class HadoopRDD[K, V](
       minPartitions)
   }

-  val hadoopInputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.hadoop")(SparkContext.LongAccumulatorParam)
+  private val accName = s"rdd-$id.input.bytes.hadoop"
+  val hadoopInputBytes = sc.accumulator(0L, accName)(SparkContext.LongAccumulatorParam)

   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines changed: 2 additions & 2 deletions

@@ -824,8 +824,8 @@ class DAGScheduler(
           if (partialValue != acc.zero) {
             val stringPartialValue = s"${partialValue}"
             val stringValue = s"${acc.value}"
-            stageToInfos(stage).accumulatorValues(name) = stringValue
-            event.taskInfo.accumValues += ((name, stringPartialValue))
+            stageToInfos(stage).accumulatedValues(name) = stringValue
+            event.taskInfo.accumulableValues += ((name, stringPartialValue))
           }
         }
       }

core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
Lines changed: 1 addition & 1 deletion

@@ -41,7 +41,7 @@ class StageInfo(
   /** If the stage failed, the reason why. */
   var failureReason: Option[String] = None
   /** Terminal values of accumulables updated during this stage. */
-  val accumulatorValues: Map[String, String] = HashMap[String, String]()
+  val accumulatedValues: Map[String, String] = HashMap[String, String]()

   def stageFailed(reason: String) {
     failureReason = Some(reason)

core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
Lines changed: 3 additions & 2 deletions

@@ -44,9 +44,10 @@ class TaskInfo(
   var gettingResultTime: Long = 0

   /**
-   * Terminal values of accumulables updated during this task.
+   * Intermediate updates to accumulables during this task. Note that it is valid for the same
+   * accumulable to be updated multiple times in a single task.
    */
-  val accumValues = ListBuffer[(String, String)]()
+  val accumulableValues = ListBuffer[(String, String)]()

   /**
    * The time when the task has completed successfully (including the time to remotely fetch
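
For context, a small sketch (not in the commit) of how the two collections are meant to differ, assuming the TaskInfo.accumulableValues buffer above and the StageInfo.accumulatedValues map from the previous file: task-level entries form an ordered log in which a name may repeat, while stage-level entries hold one merged, terminal value per name.

import scala.collection.mutable.{HashMap, ListBuffer}

object AccumulableBookkeepingSketch {
  def main(args: Array[String]): Unit = {
    // Per-task intermediate updates: duplicates for the same accumulable name are legal.
    val taskUpdates = ListBuffer[(String, String)]()
    taskUpdates += (("rdd-0.input.bytes.hadoop", "128"))
    taskUpdates += (("rdd-0.input.bytes.hadoop", "256"))

    // Per-stage terminal values: a single entry per name, holding the merged total.
    val stageValues = HashMap[String, String]()
    stageValues("rdd-0.input.bytes.hadoop") = "384"

    println(taskUpdates)  // ListBuffer((rdd-0.input.bytes.hadoop,128), (rdd-0.input.bytes.hadoop,256))
    println(stageValues)  // Map(rdd-0.input.bytes.hadoop -> 384)
  }
}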

core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
Lines changed: 2 additions & 2 deletions

@@ -76,7 +76,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {
     poolToActiveStages(stageIdToPool(stageId)).remove(stageId)

     val accumulables = stageIdToAccumulables.getOrElseUpdate(stageId, HashMap[String, String]())
-    stageCompleted.stageInfo.accumulatorValues.foreach { case (name, value) =>
+    stageCompleted.stageInfo.accumulatedValues.foreach { case (name, value) =>
       accumulables(name) = value
     }

@@ -156,7 +156,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener {

     if (info != null) {
       val accumulables = stageIdToAccumulables.getOrElseUpdate(sid, HashMap[String, String]())
-      info.accumValues.map { case (name, value) =>
+      info.accumulableValues.map { case (name, value) =>
         accumulables(name) = value
       }
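
The listener above is what feeds the stage page; a custom listener can read the same fields. A minimal sketch (illustration only, assuming the accumulatedValues and accumulableValues members introduced in this commit):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted, SparkListenerTaskEnd}

class AccumulableLoggingListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    // Terminal per-stage values, keyed by accumulable name.
    stageCompleted.stageInfo.accumulatedValues.foreach { case (name, value) =>
      println(s"stage ${stageCompleted.stageInfo.stageId}: $name = $value")
    }
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    // Intermediate per-task updates; the same name may appear more than once.
    taskEnd.taskInfo.accumulableValues.foreach { case (name, value) =>
      println(s"task ${taskEnd.taskInfo.taskId}: $name += $value")
    }
  }
}

Register it with sc.addSparkListener(new AccumulableLoggingListener) before running jobs.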

core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Lines changed: 3 additions & 2 deletions

@@ -105,7 +105,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
     // scalastyle:on
     val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
     def accumulableRow(acc: (String, String)) = <tr><td>{acc._1}</td><td>{acc._2}</td></tr>
-    val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow, accumulables.toSeq)
+    val accumulableTable = UIUtils.listingTable(accumulableHeaders, accumulableRow,
+      accumulables.toSeq)

     val taskHeaders: Seq[String] =
       Seq(

@@ -289,7 +290,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
         <td sorttable_customkey={gcTime.toString}>
           {if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
         </td>
-        <td>{Unparsed(info.accumValues.map{ case (k, v) => s"$k += $v" }.mkString("<br/>"))}</td>
+        <td>{Unparsed(info.accumulableValues.map{ case (k, v) => s"$k: $v" }.mkString("<br/>"))}</td>
         <!--
           TODO: Add this back after we add support to hide certain columns.
           <td sorttable_customkey={serializationTime.toString}>

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
Lines changed: 20 additions & 2 deletions

@@ -190,10 +190,14 @@ private[spark] object JsonProtocol {
     ("Details" -> stageInfo.details) ~
     ("Submission Time" -> submissionTime) ~
     ("Completion Time" -> completionTime) ~
-    ("Failure Reason" -> failureReason)
+    ("Failure Reason" -> failureReason) ~
+    ("Accumulated Values" -> mapToJson(stageInfo.accumulatedValues))
   }

   def taskInfoToJson(taskInfo: TaskInfo): JValue = {
+    val accumUpdateMap = taskInfo.accumulableValues.map { case (k, v) =>
+      mapToJson(Map(k -> v))
+    }.toList
     ("Task ID" -> taskInfo.taskId) ~
     ("Index" -> taskInfo.index) ~
     ("Attempt" -> taskInfo.attempt) ~

@@ -204,7 +208,8 @@
     ("Speculative" -> taskInfo.speculative) ~
     ("Getting Result Time" -> taskInfo.gettingResultTime) ~
     ("Finish Time" -> taskInfo.finishTime) ~
-    ("Failed" -> taskInfo.failed)
+    ("Failed" -> taskInfo.failed) ~
+    ("Accumulable Updates" -> JArray(accumUpdateMap))
   }

   def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {

@@ -485,11 +490,17 @@
     val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
     val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
     val failureReason = Utils.jsonOption(json \ "Failure Reason").map(_.extract[String])
+    val accumulatedValues = (json \ "Accumulated Values").extractOpt[JObject].map(mapFromJson(_))

     val stageInfo = new StageInfo(stageId, stageName, numTasks, rddInfos, details)
     stageInfo.submissionTime = submissionTime
     stageInfo.completionTime = completionTime
     stageInfo.failureReason = failureReason
+    accumulatedValues.foreach { values =>
+      for ((k, v) <- values) {
+        stageInfo.accumulatedValues(k) = v
+      }
+    }
     stageInfo
   }

@@ -505,12 +516,19 @@
     val gettingResultTime = (json \ "Getting Result Time").extract[Long]
     val finishTime = (json \ "Finish Time").extract[Long]
     val failed = (json \ "Failed").extract[Boolean]
+    val accumulableUpdates = (json \ "Accumulable Updates").extractOpt[Seq[JValue]].map(
+      updates => updates.map(mapFromJson(_)))

     val taskInfo =
       new TaskInfo(taskId, index, attempt, launchTime, executorId, host, taskLocality, speculative)
     taskInfo.gettingResultTime = gettingResultTime
     taskInfo.finishTime = finishTime
     taskInfo.failed = failed
+    accumulableUpdates.foreach { maps =>
+      for (m <- maps) {
+        taskInfo.accumulableValues += m.head
+      }
+    }
     taskInfo
   }
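
To make the wire format above concrete, here is a standalone json4s sketch (illustration only, not the committed code) that produces the same shapes: task-level updates become an array of single-entry objects so a name can repeat, and stage-level terminal values become one object keyed by name.

import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object AccumulableJsonSketch {
  def main(args: Array[String]): Unit = {
    // Per-task intermediate updates, in order, with possible repeats of the same name.
    val updates = List("acc1" -> "val1", "acc1" -> "val1", "acc2" -> "val2")
    val updatesJson = JArray(updates.map { case (k, v) => JObject(JField(k, JString(v))) })

    // Per-stage terminal values, one entry per accumulable name.
    val terminalJson = JObject(JField("acc1", JString("val1")), JField("acc2", JString("val2")))

    val json = ("Accumulable Updates" -> updatesJson) ~ ("Accumulated Values" -> terminalJson)
    println(compact(render(json)))
    // {"Accumulable Updates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}],
    //  "Accumulated Values":{"acc1":"val1","acc2":"val2"}}
  }
}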

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
Lines changed: 23 additions & 8 deletions

@@ -261,6 +261,7 @@ class JsonProtocolSuite extends FunSuite {
     (0 until info1.rddInfos.size).foreach { i =>
       assertEquals(info1.rddInfos(i), info2.rddInfos(i))
     }
+    assert(info1.accumulatedValues === info2.accumulatedValues)
     assert(info1.details === info2.details)
   }

@@ -293,6 +294,7 @@
     assert(info1.gettingResultTime === info2.gettingResultTime)
     assert(info1.finishTime === info2.finishTime)
     assert(info1.failed === info2.failed)
+    assert(info1.accumulableValues === info2.accumulableValues)
   }

   private def assertEquals(metrics1: TaskMetrics, metrics2: TaskMetrics) {

@@ -477,11 +479,19 @@

   private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
     val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
-    new StageInfo(a, "greetings", b, rddInfos, "details")
+    val stageInfo = new StageInfo(a, "greetings", b, rddInfos, "details")
+    stageInfo.accumulatedValues("acc1") = "val1"
+    stageInfo.accumulatedValues("acc2") = "val2"
+    stageInfo
   }

   private def makeTaskInfo(a: Long, b: Int, c: Int, d: Long, speculative: Boolean) = {
-    new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL, speculative)
+    val taskInfo = new TaskInfo(a, b, c, d, "executor", "your kind sir", TaskLocality.NODE_LOCAL,
+      speculative)
+    taskInfo.accumulableValues += (("acc1", "val1"))
+    taskInfo.accumulableValues += (("acc1", "val1"))
+    taskInfo.accumulableValues += (("acc2", "val2"))
+    taskInfo
   }

   /**
@@ -538,7 +548,8 @@
   private val stageSubmittedJsonString =
     """
      {"Event":"SparkListenerStageSubmitted","Stage Info":{"Stage ID":100,"Stage Name":
-      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details"},"Properties":
+      "greetings","Number of Tasks":200,"RDD Info":[],"Details":"details",
+      "Accumulated Values":{"acc2":"val2","acc1":"val1"}},"Properties":
      {"France":"Paris","Germany":"Berlin","Russia":"Moscow","Ukraine":"Kiev"}}
    """

@@ -548,23 +559,25 @@
      "greetings","Number of Tasks":201,"RDD Info":[{"RDD ID":101,"Name":"mayor","Storage
      Level":{"Use Disk":true,"Use Memory":true,"Use Tachyon":false,"Deserialized":true,
      "Replication":1},"Number of Partitions":201,"Number of Cached Partitions":301,
-      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details"}}
+      "Memory Size":401,"Tachyon Size":0,"Disk Size":501}],"Details":"details",
+      "Accumulated Values":{"acc2":"val2","acc1":"val1"}}}
    """

   private val taskStartJsonString =
     """
      |{"Event":"SparkListenerTaskStart","Stage ID":111,"Task Info":{"Task ID":222,
      |"Index":333,"Attempt":1,"Launch Time":444,"Executor ID":"executor","Host":"your kind sir",
      |"Locality":"NODE_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,
-      |"Failed":false}}
+      |"Failed":false,"Accumulable Updates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]}}
    """.stripMargin

   private val taskGettingResultJsonString =
     """
      |{"Event":"SparkListenerTaskGettingResult","Task Info":
      |  {"Task ID":1000,"Index":2000,"Attempt":5,"Launch Time":3000,"Executor ID":"executor",
      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":true,"Getting Result Time":0,
-      |  "Finish Time":0,"Failed":false
+      |  "Finish Time":0,"Failed":false,
+      |  "Accumulable Updates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
      |  }
      |}
    """.stripMargin
@@ -576,7 +589,8 @@
      |"Task Info":{
      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
+      |  "Accumulable Updates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
      |},
      |"Task Metrics":{
      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,

@@ -616,7 +630,8 @@
      |"Task Info":{
      |  "Task ID":123,"Index":234,"Attempt":67,"Launch Time":345,"Executor ID":"executor",
      |  "Host":"your kind sir","Locality":"NODE_LOCAL","Speculative":false,
-      |  "Getting Result Time":0,"Finish Time":0,"Failed":false
+      |  "Getting Result Time":0,"Finish Time":0,"Failed":false,
+      |  "Accumulable Updates":[{"acc1":"val1"},{"acc1":"val1"},{"acc2":"val2"}]
      |},
      |"Task Metrics":{
      |  "Host Name":"localhost","Executor Deserialize Time":300,"Executor Run Time":400,
