Skip to content

Commit db27bad

Browse files
committed
Added last batch processing time to StreamingUI.
1 parent 4d86e98 commit db27bad

File tree

5 files changed

+147
-72
lines changed

5 files changed

+147
-72
lines changed

streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
3434
import org.apache.spark.streaming._
3535
import org.apache.spark.streaming.scheduler.{ReceivedBlockInfo, AddBlocks, DeregisterReceiver, RegisterReceiver}
3636
import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
37+
import org.apache.spark.util.Utils
3738

3839
/**
3940
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -206,7 +207,9 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
206207
val timeout = 5.seconds
207208

208209
override def preStart() {
209-
val future = tracker.ask(RegisterReceiver(streamId, self))(timeout)
210+
val msg = RegisterReceiver(
211+
streamId, NetworkReceiver.this.getClass.getSimpleName, Utils.localHostName(), self)
212+
val future = tracker.ask(msg)(timeout)
210213
Await.result(future, timeout)
211214
}
212215

streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ import org.apache.spark.streaming.{StreamingContext, Time}
2828
import org.apache.spark.streaming.dstream.{NetworkReceiver, StopReceiver}
2929
import org.apache.spark.util.AkkaUtils
3030

31-
/** Information about block received by the network receiver */
31+
/** Information about receiver */
32+
case class ReceiverInfo(streamId: Int, typ: String, location: String) {
33+
override def toString = s"$typ-$streamId"
34+
}
35+
36+
/** Information about blocks received by the network receiver */
3237
case class ReceivedBlockInfo(
3338
streamId: Int,
3439
blockId: StreamBlockId,
@@ -41,8 +46,12 @@ case class ReceivedBlockInfo(
4146
* with each other.
4247
*/
4348
private[streaming] sealed trait NetworkInputTrackerMessage
44-
private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: ActorRef)
45-
extends NetworkInputTrackerMessage
49+
private[streaming] case class RegisterReceiver(
50+
streamId: Int,
51+
typ: String,
52+
host: String,
53+
receiverActor: ActorRef
54+
) extends NetworkInputTrackerMessage
4655
private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
4756
extends NetworkInputTrackerMessage
4857
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
@@ -108,11 +117,14 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
108117
/** Actor to receive messages from the receivers. */
109118
private class NetworkInputTrackerActor extends Actor {
110119
def receive = {
111-
case RegisterReceiver(streamId, receiverActor) => {
120+
case RegisterReceiver(streamId, typ, host, receiverActor) => {
112121
if (!networkInputStreamMap.contains(streamId)) {
113122
throw new Exception("Register received for unexpected id " + streamId)
114123
}
115124
receiverInfo += ((streamId, receiverActor))
125+
ssc.scheduler.listenerBus.post(StreamingListenerReceiverStarted(
126+
ReceiverInfo(streamId, typ, host)
127+
))
116128
logInfo("Registered receiver for network stream " + streamId + " from "
117129
+ sender.path.address)
118130
sender ! true

streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ sealed trait StreamingListenerEvent
2626
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
2727
case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends StreamingListenerEvent
2828
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
29+
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
30+
extends StreamingListenerEvent
2931

3032
/** An event used in the listener to shutdown the listener daemon thread. */
3133
private[scheduler] case object StreamingListenerShutdown extends StreamingListenerEvent
@@ -36,6 +38,9 @@ private[scheduler] case object StreamingListenerShutdown extends StreamingListen
3638
*/
3739
trait StreamingListener {
3840

41+
/** Called when a receiver has been started */
42+
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
43+
3944
/** Called when a batch of jobs has been submitted for processing. */
4045
def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) { }
4146

streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ private[spark] class StreamingListenerBus() extends Logging {
3838
while (true) {
3939
val event = eventQueue.take
4040
event match {
41+
case receiverStarted: StreamingListenerReceiverStarted =>
42+
listeners.foreach(_.onReceiverStarted(receiverStarted))
43+
case batchSubmitted: StreamingListenerBatchSubmitted =>
44+
listeners.foreach(_.onBatchSubmitted(batchSubmitted))
4145
case batchStarted: StreamingListenerBatchStarted =>
4246
listeners.foreach(_.onBatchStarted(batchStarted))
4347
case batchCompleted: StreamingListenerBatchCompleted =>

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingUI.scala

Lines changed: 118 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,17 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
3737
private val runningBatchInfos = new HashMap[Time, BatchInfo]
3838
private val completedaBatchInfos = new Queue[BatchInfo]
3939
private val batchInfoLimit = ssc.conf.getInt("spark.steaming.ui.maxBatches", 100)
40-
private var totalBatchesCompleted = 0L
40+
private var totalCompletedBatches = 0L
41+
private val receiverInfos = new HashMap[Int, ReceiverInfo]
4142

4243
val batchDuration = ssc.graph.batchDuration.milliseconds
4344

45+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
46+
synchronized {
47+
receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
48+
}
49+
}
50+
4451
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
4552
runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
4653
}
@@ -55,15 +62,19 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
5562
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
5663
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
5764
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
58-
totalBatchesCompleted += 1L
65+
totalCompletedBatches += 1L
5966
}
6067

61-
def numTotalBatchesCompleted: Long = synchronized {
62-
totalBatchesCompleted
68+
def numNetworkReceivers = synchronized {
69+
ssc.graph.getNetworkInputStreams().size
6370
}
6471

65-
def numNetworkReceivers: Int = synchronized {
66-
completedaBatchInfos.headOption.map(_.receivedBlockInfo.size).getOrElse(0)
72+
def numTotalCompletedBatches: Long = synchronized {
73+
totalCompletedBatches
74+
}
75+
76+
def numUnprocessedBatches: Long = synchronized {
77+
waitingBatchInfos.size + runningBatchInfos.size
6778
}
6879

6980
def waitingBatches: Seq[BatchInfo] = synchronized {
@@ -91,9 +102,7 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
91102
}
92103

93104
def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
94-
val allBatcheInfos = waitingBatchInfos.values.toSeq ++
95-
runningBatchInfos.values.toSeq ++ completedaBatchInfos
96-
val latestBatchInfos = allBatcheInfos.sortBy(_.batchTime)(Time.ordering).reverse.take(batchInfoLimit)
105+
val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
97106
val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
98107
(0 until numNetworkReceivers).map { receiverId =>
99108
val blockInfoOfParticularReceiver = latestBlockInfos.map(_.get(receiverId).getOrElse(Array.empty))
@@ -103,6 +112,34 @@ private[ui] class StreamingUIListener(ssc: StreamingContext) extends StreamingLi
103112
}.toMap
104113
}
105114

115+
def lastReceivedBatchRecords: Map[Int, Long] = {
116+
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
117+
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
118+
(0 until numNetworkReceivers).map { receiverId =>
119+
(receiverId, lastReceivedBlockInfo(receiverId).map(_.numRecords).sum)
120+
}.toMap
121+
}.getOrElse {
122+
(0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
123+
}
124+
}
125+
126+
def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
127+
receiverInfos.get(receiverId)
128+
}
129+
130+
def lastCompletedBatch: Option[BatchInfo] = {
131+
completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
132+
}
133+
134+
def lastReceivedBatch: Option[BatchInfo] = {
135+
allBatches.lastOption
136+
}
137+
138+
private def allBatches: Seq[BatchInfo] = synchronized {
139+
(waitingBatchInfos.values.toSeq ++
140+
runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
141+
}
142+
106143
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
107144
Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
108145
}
@@ -114,13 +151,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
114151
private val listener = parent.listener
115152
private val calendar = Calendar.getInstance()
116153
private val startTime = calendar.getTime()
117-
154+
private val emptyCellTest = "-"
118155

119156
def render(request: HttpServletRequest): Seq[Node] = {
120157

121158
val content =
122159
generateBasicStats() ++
123-
<h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
160+
<br></br><h4>Statistics over last {listener.completedBatches.size} processed batches</h4> ++
124161
generateNetworkStatsTable() ++
125162
generateBatchStatsTable()
126163
UIUtils.headerStreamingPage(content, "", parent.appName, "Spark Streaming Overview")
@@ -136,28 +173,76 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
136173
<li>
137174
<strong>Time since start: </strong>{msDurationToString(timeSinceStart)}
138175
</li>
176+
<li>
177+
<strong>Network receivers: </strong>{listener.numNetworkReceivers}
178+
</li>
139179
<li>
140180
<strong>Batch interval: </strong>{msDurationToString(listener.batchDuration)}
141181
</li>
142182
<li>
143-
<strong>Processed batches: </strong>{listener.numTotalBatchesCompleted}
183+
<strong>Processed batches: </strong>{listener.numTotalCompletedBatches}
184+
</li>
185+
<li>
186+
<strong>Waiting batches: </strong>{listener.numUnprocessedBatches}
144187
</li>
145-
<li></li>
146188
</ul>
147189
}
148190

191+
private def generateNetworkStatsTable(): Seq[Node] = {
192+
val receivedRecordDistributions = listener.receivedRecordsDistributions
193+
val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
194+
val table = if (receivedRecordDistributions.size > 0) {
195+
val headerRow = Seq(
196+
"Receiver",
197+
"Location",
198+
s"Records in last batch",
199+
"Minimum rate [records/sec]",
200+
"25th percentile rate [records/sec]",
201+
"Median rate [records/sec]",
202+
"75th percentile rate [records/sec]",
203+
"Maximum rate [records/sec]"
204+
)
205+
val dataRows = (0 until listener.numNetworkReceivers).map { receiverId =>
206+
val receiverInfo = listener.receiverInfo(receiverId)
207+
val receiverName = receiverInfo.map(_.toString).getOrElse(s"Receiver-$receiverId")
208+
val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCellTest)
209+
val receiverLastBatchRecords = numberToString(lastBatchReceivedRecord(receiverId))
210+
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
211+
d.getQuantiles().map(r => numberToString(r.toLong))
212+
}.getOrElse {
213+
Seq(emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest, emptyCellTest)
214+
}
215+
Seq(receiverName, receiverLocation, receiverLastBatchRecords) ++
216+
receivedRecordStats
217+
}
218+
Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
219+
} else {
220+
None
221+
}
222+
223+
val content =
224+
<h5>Network Input Statistics</h5> ++
225+
<div>{table.getOrElse("No network receivers")}</div>
226+
227+
content
228+
}
229+
149230
private def generateBatchStatsTable(): Seq[Node] = {
150231
val numBatches = listener.completedBatches.size
232+
val lastCompletedBatch = listener.lastCompletedBatch
151233
val table = if (numBatches > 0) {
152234
val processingDelayQuantilesRow =
153-
"Processing Times" +: getQuantiles(listener.processingDelayDistribution)
235+
Seq("Processing Time", msDurationToString(lastCompletedBatch.flatMap(_.processingDelay))) ++
236+
getQuantiles(listener.processingDelayDistribution)
154237
val schedulingDelayQuantilesRow =
155-
"Scheduling Delay:" +: getQuantiles(listener.schedulingDelayDistribution)
238+
Seq("Scheduling Delay", msDurationToString(lastCompletedBatch.flatMap(_.schedulingDelay))) ++
239+
getQuantiles(listener.schedulingDelayDistribution)
156240
val totalDelayQuantilesRow =
157-
"End-to-end Delay:" +: getQuantiles(listener.totalDelayDistribution)
241+
Seq("Total Delay", msDurationToString(lastCompletedBatch.flatMap(_.totalDelay))) ++
242+
getQuantiles(listener.totalDelayDistribution)
158243

159-
val headerRow = Seq("Metric", "Min", "25th percentile",
160-
"Median", "75th percentile", "Max")
244+
val headerRow = Seq("Metric", "Last batch", "Minimum", "25th percentile",
245+
"Median", "75th percentile", "Maximum")
161246
val dataRows: Seq[Seq[String]] = Seq(
162247
processingDelayQuantilesRow,
163248
schedulingDelayQuantilesRow,
@@ -168,57 +253,19 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
168253
None
169254
}
170255

171-
val batchCounts =
172-
<ul class="unstyled">
173-
<li>
174-
# batches being processed: {listener.runningBatches.size}
175-
</li>
176-
<li>
177-
# scheduled batches: {listener.waitingBatches.size}
178-
</li>
179-
</ul>
180-
181-
val batchStats =
182-
<ul class="unstyled">
183-
{table.getOrElse("No statistics have been generated yet.")}
184-
</ul>
185-
186256
val content =
187257
<h5>Batch Processing Statistics</h5> ++
188-
<div>{batchStats}</div>
189-
190-
content
191-
}
192-
193-
private def generateNetworkStatsTable(): Seq[Node] = {
194-
val receivedRecordDistributions = listener.receivedRecordsDistributions
195-
val numNetworkReceivers = receivedRecordDistributions.size
196-
val table = if (receivedRecordDistributions.size > 0) {
197-
val headerRow = Seq("Receiver", "Min", "25th percentile",
198-
"Median", "75th percentile", "Max")
199-
val dataRows = (0 until numNetworkReceivers).map { receiverId =>
200-
val receiverName = s"Receiver-$receiverId"
201-
val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
202-
d.getQuantiles().map(r => numberToString(r.toLong) + " records/second")
203-
}.getOrElse {
204-
Seq("-", "-", "-", "-", "-")
205-
}
206-
receiverName +: receivedRecordStats
207-
}
208-
Some(UIUtils.listingTable(headerRow, dataRows, fixedWidth = true))
209-
} else {
210-
None
211-
}
212-
213-
val content =
214-
<h5>Network Input Statistics</h5> ++
215-
<div>{table.getOrElse("No network receivers")}</div>
258+
<div>
259+
<ul class="unstyled">
260+
{table.getOrElse("No statistics have been generated yet.")}
261+
</ul>
262+
</div>
216263

217264
content
218265
}
219266

220267
private def getQuantiles(timeDistributionOption: Option[Distribution]) = {
221-
timeDistributionOption.get.getQuantiles().map { ms => Utils.msDurationToString(ms.toLong) }
268+
timeDistributionOption.get.getQuantiles().map { ms => msDurationToString(ms.toLong) }
222269
}
223270

224271
private def numberToString(records: Double): String = {
@@ -229,13 +276,13 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
229276

230277
val (value, unit) = {
231278
if (records >= 2*trillion) {
232-
(records / trillion, "T")
279+
(records / trillion, " T")
233280
} else if (records >= 2*billion) {
234-
(records / billion, "B")
281+
(records / billion, " B")
235282
} else if (records >= 2*million) {
236-
(records / million, "M")
283+
(records / million, " M")
237284
} else if (records >= 2*thousand) {
238-
(records / thousand, "K")
285+
(records / thousand, " K")
239286
} else {
240287
(records, "")
241288
}
@@ -265,7 +312,7 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
265312
}
266313
}
267314

268-
val millisecondsString = if (ms % second == 0) "" else s"${ms % second} ms"
315+
val millisecondsString = if (ms >= second && ms % second == 0) "" else s"${ms % second} ms"
269316
val secondString = toString((ms % minute) / second, "second")
270317
val minuteString = toString((ms % hour) / minute, "minute")
271318
val hourString = toString((ms % day) / hour, "hour")
@@ -292,6 +339,10 @@ private[ui] class StreamingPage(parent: StreamingUI) extends Logging {
292339
return ""
293340
}
294341
}
342+
343+
private def msDurationToString(msOption: Option[Long]): String = {
344+
msOption.map(msDurationToString).getOrElse(emptyCellTest)
345+
}
295346
}
296347

297348

0 commit comments

Comments
 (0)