Skip to content

Commit c6889d2

Browse files
andrewor14tdas
authored andcommitted
[HOTFIX][Streaming] Handle port collisions in flume polling test
This is failing my tests in #1777. @tdas Author: Andrew Or <andrewor14@gmail.com> Closes #1803 from andrewor14/fix-flaky-streaming-test and squashes the following commits: ea11a03 [Andrew Or] Catch all exceptions caused by BindExceptions 54a0ca0 [Andrew Or] Merge branch 'master' of github.com:apache/spark into fix-flaky-streaming-test 664095c [Andrew Or] Tone down bind exception message af3ddc9 [Andrew Or] Handle port collisions in flume polling test
1 parent e537b33 commit c6889d2

File tree

1 file changed

+31
-1
lines changed

1 file changed

+31
-1
lines changed

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
3535
import org.apache.spark.streaming.util.ManualClock
3636
import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
3737
import org.apache.spark.streaming.flume.sink._
38+
import org.apache.spark.util.Utils
3839

3940
class FlumePollingStreamSuite extends TestSuiteBase {
4041

@@ -45,8 +46,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
4546
val eventsPerBatch = 100
4647
val totalEventsPerChannel = batchCount * eventsPerBatch
4748
val channelCapacity = 5000
49+
val maxAttempts = 5
4850

4951
test("flume polling test") {
52+
testMultipleTimes(testFlumePolling)
53+
}
54+
55+
test("flume polling test multiple hosts") {
56+
testMultipleTimes(testFlumePollingMultipleHost)
57+
}
58+
59+
/**
60+
* Run the given test until no more java.net.BindException's are thrown.
61+
* Do this only up to a certain attempt limit.
62+
*/
63+
private def testMultipleTimes(test: () => Unit): Unit = {
64+
var testPassed = false
65+
var attempt = 0
66+
while (!testPassed && attempt < maxAttempts) {
67+
try {
68+
test()
69+
testPassed = true
70+
} catch {
71+
case e: Exception if Utils.isBindCollision(e) =>
72+
logWarning("Exception when running flume polling test: " + e)
73+
attempt += 1
74+
}
75+
}
76+
assert(testPassed, s"Test failed after $attempt attempts!")
77+
}
78+
79+
private def testFlumePolling(): Unit = {
5080
val testPort = getTestPort
5181
// Set up the streaming context and input streams
5282
val ssc = new StreamingContext(conf, batchDuration)
@@ -80,7 +110,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
80110
channel.stop()
81111
}
82112

83-
test("flume polling test multiple hosts") {
113+
private def testFlumePollingMultipleHost(): Unit = {
84114
val testPort = getTestPort
85115
// Set up the streaming context and input streams
86116
val ssc = new StreamingContext(conf, batchDuration)

0 commit comments

Comments
 (0)