Skip to content

Commit e038b4b

Browse files
committed
Addressed Patrick's comments.
1 parent 89dae36 commit e038b4b

File tree

3 files changed

+13
-13
lines changed

3 files changed

+13
-13
lines changed

project/MimaBuild.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ object MimaBuild {
7575
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator") ++
7676
excludeSparkClass("streaming.dstream.NetworkReceiver#BlockGenerator#Block") ++
7777
excludeSparkClass("streaming.dstream.ReportError") ++
78-
excludeSparkClass("org.apache.spark.streaming.dstream.ReportBlock") ++
79-
excludeSparkClass("org.apache.spark.streaming.dstream.DStream")
78+
excludeSparkClass("streaming.dstream.ReportBlock") ++
79+
excludeSparkClass("streaming.dstream.DStream")
8080
case _ => Seq()
8181
}
8282

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
3333
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
3434
private val runningBatchInfos = new HashMap[Time, BatchInfo]
3535
private val completedaBatchInfos = new Queue[BatchInfo]
36-
private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
36+
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
3737
private var totalCompletedBatches = 0L
3838
private val receiverInfos = new HashMap[Int, ReceiverInfo]
3939

@@ -82,7 +82,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
8282
runningBatchInfos.values.toSeq
8383
}
8484

85-
def completedBatches: Seq[BatchInfo] = synchronized {
85+
def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
8686
completedaBatchInfos.toSeq
8787
}
8888

@@ -99,7 +99,7 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
9999
}
100100

101101
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
102-
val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
102+
val latestBatchInfos = retainedBatches.reverse.take(batchInfoLimit)
103103
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
104104
(0 until numNetworkReceivers).map { receiverId =>
105105
val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
@@ -134,10 +134,10 @@ private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends St
134134
}
135135

136136
def lastReceivedBatch: Option[BatchInfo] = {
137-
allBatches.lastOption
137+
retainedBatches.lastOption
138138
}
139139

140-
private def allBatches: Seq[BatchInfo] = synchronized {
140+
private def retainedBatches: Seq[BatchInfo] = synchronized {
141141
(waitingBatchInfos.values.toSeq ++
142142
runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
143143
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,13 @@ private[ui] class StreamingPage(parent: StreamingTab)
3333

3434
private val listener = parent.listener
3535
private val startTime = Calendar.getInstance().getTime()
36-
private val emptyCellTest = "-"
36+
private val emptyCell = "-"
3737

3838
/** Render the page */
3939
override def render(request: HttpServletRequest): Seq[Node] = {
4040
val content =
4141
generateBasicStats() ++
42-
<br></br><h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
42+
<br></br><h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
4343
generateNetworkStatsTable() ++
4444
generateBatchStatsTable()
4545
UIUtils.headerSparkPage(
@@ -89,12 +89,12 @@ private[ui] class StreamingPage(parent: StreamingTab)
8989
val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
9090
val receiverInfo = listener.receiverInfo(receiverId)
9191
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
92-
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest)
92+
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
9393
val receiverLastBatchRecords = formatDurationVerbose(lastBatchReceivedRecord(receiverId))
9494
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
9595
d.getQuantiles().map(r => formatDurationVerbose(r.toLong))
9696
}.getOrElse {
97-
Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest)
97+
Seq(emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
9898
}
9999
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++ receivedRecordStats
100100
}
@@ -112,7 +112,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
112112

113113
/** Generate stats of batch jobs of the streaming program */
114114
private def generateBatchStatsTable(): Seq[Node] = {
115-
val numBatches = listener.completedBatches.size
115+
val numBatches = listener.retainedCompletedBatches.size
116116
val lastCompletedBatch = listener.lastCompletedBatch
117117
val table = if (numBatches > 0) {
118118
val processingDelayQuantilesRow = {
@@ -161,7 +161,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
161161
* Returns a human-readable string representing a duration such as "5 second 35 ms"
162162
*/
163163
private def formatDurationOption(msOption: Option[Long]): String = {
164-
msOption.map(formatDurationVerbose).getOrElse(emptyCellTest)
164+
msOption.map(formatDurationVerbose).getOrElse(emptyCell)
165165
}
166166

167167
/** Get quantiles for any time distribution */

0 commit comments

Comments
 (0)