@@ -208,4 +208,51 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
208
208
(listener.waitingBatches.size + listener.runningBatches.size +
209
209
listener.retainedCompletedBatches.size + 10 )
210
210
}
211
+
212
test("detect memory leak") {
  // Regression test: the listener must not accumulate unbounded per-batch
  // state. Drive 2x the retention limit of batches through the full
  // lifecycle (submitted -> started -> job starts -> completed), then check
  // that the internal batchTime -> (outputOpId, sparkJobId) map stays
  // bounded by the number of batches the listener actually retains.
  val ssc = setupStreams(input, operation)
  val listener = new StreamingJobProgressListener(ssc)

  // Maximum number of completed batches the UI listener is allowed to keep.
  // NOTE: the key must have no stray whitespace or the lookup silently
  // falls back to the default.
  val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 1000)

  for (_ <- 0 until 2 * limit) {
    // Two input streams: stream 0 contributes two blocks, stream 1 one block.
    val receivedBlockInfo = Map(
      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
      1 -> Array(ReceivedBlockInfo(1, 300, null))
    )

    // onBatchSubmitted: batch enters the waiting queue.
    val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))

    // onBatchStarted: batch moves from waiting to running.
    val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
    listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))

    // onJobStart: two output operations, each backed by two Spark jobs,
    // all attributed to the same batch time.
    val jobStart1 = createJobStart(Time(1000), outputOpId = 0, jobId = 0)
    listener.onJobStart(jobStart1)

    val jobStart2 = createJobStart(Time(1000), outputOpId = 0, jobId = 1)
    listener.onJobStart(jobStart2)

    val jobStart3 = createJobStart(Time(1000), outputOpId = 1, jobId = 0)
    listener.onJobStart(jobStart3)

    val jobStart4 = createJobStart(Time(1000), outputOpId = 1, jobId = 1)
    listener.onJobStart(jobStart4)

    // onBatchCompleted: batch leaves running and enters the retained set.
    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
    listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
  }

  listener.waitingBatches.size should be (0)
  listener.runningBatches.size should be (0)
  listener.retainedCompletedBatches.size should be (limit)
  // The job-id map may transiently hold a few extra entries while batches
  // age out; "+ 10" is deliberate slack, not part of the invariant.
  listener.batchTimeToOutputOpIdSparkJobIdPair.size() should be <=
    (listener.waitingBatches.size + listener.runningBatches.size +
      listener.retainedCompletedBatches.size + 10)
}
257
+
211
258
}
0 commit comments