Skip to content

Commit d0b0aec

Browse files
committed
Clean up the codes
1 parent a459f49 commit d0b0aec

File tree

10 files changed

+351
-320
lines changed

10 files changed

+351
-320
lines changed

core/src/main/resources/org/apache/spark/ui/static/streaming-page.css

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
}
3232

3333
.tooltip-inner {
34-
max-width: 500px !important;
34+
max-width: 500px !important; // Make sure we only have one line tooltip
3535
}
3636

3737
.line {
@@ -46,19 +46,13 @@
4646
}
4747

4848
.bar rect:hover {
49-
//fill: rgb(49, 91, 125);
50-
//fill: #005580;
51-
fill: rgb(0, 194, 255);
52-
}
53-
54-
.stable-text text:hover {
55-
fill: #0088cc;
49+
fill: #00c2ff;
5650
}
5751

5852
.timeline {
5953
width: 500px;
6054
}
6155

62-
.distribution {
56+
.histogram {
6357
width: auto;
6458
}

core/src/main/resources/org/apache/spark/ui/static/streaming-page.js

Lines changed: 139 additions & 170 deletions
Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,8 @@ private[spark] object UIUtils extends Logging {
4949
}
5050

5151
/**
52-
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value after converting with
53-
* the `TimeUnit`.
52+
* Find the best `TimeUnit` for converting milliseconds to a friendly string. Return the value
53+
* after converting, also with its TimeUnit.
5454
*/
5555
def normalizeDuration(milliseconds: Long): (Double, TimeUnit) = {
5656
if (milliseconds < 1000) {
@@ -65,7 +65,11 @@ private[spark] object UIUtils extends Logging {
6565
return (minutes, TimeUnit.MINUTES)
6666
}
6767
val hours = minutes / 60
68-
(hours, TimeUnit.HOURS)
68+
if (hours < 24) {
69+
return (hours, TimeUnit.HOURS)
70+
}
71+
val days = hours / 24
72+
(days, TimeUnit.DAYS)
6973
}
7074

7175
def formatDate(date: Date): String = dateFormat.get.format(date)
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
19+
package org.apache.spark.ui
20+
21+
import java.util.concurrent.TimeUnit
22+
23+
import org.scalatest.FunSuite
24+
import org.scalatest.Matchers
25+
26+
class UIUtilsSuite extends FunSuite with Matchers{
27+
28+
test("shortTimeUnitString") {
29+
assert("ns" === UIUtils.shortTimeUnitString(TimeUnit.NANOSECONDS))
30+
assert("us" === UIUtils.shortTimeUnitString(TimeUnit.MICROSECONDS))
31+
assert("ms" === UIUtils.shortTimeUnitString(TimeUnit.MILLISECONDS))
32+
assert("sec" === UIUtils.shortTimeUnitString(TimeUnit.SECONDS))
33+
assert("min" === UIUtils.shortTimeUnitString(TimeUnit.MINUTES))
34+
assert("hrs" === UIUtils.shortTimeUnitString(TimeUnit.HOURS))
35+
assert("days" === UIUtils.shortTimeUnitString(TimeUnit.DAYS))
36+
}
37+
38+
test("normalizeDuration") {
39+
verifyNormalizedTime(900, TimeUnit.MILLISECONDS, 900)
40+
verifyNormalizedTime(1.0, TimeUnit.SECONDS, 1000)
41+
verifyNormalizedTime(1.0, TimeUnit.MINUTES, 60 * 1000)
42+
verifyNormalizedTime(1.0, TimeUnit.HOURS, 60 * 60 * 1000)
43+
verifyNormalizedTime(1.0, TimeUnit.DAYS, 24 * 60 * 60 * 1000)
44+
}
45+
46+
private def verifyNormalizedTime(
47+
expectedTime: Double, expectedUnit: TimeUnit, input: Long): Unit = {
48+
val (time, unit) = UIUtils.normalizeDuration(input)
49+
time should be (expectedTime +- 1E-6)
50+
unit should be (expectedUnit)
51+
}
52+
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverInfo.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ case class ReceiverInfo(
3232
active: Boolean,
3333
location: String,
3434
lastErrorMessage: String = "",
35-
lastError: String = ""
35+
lastError: String = "",
36+
lastErrorTime: Long = -1L
3637
) {
3738
}

streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
155155
private def deregisterReceiver(streamId: Int, message: String, error: String) {
156156
val newReceiverInfo = receiverInfo.get(streamId) match {
157157
case Some(oldInfo) =>
158-
oldInfo.copy(actor = null, active = false, lastErrorMessage = message, lastError = error)
158+
oldInfo.copy(actor = null, active = false, lastErrorMessage = message,
159+
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
159160
case None =>
160161
logWarning("No prior receiver info")
161-
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
162+
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
163+
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
162164
}
163165
receiverInfo -= streamId
164166
listenerBus.post(StreamingListenerReceiverStopped(newReceiverInfo))
@@ -182,7 +184,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
182184
oldInfo.copy(lastErrorMessage = message, lastError = error)
183185
case None =>
184186
logWarning("No prior receiver info")
185-
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message, lastError = error)
187+
ReceiverInfo(streamId, "", null, false, "", lastErrorMessage = message,
188+
lastError = error, lastErrorTime = ssc.scheduler.clock.getTimeMillis())
186189
}
187190
receiverInfo(streamId) = newReceiverInfo
188191
listenerBus.post(StreamingListenerReceiverError(receiverInfo(streamId)))

streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ private[ui] abstract class BatchTableBase(tableId: String) {
2727
protected def columns: Seq[Node] = {
2828
<th>Batch Time</th>
2929
<th>Input Size</th>
30-
<th>Scheduling Delay</th>
30+
<th>Streaming Scheduling Delay</th>
3131
<th>Processing Time</th>
3232
}
3333

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 2 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import org.apache.spark.streaming.scheduler.StreamingListenerReceiverStarted
2525
import org.apache.spark.streaming.scheduler.StreamingListenerBatchStarted
2626
import org.apache.spark.streaming.scheduler.BatchInfo
2727
import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
28-
import org.apache.spark.util.Distribution
2928

3029

3130
private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
@@ -39,7 +38,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3938
private var totalReceivedRecords = 0L
4039
private var totalProcessedRecords = 0L
4140
private val receiverInfos = new HashMap[Int, ReceiverInfo]
42-
private val receiverLastErrorTime = new HashMap[Int, Long]
4341

4442
val batchDuration = ssc.graph.batchDuration.milliseconds
4543

@@ -52,7 +50,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
5250
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
5351
synchronized {
5452
receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
55-
receiverLastErrorTime(receiverError.receiverInfo.streamId) = System.currentTimeMillis()
5653
}
5754
}
5855

@@ -123,32 +120,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
123120
completedBatchInfos.toSeq
124121
}
125122

126-
def processingDelayDistribution: Option[Distribution] = synchronized {
127-
extractDistribution(_.processingDelay)
128-
}
129-
130-
def schedulingDelayDistribution: Option[Distribution] = synchronized {
131-
extractDistribution(_.schedulingDelay)
132-
}
133-
134-
def totalDelayDistribution: Option[Distribution] = synchronized {
135-
extractDistribution(_.totalDelay)
136-
}
137-
138-
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
139-
val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
140-
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
141-
(0 until numReceivers).map { receiverId =>
142-
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
143-
batchInfo.get(receiverId).getOrElse(Array.empty)
144-
}
145-
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
146-
// calculate records per second for each batch
147-
blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
148-
}
149-
val distributionOption = Distribution(recordsOfParticularReceiver)
150-
(receiverId, distributionOption)
151-
}.toMap
123+
def allReceivers: Seq[Int] = synchronized {
124+
receiverInfos.keys.toSeq
152125
}
153126

154127
def receivedRecordsWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
@@ -184,10 +157,6 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
184157
receiverInfos.get(receiverId)
185158
}
186159

187-
def receiverLastErrorTime(receiverId: Int): Option[Long] = synchronized {
188-
receiverLastErrorTime.get(receiverId)
189-
}
190-
191160
def lastCompletedBatch: Option[BatchInfo] = synchronized {
192161
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
193162
}
@@ -200,8 +169,4 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
200169
(waitingBatchInfos.values.toSeq ++
201170
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
202171
}
203-
204-
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
205-
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
206-
}
207172
}

0 commit comments

Comments
 (0)