@@ -45,8 +45,37 @@ class FlumePollingStreamSuite extends TestSuiteBase {
4545 val eventsPerBatch = 100
4646 val totalEventsPerChannel = batchCount * eventsPerBatch
4747 val channelCapacity = 5000
48+ val maxAttempts = 5
4849
4950 test(" flume polling test" ) {
51+ testMultipleTimes(testFlumePolling)
52+ }
53+
54+ test(" flume polling test multiple hosts" ) {
55+ testMultipleTimes(testFlumePollingMultipleHost)
56+ }
57+
58+ /**
59+ * Run the given test until no more java.net.BindException's are thrown.
60+ * Do this only up to a certain attempt limit.
61+ */
62+ private def testMultipleTimes (test : () => Unit ): Unit = {
63+ var testPassed = false
64+ var attempt = 0
65+ while (! testPassed && attempt < maxAttempts) {
66+ try {
67+ test()
68+ testPassed = true
69+ } catch {
70+ case e : java.net.BindException =>
71+ logError(" Exception when running flume polling test" , e)
72+ attempt += 1
73+ }
74+ }
75+ assert(testPassed, s " Test failed after $attempt attempts! " )
76+ }
77+
78+ private def testFlumePolling (): Unit = {
5079 val testPort = getTestPort
5180 // Set up the streaming context and input streams
5281 val ssc = new StreamingContext (conf, batchDuration)
@@ -80,7 +109,7 @@ class FlumePollingStreamSuite extends TestSuiteBase {
80109 channel.stop()
81110 }
82111
83- test( " flume polling test multiple hosts " ) {
112+ private def testFlumePollingMultipleHost () : Unit = {
84113 val testPort = getTestPort
85114 // Set up the streaming context and input streams
86115 val ssc = new StreamingContext (conf, batchDuration)
0 commit comments