@@ -19,6 +19,7 @@ package org.apache.spark.streaming.ui
19
19
20
20
import java .text .SimpleDateFormat
21
21
import java .util .Date
22
+ import java .util .concurrent .TimeUnit
22
23
import javax .servlet .http .HttpServletRequest
23
24
24
25
import scala .collection .mutable .ArrayBuffer
@@ -38,8 +39,8 @@ import org.apache.spark.util.Distribution
38
39
* @param maxY the max value of Y axis
39
40
* @param unitY the unit of Y axis
40
41
*/
41
- private [ui] case class TimelineUIData (divId : String , data : Seq [(Long , _)], minX : Long , maxX : Long ,
42
- minY : Long , maxY : Long , unitY : String ) {
42
+ private [ui] class TimelineUIData (divId : String , data : Seq [(Long , _)], minX : Long , maxX : Long ,
43
+ minY : Double , maxY : Double , unitY : String ) {
43
44
44
45
def toHtml (jsCollector : JsCollector ): Seq [Node ] = {
45
46
val jsForData = data.map { case (x, y) =>
@@ -60,8 +61,8 @@ private[ui] case class TimelineUIData(divId: String, data: Seq[(Long, _)], minX:
60
61
* @param maxY the max value of Y axis
61
62
* @param unitY the unit of Y axis
62
63
*/
63
- private [ui] case class DistributionUIData (
64
- divId : String , data : Seq [_], minY : Long , maxY : Long , unitY : String ) {
64
+ private [ui] class DistributionUIData (
65
+ divId : String , data : Seq [_], minY : Double , maxY : Double , unitY : String ) {
65
66
66
67
def toHtml (jsCollector : JsCollector ): Seq [Node ] = {
67
68
val jsForData = data.mkString(" [" , " ," , " ]" )
@@ -72,7 +73,11 @@ private[ui] case class DistributionUIData(
72
73
}
73
74
}
74
75
75
- private [ui] case class MillisecondsStatUIData (data : Seq [(Long , Long )]) {
76
+ private [ui] class MillisecondsStatUIData (data : Seq [(Long , Long )]) {
77
+
78
+ def timelineData (unit : TimeUnit ) = data.map(x => x._1 -> StreamingPage .convertToTimeUnit(x._2, unit))
79
+
80
+ def distributionData (unit : TimeUnit ) = data.map(x => StreamingPage .convertToTimeUnit(x._2, unit))
76
81
77
82
val avg : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
78
83
@@ -81,7 +86,7 @@ private[ui] case class MillisecondsStatUIData(data: Seq[(Long, Long)]) {
81
86
val max : Option [Long ] = if (data.isEmpty) None else Some (data.map(_._2).max)
82
87
}
83
88
84
- private [ui] case class DoubleStatUIData (data : Seq [(Long , Double )]) {
89
+ private [ui] class DoubleStatUIData (val data : Seq [(Long , Double )]) {
85
90
86
91
val avg : Option [Double ] = if (data.isEmpty) None else Some (data.map(_._2).sum / data.size)
87
92
@@ -158,30 +163,31 @@ private[ui] class StreamingPage(parent: StreamingTab)
158
163
val minBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.min
159
164
val maxBatchTime = if (batchTimes.isEmpty) startTime else batchTimes.max
160
165
161
- val eventRateForAllReceivers = DoubleStatUIData (batchInfos.map { batchInfo =>
166
+ val eventRateForAllReceivers = new DoubleStatUIData (batchInfos.map { batchInfo =>
162
167
(batchInfo.batchTime.milliseconds, batchInfo.numRecords * 1000.0 / listener.batchDuration)
163
168
})
164
169
165
- val schedulingDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
170
+ val schedulingDelay = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
166
171
batchInfo.schedulingDelay.map(batchInfo.batchTime.milliseconds -> _)
167
172
})
168
- val processingTime = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
173
+ val processingTime = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
169
174
batchInfo.processingDelay.map(batchInfo.batchTime.milliseconds -> _)
170
175
})
171
- val totalDelay = MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
176
+ val totalDelay = new MillisecondsStatUIData (batchInfos.flatMap { batchInfo =>
172
177
batchInfo.totalDelay.map(batchInfo.batchTime.milliseconds -> _)
173
178
})
174
179
175
180
val jsCollector = new JsCollector
176
181
177
182
// Use the max value of "schedulingDelay", "processingTime", and "totalDelay" to make the
178
183
// Y axis ranges same.
179
- val maxTime =
184
+ val _maxTime =
180
185
(for (m1 <- schedulingDelay.max; m2 <- processingTime.max; m3 <- totalDelay.max) yield
181
186
m1 max m2 max m3).getOrElse(0L )
182
- List (1 , 2 , 3 ).sum
183
187
// Should start at 0
184
188
val minTime = 0L
189
+ val (maxTime, unit) = UIUtils .normalizeDuration(_maxTime)
190
+ val formattedUnit = UIUtils .shortTimeUnitString(unit)
185
191
186
192
// Use the max input rate for all receivers' graphs to make the Y axis ranges same.
187
193
// If it's not an integral number, just use its ceil integral number.
@@ -196,7 +202,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
196
202
|else $$ (this).html(' $BLACK_RIGHT_TRIANGLE_HTML'); """ .stripMargin.replaceAll(" \\ n" , " " )
197
203
198
204
val timelineDataForEventRateOfAllReceivers =
199
- TimelineUIData (
205
+ new TimelineUIData (
200
206
" all-receiver-events-timeline" ,
201
207
eventRateForAllReceivers.data,
202
208
minBatchTime,
@@ -206,81 +212,83 @@ private[ui] class StreamingPage(parent: StreamingTab)
206
212
" events/sec" ).toHtml(jsCollector)
207
213
208
214
val distributionDataForEventRateOfAllReceivers =
209
- DistributionUIData (
215
+ new DistributionUIData (
210
216
" all-receiver-events-distribution" ,
211
217
eventRateForAllReceivers.data.map(_._2),
212
218
minEventRate,
213
219
maxEventRate,
214
220
" events/sec" ).toHtml(jsCollector)
215
221
216
222
val timelineDataForSchedulingDelay =
217
- TimelineUIData (
223
+ new TimelineUIData (
218
224
" scheduling-delay-timeline" ,
219
- schedulingDelay.data ,
225
+ schedulingDelay.timelineData(unit) ,
220
226
minBatchTime,
221
227
maxBatchTime,
222
228
minTime,
223
229
maxTime,
224
- " ms " ).toHtml(jsCollector)
230
+ formattedUnit ).toHtml(jsCollector)
225
231
226
232
val distributionDataForSchedulingDelay =
227
- DistributionUIData (
233
+ new DistributionUIData (
228
234
" scheduling-delay-distribution" ,
229
- schedulingDelay.data.map(_._2 ),
235
+ schedulingDelay.distributionData(unit ),
230
236
minTime,
231
237
maxTime,
232
- " ms " ).toHtml(jsCollector)
238
+ formattedUnit ).toHtml(jsCollector)
233
239
234
240
val timelineDataForProcessingTime =
235
- TimelineUIData (
241
+ new TimelineUIData (
236
242
" processing-time-timeline" ,
237
- processingTime.data ,
243
+ processingTime.timelineData(unit) ,
238
244
minBatchTime,
239
245
maxBatchTime,
240
246
minTime,
241
247
maxTime,
242
- " ms " ).toHtml(jsCollector)
248
+ formattedUnit ).toHtml(jsCollector)
243
249
244
250
val distributionDataForProcessingTime =
245
- DistributionUIData (
251
+ new DistributionUIData (
246
252
" processing-time-distribution" ,
247
- processingTime.data.map(_._2 ),
253
+ processingTime.distributionData(unit ),
248
254
minTime,
249
255
maxTime,
250
- " ms " ).toHtml(jsCollector)
256
+ formattedUnit ).toHtml(jsCollector)
251
257
252
258
val timelineDataForTotalDelay =
253
- TimelineUIData (
259
+ new TimelineUIData (
254
260
" total-delay-timeline" ,
255
- totalDelay.data ,
261
+ totalDelay.timelineData(unit) ,
256
262
minBatchTime,
257
263
maxBatchTime,
258
264
minTime,
259
265
maxTime,
260
- " ms " ).toHtml(jsCollector)
266
+ formattedUnit ).toHtml(jsCollector)
261
267
262
268
val distributionDataForTotalDelay =
263
- DistributionUIData (
269
+ new DistributionUIData (
264
270
" total-delay-distribution" ,
265
- totalDelay.data.map(_._2 ),
271
+ totalDelay.distributionData(unit ),
266
272
minTime,
267
273
maxTime,
268
- " ms " ).toHtml(jsCollector)
274
+ formattedUnit ).toHtml(jsCollector)
269
275
270
276
val table =
271
277
// scalastyle:off
272
278
<table class =" table table-bordered" style =" width: auto" >
273
279
<thead >
274
- <tr ><th ></th ><th >Timelines </th ><th >Histograms </th ></tr >
280
+ <tr ><th style = " width: 160px; " ></th ><th style = " width: 492px; " >Timelines </th ><th style = " width: 300px; " >Histograms </th ></tr >
275
281
</thead >
276
282
<tbody >
277
283
<tr >
278
- <td style =" vertical-align: middle; width: 200px;" >
284
+ <td style =" vertical-align: middle; width: 160px;" >
285
+ <div style =" width: 160px;" >
279
286
<div >
280
287
<span onclick ={Unparsed (triangleJs)}>{Unparsed (BLACK_RIGHT_TRIANGLE_HTML )}</span >
281
288
<strong >Input Rate </strong >
282
289
</div >
283
290
<div >Avg : {eventRateForAllReceivers.formattedAvg} events/ sec</div >
291
+ </div >
284
292
</td >
285
293
<td class =" timeline" >{timelineDataForEventRateOfAllReceivers}</td >
286
294
<td class =" distribution" >{distributionDataForEventRateOfAllReceivers}</td >
@@ -325,19 +333,19 @@ private[ui] class StreamingPage(parent: StreamingTab)
325
333
jsCollector : JsCollector ,
326
334
minX : Long ,
327
335
maxX : Long ,
328
- minY : Long ,
329
- maxY : Long ): Seq [Node ] = {
336
+ minY : Double ,
337
+ maxY : Double ): Seq [Node ] = {
330
338
val content = listener.receivedRecordsDistributions.map { case (receiverId, distribution) =>
331
339
generateInputReceiverRow(jsCollector, receiverId, distribution, minX, maxX, minY, maxY)
332
340
}.foldLeft[Seq [Node ]](Nil )(_ ++ _)
333
341
334
342
<table class =" table table-bordered" style =" width: auto" >
335
343
<thead >
336
344
<tr >
337
- <th ></th >
338
- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Status </div ></th >
339
- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Location </div ></th >
340
- <th style =" width: 166.7px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Last Error Time </div ></th >
345
+ <th style = " width: 151px; " ></th >
346
+ <th style =" width: 167px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Status </div ></th >
347
+ <th style =" width: 167px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Location </div ></th >
348
+ <th style =" width: 166px ; padding: 8px 0 8px 0" ><div style =" margin: 0 8px 0 8px" >Last Error Time </div ></th >
341
349
<th >Last Error Message </th >
342
350
</tr >
343
351
</thead >
@@ -353,8 +361,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
353
361
distribution : Option [Distribution ],
354
362
minX : Long ,
355
363
maxX : Long ,
356
- minY : Long ,
357
- maxY : Long ): Seq [Node ] = {
364
+ minY : Double ,
365
+ maxY : Double ): Seq [Node ] = {
358
366
val avgReceiverEvents = distribution.map(_.statCounter.mean.toLong)
359
367
val receiverInfo = listener.receiverInfo(receiverId)
360
368
val receiverName = receiverInfo.map(_.name).getOrElse(s " Receiver- $receiverId" )
@@ -371,7 +379,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
371
379
val receivedRecords = listener.receivedRecordsWithBatchTime.get(receiverId).getOrElse(Seq ())
372
380
373
381
val timelineForEventRate =
374
- TimelineUIData (
382
+ new TimelineUIData (
375
383
s " receiver- $receiverId-events-timeline " ,
376
384
receivedRecords,
377
385
minX,
@@ -381,19 +389,21 @@ private[ui] class StreamingPage(parent: StreamingTab)
381
389
" events/sec" ).toHtml(jsCollector)
382
390
383
391
val distributionForEventsRate =
384
- DistributionUIData (
392
+ new DistributionUIData (
385
393
s " receiver- $receiverId-events-distribution " ,
386
394
receivedRecords.map(_._2),
387
395
minY,
388
396
maxY,
389
397
" events/sec" ).toHtml(jsCollector)
390
398
391
399
<tr >
392
- <td rowspan =" 2" style =" vertical-align: middle; width: 193px;" >
400
+ <td rowspan =" 2" style =" vertical-align: middle; width: 151px;" >
401
+ <div style =" width: 151px;" >
393
402
<div >
394
403
<strong >{receiverName}</strong >
395
404
</div >
396
405
<div >Avg : {avgReceiverEvents.map(_.toString).getOrElse(emptyCell)} events/ sec</div >
406
+ </div >
397
407
</td >
398
408
<td >{receiverActive}</td >
399
409
<td >{receiverLocation}</td >
@@ -449,6 +459,16 @@ private[ui] object StreamingPage {
449
459
def formatDurationOption (msOption : Option [Long ]): String = {
450
460
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
451
461
}
462
+
463
+ def convertToTimeUnit (milliseconds : Long , unit : TimeUnit ): Double = unit match {
464
+ case TimeUnit .NANOSECONDS => milliseconds * 1000 * 1000 // not used yet
465
+ case TimeUnit .MICROSECONDS => milliseconds * 1000 // not used yet
466
+ case TimeUnit .MILLISECONDS => milliseconds
467
+ case TimeUnit .SECONDS => milliseconds / 1000.0
468
+ case TimeUnit .MINUTES => milliseconds / 1000.0 / 60.0
469
+ case TimeUnit .HOURS => milliseconds / 1000.0 / 60.0 / 60.0
470
+ case TimeUnit .DAYS => milliseconds / 1000.0 / 60.0 / 60.0 / 24.0
471
+ }
452
472
}
453
473
454
474
/**
0 commit comments