File tree Expand file tree Collapse file tree 1 file changed +3
-3
lines changed
external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink Expand file tree Collapse file tree 1 file changed +3
-3
lines changed Original file line number Diff line number Diff line change @@ -18,7 +18,7 @@ package org.apache.spark.streaming.flume.sink
18
18
19
19
import java .net .InetSocketAddress
20
20
import java .util .concurrent .atomic .AtomicInteger
21
- import java .util .concurrent .{CountDownLatch , Executors }
21
+ import java .util .concurrent .{TimeUnit , CountDownLatch , Executors }
22
22
23
23
import scala .collection .JavaConversions ._
24
24
import scala .concurrent .{ExecutionContext , Future }
@@ -118,8 +118,7 @@ class SparkSinkSuite extends TestSuiteBase {
118
118
transceiversAndClients.foreach(x => {
119
119
Future {
120
120
val client = x._2
121
- var events : EventBatch = null
122
- events = client.getEventBatch(1000 )
121
+ val events = client.getEventBatch(1000 )
123
122
if (! failSome || counter.getAndIncrement() % 2 == 0 ) {
124
123
client.ack(events.getSequenceNumber)
125
124
} else {
@@ -137,6 +136,7 @@ class SparkSinkSuite extends TestSuiteBase {
137
136
}
138
137
})
139
138
batchCounter.await()
139
+ TimeUnit .SECONDS .sleep(1 ) // Allow the sink to commit the transactions.
140
140
executorContext.shutdown()
141
141
if (failSome) {
142
142
assert(availableChannelSlots(channel) === 3000 )
You can’t perform that action at this time.
0 commit comments