Skip to content

Commit dd653a1

Browse files
committed
Add timeline and histogram graphs for streaming statistics
1 parent 6be9189 commit dd653a1

File tree

9 files changed

+468
-32
lines changed

9 files changed

+468
-32
lines changed

.rat-excludes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ log4j-defaults.properties
3030
bootstrap-tooltip.js
3131
jquery-1.11.1.min.js
3232
sorttable.js
33+
d3.min.js
3334
.*avsc
3435
.*txt
3536
.*json

LICENSE

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,36 @@ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
643643
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
644644
THE SOFTWARE.
645645

646+
========================================================================
647+
For d3 (core/src/main/resources/org/apache/spark/ui/static/d3.min.js):
648+
========================================================================
649+
650+
Copyright (c) 2010-2015, Michael Bostock
651+
All rights reserved.
652+
653+
Redistribution and use in source and binary forms, with or without
654+
modification, are permitted provided that the following conditions are met:
655+
656+
* Redistributions of source code must retain the above copyright notice, this
657+
list of conditions and the following disclaimer.
658+
659+
* Redistributions in binary form must reproduce the above copyright notice,
660+
this list of conditions and the following disclaimer in the documentation
661+
and/or other materials provided with the distribution.
662+
663+
* The name Michael Bostock may not be used to endorse or promote products
664+
derived from this software without specific prior written permission.
665+
666+
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
667+
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
668+
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
669+
DISCLAIMED. IN NO EVENT SHALL MICHAEL BOSTOCK BE LIABLE FOR ANY DIRECT,
670+
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
671+
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
672+
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
673+
OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
674+
NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
675+
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
646676

647677
========================================================================
648678
For Scala Interpreter classes (all .scala files in repl/src/main/scala

core/src/main/resources/org/apache/spark/ui/static/d3.min.js

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
.graph {
19+
font: 10px sans-serif;
20+
}
21+
22+
.axis path, .axis line {
23+
fill: none;
24+
stroke: #000;
25+
shape-rendering: crispEdges;
26+
}
27+
28+
.line {
29+
fill: none;
30+
stroke: steelblue;
31+
stroke-width: 1.5px;
32+
}
33+
34+
.bar rect {
35+
fill: steelblue;
36+
shape-rendering: crispEdges;
37+
}
38+
39+
.bar text {
40+
fill: #000;
41+
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
function drawTimeline(id, data, unit) {
19+
var margin = {top: 20, right: 20, bottom: 70, left: 50};
20+
var width = 500 - margin.left - margin.right;
21+
var height = 250 - margin.top - margin.bottom;
22+
23+
var x = d3.time.scale().range([0, width]);
24+
var y = d3.scale.linear().range([height, 0]);
25+
26+
var xAxis = d3.svg.axis().scale(x).orient("bottom").tickFormat(d3.time.format("%H:%M:%S"));
27+
var yAxis = d3.svg.axis().scale(y).orient("left");
28+
29+
var line = d3.svg.line()
30+
.x(function(d) { return x(new Date(d.x)); })
31+
.y(function(d) { return y(d.y); });
32+
33+
var svg = d3.select(id).append("svg")
34+
.attr("width", width + margin.left + margin.right)
35+
.attr("height", height + margin.top + margin.bottom)
36+
.append("g")
37+
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
38+
39+
x.domain(d3.extent(data, function(d) { return d.x; }));
40+
y.domain(d3.extent(data, function(d) { return d.y; }));
41+
42+
svg.append("g")
43+
.attr("class", "x axis")
44+
.attr("transform", "translate(0," + height + ")")
45+
.call(xAxis)
46+
.selectAll("text")
47+
.style("text-anchor", "end")
48+
.attr("dx", "-.8em")
49+
.attr("dy", ".15em")
50+
.attr("transform", "rotate(-65)");
51+
52+
svg.append("g")
53+
.attr("class", "y axis")
54+
.call(yAxis)
55+
.append("text")
56+
.attr("x", 25)
57+
.attr("y", -5)
58+
.attr("dy", ".71em")
59+
.style("text-anchor", "end")
60+
.text(unit);
61+
62+
svg.append("path")
63+
.datum(data)
64+
.attr("class", "line")
65+
.attr("d", line);
66+
}
67+
68+
function drawDistribution(id, values, unit) {
69+
var margin = {top: 10, right: 30, bottom: 30, left: 50};
70+
var width = 500 - margin.left - margin.right;
71+
var height = 250 - margin.top - margin.bottom;
72+
73+
var y = d3.scale.linear()
74+
.domain(d3.extent(values, function(d) { return d; }))
75+
.range([height, 0]);
76+
var data = d3.layout.histogram().bins(y.ticks(10))(values);
77+
console.log(values)
78+
console.log(data)
79+
var barHeight = height / data.length
80+
81+
var x = d3.scale.linear()
82+
.domain([0, d3.max(data, function(d) { return d.y; })])
83+
.range([0, width]);
84+
85+
var xAxis = d3.svg.axis().scale(x).orient("bottom");
86+
var yAxis = d3.svg.axis().scale(y).orient("left");
87+
88+
var svg = d3.select(id).append("svg")
89+
.attr("width", width + margin.left + margin.right)
90+
.attr("height", height + margin.top + margin.bottom)
91+
.append("g")
92+
.attr("transform", "translate(" + margin.left + "," + margin.top + ")");
93+
94+
var bar = svg.selectAll(".bar")
95+
.data(data)
96+
.enter().append("g")
97+
.attr("class", "bar")
98+
99+
bar.append("rect")
100+
.attr("x", 1)
101+
.attr("y", function(d) { return y(d.x) - margin.bottom + margin.top; })
102+
.attr("width", function(d) { return x(d.y); })
103+
.attr("height", function(d) { return (height - margin.bottom) / data.length; });
104+
105+
bar.append("text")
106+
.attr("dy", ".75em")
107+
.attr("x", function(d) { return x(d.y) + 10; })
108+
.attr("y", function(d) { return y(d.x) + 16 - margin.bottom; })
109+
.attr("text-anchor", "middle")
110+
.text(function(d) { return d.y; });
111+
112+
svg.append("g")
113+
.attr("class", "x axis")
114+
.attr("transform", "translate(0," + height + ")")
115+
.call(xAxis);
116+
117+
svg.append("g")
118+
.attr("class", "y axis")
119+
.call(yAxis);
120+
}

core/src/main/scala/org/apache/spark/util/Distribution.scala

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package org.apache.spark.util
2020
import java.io.PrintStream
2121

2222
import scala.collection.immutable.IndexedSeq
23+
import scala.collection.mutable.ArrayBuffer
2324

2425
/**
2526
* Util for getting some stats from a small sample of numeric values, with some handy
@@ -67,6 +68,34 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va
6768
out.println(statCounter)
6869
showQuantiles(out)
6970
}
71+
72+
def histogram(bins: Int): Seq[(Double, Int)] = {
73+
require(bins > 0)
74+
val stat = statCounter
75+
val binSize = (stat.max - stat.min) / (bins - 1)
76+
val points = new Array[Double](bins)
77+
for (i <- 0 until (bins - 1)) {
78+
points(i) = stat.min + binSize * i
79+
}
80+
points(bins - 1) = stat.max
81+
val counts = new Array[Int](bins)
82+
for (i <- startIdx until endIdx) {
83+
val v = data(i)
84+
var j = 0
85+
var find = false
86+
while (!find && j < bins - 1) {
87+
if (v < points(j) + binSize / 2) {
88+
counts(j) += 1
89+
find = true
90+
}
91+
j += 1
92+
}
93+
if (!find) {
94+
counts(bins - 1) += 1
95+
}
96+
}
97+
points.zip(counts)
98+
}
7099
}
71100

72101
private[spark] object Distribution {

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,4 +58,11 @@ case class BatchInfo(
5858
*/
5959
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
6060
.map(x => x._1 + x._2).headOption
61+
62+
/**
63+
* The number of recorders received by the receivers in this batch.
64+
*/
65+
def numRecords: Long = receivedBlockInfo.map { case (_, infos) =>
66+
infos.map(_.numRecords).sum
67+
}.sum
6168
}

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3939
private var totalReceivedRecords = 0L
4040
private var totalProcessedRecords = 0L
4141
private val receiverInfos = new HashMap[Int, ReceiverInfo]
42+
private val receiverLastErrorTime = new HashMap[Int, Long]
4243

4344
val batchDuration = ssc.graph.batchDuration.milliseconds
4445

@@ -51,6 +52,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
5152
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
5253
synchronized {
5354
receiverInfos(receiverError.receiverInfo.streamId) = receiverError.receiverInfo
55+
receiverLastErrorTime(receiverError.receiverInfo.streamId) = System.currentTimeMillis()
5456
}
5557
}
5658

@@ -149,6 +151,31 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
149151
}.toMap
150152
}
151153

154+
def receivedRecordsWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
155+
val latestBlockInfos = retainedBatches.take(batchInfoLimit).map { batchInfo =>
156+
(batchInfo.batchTime.milliseconds, batchInfo.receivedBlockInfo)
157+
}
158+
(0 until numReceivers).map { receiverId =>
159+
val blockInfoOfParticularReceiver = latestBlockInfos.map {
160+
case (batchTime, receivedBlockInfo) =>
161+
(batchTime, receivedBlockInfo.get(receiverId).getOrElse(Array.empty))
162+
}
163+
val recordsOfParticularReceiver = blockInfoOfParticularReceiver.map {
164+
case (batchTime, blockInfo) =>
165+
// calculate records per second for each batch
166+
(batchTime, blockInfo.map(_.numRecords).sum.toDouble * 1000 / batchDuration)
167+
}
168+
(receiverId, recordsOfParticularReceiver)
169+
}.toMap
170+
}
171+
172+
173+
def allReceivedRecordsWithBatchTime: Seq[(Long, Double)] = synchronized {
174+
retainedBatches.take(batchInfoLimit).map { batchInfo =>
175+
(batchInfo.batchTime.milliseconds, batchInfo.numRecords.toDouble * 1000 / batchDuration)
176+
}
177+
}
178+
152179
def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
153180
val lastReceivedBlockInfoOption = lastReceivedBatch.map(_.receivedBlockInfo)
154181
lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
@@ -164,6 +191,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
164191
receiverInfos.get(receiverId)
165192
}
166193

194+
def receiverLastErrorTimeo(receiverId: Int): Option[Long] = synchronized {
195+
receiverLastErrorTime.get(receiverId)
196+
}
197+
167198
def lastCompletedBatch: Option[BatchInfo] = synchronized {
168199
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
169200
}

0 commit comments

Comments
 (0)