Commit 1af239b

Changed streaming UI to attach itself as a tab with the Spark UI.
1 parent 827e81a

File tree

8 files changed: +241 -405 lines

core/src/main/scala/org/apache/spark/ui/FooTab.scala

Lines changed: 0 additions & 105 deletions
This file was deleted.

core/src/main/scala/org/apache/spark/ui/UIUtils.scala

Lines changed: 13 additions & 2 deletions
@@ -61,7 +61,9 @@ private[spark] object UIUtils {
       appName: String,
       title: String,
       tabs: Seq[UITab],
-      activeTab: UITab) : Seq[Node] = {
+      activeTab: UITab,
+      refreshInterval: Option[Int] = None
+    ) : Seq[Node] = {
 
     val header = tabs.map { tab =>
       <li class={if (tab == activeTab) "active" else ""}>
@@ -78,8 +80,17 @@
         type="text/css" />
       <script src={prependBaseUri("/static/sorttable.js")} ></script>
       <title>{appName} - {title}</title>
+      <script type="text/JavaScript">
+      <!--
+      function timedRefresh(timeoutPeriod) {
+        if (timeoutPeriod > 0) {
+          setTimeout("location.reload(true);", timeoutPeriod);
+        }
+      }
+      // -->
+      </script>
     </head>
-    <body>
+    <body onload={s"JavaScript:timedRefresh(${refreshInterval.getOrElse(-1)});"}>
       <div class="navbar navbar-static-top">
         <div class="navbar-inner">
           <a href={prependBaseUri(basePath, "/")} class="brand">
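Note: the new optional refreshInterval parameter lets any page rendered through this header ask the browser to reload itself; getOrElse(-1) disables the timer when no interval is given, so existing callers keep their static pages. A minimal self-contained sketch of the same mechanism (the helper refreshingBody is hypothetical, not part of UIUtils; only scala-xml is assumed):

import scala.xml.Node

object RefreshSketch {
  // Mirror of the <body onload=...> change above: reload every `refreshInterval`
  // milliseconds, or never when None (-1 disables the timer in timedRefresh).
  def refreshingBody(refreshInterval: Option[Int] = None)(content: Seq[Node]): Node =
    <body onload={s"JavaScript:timedRefresh(${refreshInterval.getOrElse(-1)});"}>
      {content}
    </body>
}

// e.g. RefreshSketch.refreshingBody(Some(5000))(<p>streaming stats</p>)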

streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala

Lines changed: 12 additions & 14 deletions
@@ -17,30 +17,28 @@
 
 package org.apache.spark.streaming
 
-import scala.collection.mutable.Queue
-import scala.collection.Map
-import scala.reflect.ClassTag
-
 import java.io.InputStream
 import java.util.concurrent.atomic.AtomicInteger
 
-import akka.actor.Props
-import akka.actor.SupervisorStrategy
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.Text
+import scala.collection.Map
+import scala.collection.mutable.Queue
+import scala.reflect.ClassTag
+
+import akka.actor.{Props, SupervisorStrategy}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{LongWritable, Text}
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.MetadataCleaner
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receivers._
 import org.apache.spark.streaming.scheduler._
-import org.apache.hadoop.conf.Configuration
-import org.apache.spark.streaming.ui.StreamingUI
+import org.apache.spark.streaming.ui.StreamingTab
+import org.apache.spark.util.MetadataCleaner
 
 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -159,8 +157,8 @@ class StreamingContext private[streaming] (
 
   private[streaming] val waiter = new ContextWaiter
 
-  private[streaming] val ui = new StreamingUI(this)
-  ui.bind()
+  private[streaming] val ui = new StreamingTab(this)
+  ui.start()
 
   /** Enumeration to identify current state of the StreamingContext */
   private[streaming] object StreamingContextState extends Enumeration {
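Note: the old StreamingUI bound its own web server (ui.bind()); the renamed StreamingTab instead attaches to the driver's existing SparkUI, which is the point of this commit. Since only the constructor and start() appear in this diff, the following is a sketch of the shape such a tab could take; the listener wiring and the attachTab hook are assumptions:

package org.apache.spark.streaming.ui

import org.apache.spark.streaming.StreamingContext

// Sketch only: `new StreamingTab(this)` and `ui.start()` are all this diff shows.
private[spark] class StreamingTab(ssc: StreamingContext) {
  val listener = new StreamingProgressListener(ssc)  // defined later in this commit

  def start(): Unit = {
    // Feed scheduler events to the listener via the standard StreamingContext hook.
    ssc.addStreamingListener(listener)
    // Assumed: register this tab's pages with the parent SparkUI instead of
    // binding a separate server, e.g. ssc.sc.ui.attachTab(this).
  }
}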

streaming/src/main/scala/org/apache/spark/streaming/dstream/NetworkInputDStream.scala

Lines changed: 3 additions & 3 deletions
@@ -31,7 +31,7 @@ import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.rdd.{BlockRDD, RDD}
 import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
 import org.apache.spark.streaming._
-import org.apache.spark.streaming.scheduler.{AddBlocks, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
+import org.apache.spark.streaming.scheduler.{AddBlock, DeregisterReceiver, ReceivedBlockInfo, RegisterReceiver}
 import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -237,7 +237,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
       level: StorageLevel
   ) {
     env.blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], level)
-    trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
+    trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, arrayBuffer.size, metadata))
     logDebug("Pushed block " + blockId)
   }
 
@@ -251,7 +251,7 @@ abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging
       level: StorageLevel
   ) {
     env.blockManager.putBytes(blockId, bytes, level)
-    trackerActor ! AddBlocks(ReceivedBlockInfo(streamId, blockId, -1, metadata))
+    trackerActor ! AddBlock(ReceivedBlockInfo(streamId, blockId, -1, metadata))
   }
 
   /** Set the ID of the DStream that this receiver is associated with */

streaming/src/main/scala/org/apache/spark/streaming/scheduler/NetworkInputTracker.scala

Lines changed: 2 additions & 2 deletions
@@ -52,7 +52,7 @@ private[streaming] case class RegisterReceiver(
     host: String,
     receiverActor: ActorRef
   ) extends NetworkInputTrackerMessage
-private[streaming] case class AddBlocks(receivedBlockInfo: ReceivedBlockInfo)
+private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
   extends NetworkInputTrackerMessage
 private[streaming] case class DeregisterReceiver(streamId: Int, msg: String)
   extends NetworkInputTrackerMessage
@@ -153,7 +153,7 @@ class NetworkInputTracker(ssc: StreamingContext) extends Logging {
     case RegisterReceiver(streamId, typ, host, receiverActor) =>
       registerReceiver(streamId, typ, host, receiverActor, sender)
       sender ! true
-    case AddBlocks(receivedBlockInfo) =>
+    case AddBlock(receivedBlockInfo) =>
       addBlocks(receivedBlockInfo)
     case DeregisterReceiver(streamId, message) =>
       deregisterReceiver(streamId, message)
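Note: each message now carries exactly one ReceivedBlockInfo, so the singular AddBlock matches its payload. Condensed from the two files above, the receiver-to-tracker protocol looks like this (access modifiers dropped; ReceivedBlockInfo's fields are read off the call sites, and its metadata type is an assumption):

import akka.actor.ActorRef
import org.apache.spark.storage.BlockId

// Receiver-to-tracker message protocol, condensed from the diffs above.
sealed trait NetworkInputTrackerMessage
case class RegisterReceiver(streamId: Int, typ: String, host: String,
    receiverActor: ActorRef) extends NetworkInputTrackerMessage
case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) extends NetworkInputTrackerMessage
case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage

// Field order from the AddBlock(...) call sites; `metadata: Any` is assumed.
case class ReceivedBlockInfo(streamId: Int, blockId: BlockId, numRecords: Long, metadata: Any)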
streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingProgressListener.scala

Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
package org.apache.spark.streaming.ui

import scala.collection.mutable.{HashMap, Queue}

import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
import org.apache.spark.util.Distribution

/** Tracks batch and receiver statistics from StreamingListener events for the UI. */
private[ui] class StreamingProgressListener(ssc: StreamingContext) extends StreamingListener {

  private val waitingBatchInfos = new HashMap[Time, BatchInfo]
  private val runningBatchInfos = new HashMap[Time, BatchInfo]
  private val completedBatchInfos = new Queue[BatchInfo]
  private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.maxBatches", 100)
  private var totalCompletedBatches = 0L
  private val receiverInfos = new HashMap[Int, ReceiverInfo]

  val batchDuration = ssc.graph.batchDuration.milliseconds

  override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) = {
    synchronized {
      receiverInfos.put(receiverStarted.receiverInfo.streamId, receiverStarted.receiverInfo)
    }
  }

  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
    // A submitted batch is waiting until onBatchStarted moves it to the running set.
    waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
  }

  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
    runningBatchInfos(batchStarted.batchInfo.batchTime) = batchStarted.batchInfo
    waitingBatchInfos.remove(batchStarted.batchInfo.batchTime)
  }

  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
    waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
    runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
    completedBatchInfos.enqueue(batchCompleted.batchInfo)
    if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
    totalCompletedBatches += 1L
  }

  def numNetworkReceivers = synchronized {
    ssc.graph.getNetworkInputStreams().size
  }

  def numTotalCompletedBatches: Long = synchronized {
    totalCompletedBatches
  }

  def numUnprocessedBatches: Long = synchronized {
    waitingBatchInfos.size + runningBatchInfos.size
  }

  def waitingBatches: Seq[BatchInfo] = synchronized {
    waitingBatchInfos.values.toSeq
  }

  def runningBatches: Seq[BatchInfo] = synchronized {
    runningBatchInfos.values.toSeq
  }

  def completedBatches: Seq[BatchInfo] = synchronized {
    completedBatchInfos.toSeq
  }

  def processingDelayDistribution: Option[Distribution] = synchronized {
    extractDistribution(_.processingDelay)
  }

  def schedulingDelayDistribution: Option[Distribution] = synchronized {
    extractDistribution(_.schedulingDelay)
  }

  def totalDelayDistribution: Option[Distribution] = synchronized {
    extractDistribution(_.totalDelay)
  }

  def receivedRecordsDistributions: Map[Int, Option[Distribution]] = synchronized {
    val latestBatchInfos = allBatches.reverse.take(batchInfoLimit)
    val latestBlockInfos = latestBatchInfos.map(_.receivedBlockInfo)
    (0 until numNetworkReceivers).map { receiverId =>
      val blockInfoOfParticularReceiver = latestBlockInfos.map { batchInfo =>
        batchInfo.get(receiverId).getOrElse(Array.empty)
      }
      val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map { blockInfo =>
        // Records per second for each batch: total records scaled by the batch duration.
        blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration
      }
      val distributionOption = Distribution(recordsOfParticularReceiver)
      (receiverId, distributionOption)
    }.toMap
  }

  def lastReceivedBatchRecords: Map[Int, Long] = {
    val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
    lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
      (0 until numNetworkReceivers).map { receiverId =>
        // A receiver may have produced no blocks in the last batch, so look it up safely.
        (receiverId, lastReceivedBlockInfo.get(receiverId).map(_.map(_.numRecords).sum).getOrElse(0L))
      }.toMap
    }.getOrElse {
      (0 until numNetworkReceivers).map(receiverId => (receiverId, 0L)).toMap
    }
  }

  def receiverInfo(receiverId: Int): Option[ReceiverInfo] = {
    receiverInfos.get(receiverId)
  }

  def lastCompletedBatch: Option[BatchInfo] = {
    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
  }

  def lastReceivedBatch: Option[BatchInfo] = {
    allBatches.lastOption
  }

  private def allBatches: Seq[BatchInfo] = synchronized {
    (waitingBatchInfos.values.toSeq ++
      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
  }

  private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
  }
}
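Note: once registered, the listener is driven entirely by scheduler callbacks, and a UI page polls its synchronized getters on each (possibly timed) refresh. A sketch of that wiring, placed in the same package so the private[ui] constructor is visible; the standalone main scaffolding is illustrative only:

package org.apache.spark.streaming.ui

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative wiring: register the listener, then read the getters a page would.
object ListenerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("ListenerSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    val listener = new StreamingProgressListener(ssc)
    ssc.addStreamingListener(listener)

    println(s"completed=${listener.numTotalCompletedBatches} " +
      s"unprocessed=${listener.numUnprocessedBatches} " +
      s"receivers=${listener.numNetworkReceivers}")
  }
}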
