17
17
18
18
package org .apache .spark .streaming .ui
19
19
20
+ import java .text .SimpleDateFormat
21
+ import java .util .Date
20
22
import javax .servlet .http .HttpServletRequest
21
23
22
24
import scala .collection .mutable .ArrayBuffer
@@ -30,6 +32,8 @@ import org.apache.spark.util.Distribution
30
32
/**
31
33
* @param divId the `id` used in the html `div` tag
32
34
* @param data the data for the timeline graph
35
+ * @param minX the min value of X axis
36
+ * @param maxX the max value of X axis
33
37
* @param minY the min value of Y axis
34
38
* @param maxY the max value of Y axis
35
39
* @param unitY the unit of Y axis
@@ -41,6 +45,7 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX:
41
45
val jsForData = data.map { case (x, y) =>
42
46
s """ {"x": $x, "y": $y} """
43
47
}.mkString(" [" , " ," , " ]" )
48
+ jsCollector.addPreparedStatement(s " prepareTimeline( $minY, $maxY); " )
44
49
jsCollector.addStatement(
45
50
s " drawTimeline('# $divId', $jsForData, $minX, $maxX, $minY, $maxY, ' $unitY'); " )
46
51
@@ -67,17 +72,21 @@ private[ui] case class DistributionUIData(
67
72
}
68
73
}
69
74
70
- private [ui] case class LongStreamingUIData (data : Seq [(Long , Long )]) {
75
+ private [ui] case class MillisecondsStatUIData (data : Seq [(Long , Long )]) {
71
76
72
77
val avg : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
73
78
79
+ val formattedAvg : String = StreamingPage .formatDurationOption(avg)
80
+
74
81
val max : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).max)
75
82
}
76
83
77
- private [ui] case class DoubleStreamingUIData (data : Seq [(Long , Double )]) {
84
+ private [ui] case class DoubleStatUIData (data : Seq [(Long , Double )]) {
78
85
79
86
val avg : Option [Double ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
80
87
88
+ val formattedAvg : String = avg.map(_.formatted(" %.2f" )).getOrElse(" -" )
89
+
81
90
val max : Option [Double ] = if (data.isEmpty) None else Some (data.map(_._2).max)
82
91
}
83
92
@@ -89,7 +98,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
89
98
90
99
private val listener = parent.listener
91
100
private val startTime = System .currentTimeMillis()
92
- private val emptyCell = " -"
93
101
94
102
/** Render the page */
95
103
def render (request : HttpServletRequest ): Seq [Node ] = {
@@ -104,6 +112,9 @@ private[ui] class StreamingPage(parent: StreamingTab)
104
112
UIUtils .headerSparkPage(" Streaming Statistics" , content, parent, Some (5000 ))
105
113
}
106
114
115
+ /**
116
+ * Generate html that will load css/js files for StreamingPage
117
+ */
107
118
private def generateLoadResources (): Seq [Node ] = {
108
119
// scalastyle:off
109
120
<script src ={UIUtils .prependBaseUri(" /static/d3.min.js" )}></script >
@@ -130,24 +141,34 @@ private[ui] class StreamingPage(parent: StreamingTab)
130
141
</div >
131
142
}
132
143
144
+ private def generateTimeMap (times : Seq [Long ]): Seq [Node ] = {
145
+ val dateFormat = new SimpleDateFormat (" HH:mm:ss" )
146
+ val js = " var timeFormat = {};\n " + times.map { time =>
147
+ val formattedTime = dateFormat.format(new Date (time))
148
+ s " timeFormat[ $time] = ' $formattedTime'; "
149
+ }.mkString(" \n " )
150
+
151
+ <script >{Unparsed (js)}</script >
152
+ }
153
+
133
154
private def generateStatTable (): Seq [Node ] = {
134
155
val batchInfos = listener.retainedBatches
135
156
136
157
val batchTimes = batchInfos.map(_.batchTime.milliseconds)
137
158
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
138
159
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
139
160
140
- val eventRateForAllReceivers = DoubleStreamingUIData (batchInfos.map { batchInfo =>
161
+ val eventRateForAllReceivers = DoubleStatUIData (batchInfos.map { batchInfo =>
141
162
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
142
163
})
143
164
144
- val schedulingDelay = LongStreamingUIData (batchInfos.flatMap { batchInfo =>
165
+ val schedulingDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
145
166
batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
146
167
})
147
- val processingTime = LongStreamingUIData (batchInfos.flatMap { batchInfo =>
168
+ val processingTime = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
148
169
batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
149
170
})
150
- val totalDelay = LongStreamingUIData (batchInfos.flatMap { batchInfo =>
171
+ val totalDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
151
172
batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
152
173
})
153
174
@@ -167,6 +188,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
167
188
val maxEventRate = eventRateForAllReceivers.max.map(_.ceil.toLong).getOrElse(0L )
168
189
val minEventRate = 0L
169
190
191
+ // JavaScript to show/hide the receiver sub table.
170
192
val triangleJs =
171
193
s """ $$ ('#inputs-table').toggle('collapsed');
172
194
|if ( $$ (this).html() == ' $BLACK_RIGHT_TRIANGLE_HTML')
@@ -258,7 +280,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
258
280
<span onclick ={Unparsed (triangleJs)}>{Unparsed (BLACK_RIGHT_TRIANGLE_HTML )}</span >
259
281
<strong >Input Rate </strong >
260
282
</div >
261
- <div >Avg : {eventRateForAllReceivers.avg.map(_.formatted( " %.2f " )).getOrElse(emptyCell) } events/ sec</div >
283
+ <div >Avg : {eventRateForAllReceivers.formattedAvg } events/ sec</div >
262
284
</td >
263
285
<td class =" timeline" >{timelineDataForEventRateOfAllReceivers}</td >
264
286
<td class =" distribution" >{distributionDataForEventRateOfAllReceivers}</td >
@@ -271,23 +293,23 @@ private[ui] class StreamingPage(parent: StreamingTab)
271
293
<tr >
272
294
<td style =" vertical-align: middle;" >
273
295
<div ><strong >Scheduling Delay </strong ></div >
274
- <div >Avg : {formatDurationOption( schedulingDelay.avg) }</div >
296
+ <div >Avg : {schedulingDelay.formattedAvg }</div >
275
297
</td >
276
298
<td class =" timeline" >{timelineDataForSchedulingDelay}</td >
277
299
<td class =" distribution" >{distributionDataForSchedulingDelay}</td >
278
300
</tr >
279
301
<tr >
280
302
<td style =" vertical-align: middle;" >
281
303
<div ><strong >Processing Time </strong ></div >
282
- <div >Avg : {formatDurationOption( processingTime.avg) }</div >
304
+ <div >Avg : {processingTime.formattedAvg }</div >
283
305
</td >
284
306
<td class =" timeline" >{timelineDataForProcessingTime}</td >
285
307
<td class =" distribution" >{distributionDataForProcessingTime}</td >
286
308
</tr >
287
309
<tr >
288
310
<td style =" vertical-align: middle;" >
289
311
<div ><strong >Total Delay </strong ></div >
290
- <div >Avg : {formatDurationOption( totalDelay.avg) }</div >
312
+ <div >Avg : {totalDelay.formattedAvg }</div >
291
313
</td >
292
314
<td class =" timeline" >{timelineDataForTotalDelay}</td >
293
315
<td class =" distribution" >{distributionDataForTotalDelay}</td >
@@ -296,7 +318,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
296
318
</table >
297
319
// scalastyle:on
298
320
299
- table ++ jsCollector.toHtml
321
+ generateTimeMap(batchTimes) ++ table ++ jsCollector.toHtml
300
322
}
301
323
302
324
private def generateInputReceiversTable (
@@ -366,7 +388,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
366
388
maxY,
367
389
" events/sec" ).toHtml(jsCollector)
368
390
369
- // scalastyle:off
370
391
<tr >
371
392
<td rowspan =" 2" style =" vertical-align: middle;" >
372
393
<div >
@@ -385,7 +406,6 @@ private[ui] class StreamingPage(parent: StreamingTab)
385
406
</td >
386
407
<td class =" distribution" >{distributionForEventsRate}</td >
387
408
</tr >
388
- // scalastyle:on
389
409
}
390
410
391
411
/**
@@ -417,13 +437,33 @@ private[ui] class StreamingPage(parent: StreamingTab)
417
437
}
418
438
}
419
439
420
- private object StreamingPage {
440
+ private [ui] object StreamingPage {
421
441
val BLACK_RIGHT_TRIANGLE_HTML = " ▶"
422
442
val BLACK_DOWN_TRIANGLE_HTML = " ▼"
443
+
444
+ val emptyCell = " -"
445
+
446
+ /**
447
+ * Returns a human-readable string representing a duration such as "5 second 35 ms"
448
+ */
449
+ def formatDurationOption (msOption : Option [Long ]): String = {
450
+ msOption.map(formatDurationVerbose).getOrElse(emptyCell)
451
+ }
423
452
}
424
453
454
+ /**
455
+ * A helper class that allows the user to add JavaScript statements which will be executed when the
456
+ * DOM has finished loading.
457
+ */
425
458
private [ui] class JsCollector {
459
+ /**
460
+ * JavaScript statements that will execute before `statements`
461
+ */
426
462
private val preparedStatements = ArrayBuffer [String ]()
463
+
464
+ /**
465
+ * JavaScript statements that will execute after `preparedStatements`
466
+ */
427
467
private val statements = ArrayBuffer [String ]()
428
468
429
469
def addPreparedStatement (js : String ): Unit = {
@@ -434,6 +474,9 @@ private[ui] class JsCollector {
434
474
statements += js
435
475
}
436
476
477
+ /**
478
+ * Generate a html snippet that will execute all scripts when the DOM has finished loading.
479
+ */
437
480
def toHtml : Seq [Node ] = {
438
481
val js =
439
482
s """
0 commit comments