Skip to content

Commit 1ab423f

Browse files
zsxwing authored and tdas committed
[SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted (backport to branch 1.3)
Backport SPARK-6766 #5414 to branch 1.3

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Author: zsxwing <zsxwing@gmail.com>

Closes #5452 from zsxwing/SPARK-6766-branch-1.3 and squashes the following commits:

cb87e44 [zsxwing] [SPARK-6766][Streaming] Fix issue about StreamingListenerBatchSubmitted and StreamingListenerBatchStarted (backport to branch 1.3)
1 parent db2154d commit 1ab423f

File tree

4 files changed

+180
-18
lines changed

4 files changed

+180
-18
lines changed

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -105,6 +105,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
105105
if (jobSet.jobs.isEmpty) {
106106
logInfo("No jobs added for time " + jobSet.time)
107107
} else {
108+
listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
108109
jobSets.put(jobSet.time, jobSet)
109110
jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
110111
logInfo("Added jobs for time " + jobSet.time)
@@ -134,10 +135,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
134135

135136
private def handleJobStart(job: Job) {
136137
val jobSet = jobSets.get(job.time)
137-
if (!jobSet.hasStarted) {
138+
val isFirstJobOfJobSet = !jobSet.hasStarted
139+
jobSet.handleJobStart(job)
140+
if (isFirstJobOfJobSet) {
141+
// "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
142+
// correct "jobSet.processingStartTime".
138143
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
139144
}
140-
jobSet.handleJobStart(job)
141145
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
142146
}
143147

streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
3232

3333
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
3434
private val runningBatchInfos = new HashMap[Time, BatchInfo]
35-
private val completedaBatchInfos = new Queue[BatchInfo]
35+
private val completedBatchInfos = new Queue[BatchInfo]
3636
private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
3737
private var totalCompletedBatches = 0L
3838
private var totalReceivedRecords = 0L
@@ -60,7 +60,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
6060
}
6161

6262
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) = synchronized {
63-
runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
63+
waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
6464
}
6565

6666
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) = synchronized {
@@ -75,8 +75,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
7575
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) = synchronized {
7676
waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
7777
runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
78-
completedaBatchInfos.enqueue(batchCompleted.batchInfo)
79-
if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
78+
completedBatchInfos.enqueue(batchCompleted.batchInfo)
79+
if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
8080
totalCompletedBatches += 1L
8181

8282
batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
@@ -113,7 +113,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
113113
}
114114

115115
def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
116-
completedaBatchInfos.toSeq
116+
completedBatchInfos.toSeq
117117
}
118118

119119
def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -160,7 +160,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
160160
}
161161

162162
def lastCompletedBatch: Option[BatchInfo] = {
163-
completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
163+
completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
164164
}
165165

166166
def lastReceivedBatch: Option[BatchInfo] = {
@@ -169,10 +169,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
169169

170170
private def retainedBatches: Seq[BatchInfo] = synchronized {
171171
(waitingBatchInfos.values.toSeq ++
172-
runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
172+
runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
173173
}
174174

175175
private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
176-
Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
176+
Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
177177
}
178178
}

streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

Lines changed: 47 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -46,10 +46,38 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
4646
val collector = new BatchInfoCollector
4747
ssc.addStreamingListener(collector)
4848
runStreams(ssc, input.size, input.size)
49-
val batchInfos = collector.batchInfos
50-
batchInfos should have size 4
5149

52-
batchInfos.foreach(info => {
50+
// SPARK-6766: batch info should be submitted
51+
val batchInfosSubmitted = collector.batchInfosSubmitted
52+
batchInfosSubmitted should have size 4
53+
54+
batchInfosSubmitted.foreach(info => {
55+
info.schedulingDelay should be (None)
56+
info.processingDelay should be (None)
57+
info.totalDelay should be (None)
58+
})
59+
60+
isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
61+
62+
// SPARK-6766: processingStartTime of batch info should not be None when starting
63+
val batchInfosStarted = collector.batchInfosStarted
64+
batchInfosStarted should have size 4
65+
66+
batchInfosStarted.foreach(info => {
67+
info.schedulingDelay should not be None
68+
info.schedulingDelay.get should be >= 0L
69+
info.processingDelay should be (None)
70+
info.totalDelay should be (None)
71+
})
72+
73+
isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
74+
isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
75+
76+
// test onBatchCompleted
77+
val batchInfosCompleted = collector.batchInfosCompleted
78+
batchInfosCompleted should have size 4
79+
80+
batchInfosCompleted.foreach(info => {
5381
info.schedulingDelay should not be None
5482
info.processingDelay should not be None
5583
info.totalDelay should not be None
@@ -58,9 +86,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
5886
info.totalDelay.get should be >= 0L
5987
})
6088

61-
isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
62-
isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
63-
isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
89+
isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
90+
isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
91+
isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
6492
}
6593

6694
test("receiver info reporting") {
@@ -99,9 +127,20 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
99127

100128
/** Listener that collects information on processed batches */
101129
class BatchInfoCollector extends StreamingListener {
102-
val batchInfos = new ArrayBuffer[BatchInfo]
130+
val batchInfosCompleted = new ArrayBuffer[BatchInfo]
131+
val batchInfosStarted = new ArrayBuffer[BatchInfo]
132+
val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
133+
134+
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
135+
batchInfosSubmitted += batchSubmitted.batchInfo
136+
}
137+
138+
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
139+
batchInfosStarted += batchStarted.batchInfo
140+
}
141+
103142
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
104-
batchInfos += batchCompleted.batchInfo
143+
batchInfosCompleted += batchCompleted.batchInfo
105144
}
106145
}
107146

streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala

Lines changed: 119 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.ui
19+
20+
import org.scalatest.Matchers
21+
22+
import org.apache.spark.streaming.dstream.DStream
23+
import org.apache.spark.streaming.scheduler._
24+
import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase}
25+
26+
class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
27+
28+
val input = (1 to 4).map(Seq(_)).toSeq
29+
val operation = (d: DStream[Int]) => d.map(x => x)
30+
31+
override def batchDuration = Milliseconds(100)
32+
33+
test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
34+
"onReceiverStarted, onReceiverError, onReceiverStopped") {
35+
val ssc = setupStreams(input, operation)
36+
val listener = new StreamingJobProgressListener(ssc)
37+
38+
val receivedBlockInfo = Map(
39+
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
40+
1 -> Array(ReceivedBlockInfo(1, 300, null))
41+
)
42+
43+
// onBatchSubmitted
44+
val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
45+
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
46+
listener.waitingBatches should be (List(batchInfoSubmitted))
47+
listener.runningBatches should be (Nil)
48+
listener.retainedCompletedBatches should be (Nil)
49+
listener.lastCompletedBatch should be (None)
50+
listener.numUnprocessedBatches should be (1)
51+
listener.numTotalCompletedBatches should be (0)
52+
listener.numTotalProcessedRecords should be (0)
53+
listener.numTotalReceivedRecords should be (0)
54+
55+
// onBatchStarted
56+
val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
57+
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
58+
listener.waitingBatches should be (Nil)
59+
listener.runningBatches should be (List(batchInfoStarted))
60+
listener.retainedCompletedBatches should be (Nil)
61+
listener.lastCompletedBatch should be (None)
62+
listener.numUnprocessedBatches should be (1)
63+
listener.numTotalCompletedBatches should be (0)
64+
listener.numTotalProcessedRecords should be (0)
65+
listener.numTotalReceivedRecords should be (600)
66+
67+
// onBatchCompleted
68+
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
69+
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
70+
listener.waitingBatches should be (Nil)
71+
listener.runningBatches should be (Nil)
72+
listener.retainedCompletedBatches should be (List(batchInfoCompleted))
73+
listener.lastCompletedBatch should be (Some(batchInfoCompleted))
74+
listener.numUnprocessedBatches should be (0)
75+
listener.numTotalCompletedBatches should be (1)
76+
listener.numTotalProcessedRecords should be (600)
77+
listener.numTotalReceivedRecords should be (600)
78+
79+
// onReceiverStarted
80+
val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost")
81+
listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
82+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
83+
listener.receiverInfo(1) should be (None)
84+
85+
// onReceiverError
86+
val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost")
87+
listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
88+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
89+
listener.receiverInfo(1) should be (Some(receiverInfoError))
90+
listener.receiverInfo(2) should be (None)
91+
92+
// onReceiverStopped
93+
val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost")
94+
listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
95+
listener.receiverInfo(0) should be (Some(receiverInfoStarted))
96+
listener.receiverInfo(1) should be (Some(receiverInfoError))
97+
listener.receiverInfo(2) should be (Some(receiverInfoStopped))
98+
listener.receiverInfo(3) should be (None)
99+
}
100+
101+
test("Remove the old completed batches when exceeding the limit") {
102+
val ssc = setupStreams(input, operation)
103+
val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
104+
val listener = new StreamingJobProgressListener(ssc)
105+
106+
val receivedBlockInfo = Map(
107+
0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
108+
1 -> Array(ReceivedBlockInfo(1, 300, null))
109+
)
110+
val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
111+
112+
for(_ <- 0 until (limit + 10)) {
113+
listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
114+
}
115+
116+
listener.retainedCompletedBatches.size should be (limit)
117+
listener.numTotalCompletedBatches should be(limit + 10)
118+
}
119+
}

0 commit comments

Comments
 (0)