
Commit eae4ad6

Refactored KafkaStreamSuiteBase to eliminate KafkaTestUtils and made the Java test more robust.
1 parent fab14c7 commit eae4ad6

2 files changed: 73 additions & 87 deletions


external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 7 additions & 2 deletions

@@ -127,11 +127,16 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
     );
 
     ssc.start();
-    ssc.awaitTermination(3000);
-
+    long startTime = System.currentTimeMillis();
+    boolean sizeMatches = false;
+    while (!sizeMatches && System.currentTimeMillis() - startTime < 20000) {
+      sizeMatches = sent.size() == result.size();
+      Thread.sleep(200);
+    }
     Assert.assertEquals(sent.size(), result.size());
     for (String k : sent.keySet()) {
       Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
     }
+    ssc.stop();
   }
 }
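
The Java change replaces a fixed ssc.awaitTermination(3000) with a bounded polling loop: the test now waits up to 20 seconds for the received counts to match what was sent, checking every 200 ms, instead of failing whenever the stream needs more than 3 seconds. The Scala helper removed below (waitUntilTrue) implemented the same idea; a corrected, reusable sketch of that pattern in Scala follows (the object and parameter names are illustrative, not part of this commit):

    object PollingUtil {
      // Hypothetical helper, not from this commit: polls `condition` until it
      // holds or `waitTimeMs` elapses. This is the shape of the loop the Java
      // test now inlines, with condition = (sent.size == result.size),
      // a 20000 ms budget, and a 200 ms poll interval.
      def waitUntilTrue(condition: () => Boolean, waitTimeMs: Long, pollMs: Long = 200L): Boolean = {
        val deadline = System.currentTimeMillis() + waitTimeMs
        while (System.currentTimeMillis() < deadline) {
          if (condition()) return true  // succeed as soon as the condition holds
          Thread.sleep(pollMs)          // back off briefly between checks
        }
        condition()                     // one last check at the deadline
      }
    }

Unlike the deleted waitUntilTrue in KafkaStreamSuite.scala, this version cannot fall through to an "unexpected error" and always honors the poll interval rather than waitTime.min(100L).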

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 66 additions & 85 deletions

@@ -46,8 +46,7 @@ import org.apache.spark.util.Utils
  * This is an abstract base class for Kafka testsuites. This has the functionality to set up
  * and tear down local Kafka servers, and to push data using Kafka producers.
  */
-abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
-  import KafkaTestUtils._
+abstract class KafkaStreamSuiteBase extends FunSuite with Eventually with Logging {
 
   var zkAddress: String = _
   var zkClient: ZkClient = _

@@ -78,7 +77,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
     var bindSuccess: Boolean = false
     while(!bindSuccess) {
       try {
-        val brokerProps = getBrokerConfig(brokerPort, zkAddress)
+        val brokerProps = getBrokerConfig()
         brokerConf = new KafkaConfig(brokerProps)
         server = new KafkaServer(brokerConf)
         logInfo("==================== 2 ====================")

@@ -134,111 +133,43 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
     logInfo("==================== 5 ====================")
     // wait until metadata is propagated
-    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+    waitUntilMetadataIsPropagated(topic, 0)
   }
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
-    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig()))
     producer.send(createTestMessage(topic, sent): _*)
     producer.close()
     logInfo("==================== 6 ====================")
   }
-}
-
-class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-  var ssc: StreamingContext = _
-
-  before {
-    setupKafka()
-  }
-
-  after {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-    tearDownKafka()
-  }
-
-  test("Kafka input stream") {
-    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
-    ssc = new StreamingContext(sparkConf, Milliseconds(500))
-    val topic = "topic1"
-    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
-    createTopic(topic)
-    produceAndSendMessage(topic, sent)
-
-    val kafkaParams = Map("zookeeper.connect" -> zkAddress,
-      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
-      "auto.offset.reset" -> "smallest")
-
-    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
-    val result = new mutable.HashMap[String, Long]()
-    stream.map { case (k, v) => v }
-      .countByValue()
-      .foreachRDD { r =>
-        val ret = r.collect()
-        ret.toMap.foreach { kv =>
-          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
-          result.put(kv._1, count)
-        }
-      }
-    ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
-      assert(sent.size === result.size)
-      sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
-    }
-
-    ssc.stop()
-  }
-}
-
 
-object KafkaTestUtils {
-
-  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+  private def getBrokerConfig(): Properties = {
     val props = new Properties()
     props.put("broker.id", "0")
     props.put("host.name", "localhost")
-    props.put("port", port.toString)
+    props.put("port", brokerPort.toString)
     props.put("log.dir", Utils.createTempDir().getAbsolutePath)
-    props.put("zookeeper.connect", zkConnect)
+    props.put("zookeeper.connect", zkAddress)
     props.put("log.flush.interval.messages", "1")
     props.put("replica.socket.timeout.ms", "1500")
     props
   }
 
-  def getProducerConfig(brokerList: String): Properties = {
+  private def getProducerConfig(): Properties = {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
     val props = new Properties()
-    props.put("metadata.broker.list", brokerList)
+    props.put("metadata.broker.list", brokerAddr)
     props.put("serializer.class", classOf[StringEncoder].getName)
     props
   }
 
-  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
-    val startTime = System.currentTimeMillis()
-    while (true) {
-      if (condition())
-        return true
-      if (System.currentTimeMillis() > startTime + waitTime)
-        return false
-      Thread.sleep(waitTime.min(100L))
+  private def waitUntilMetadataIsPropagated(topic: String, partition: Int) {
+    eventually(timeout(1000 milliseconds), interval(100 milliseconds)) {
+      assert(
+        server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, partition)),
+        s"Partition [$topic, $partition] metadata not propagated after timeout"
+      )
     }
-    // Should never go to here
-    throw new RuntimeException("unexpected error")
-  }
-
-  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
-      timeout: Long) {
-    assert(waitUntilTrue(() =>
-      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
-        TopicAndPartition(topic, partition))), timeout),
-      s"Partition [$topic, $partition] metadata not propagated after timeout")
   }
 
   class EmbeddedZookeeper(val zkConnect: String) {
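
With Eventually now mixed into KafkaStreamSuiteBase, the hand-rolled waitUntilTrue loop gives way to ScalaTest's eventually, which re-runs the enclosed assertions on a fixed interval until they pass or the timeout expires. A minimal self-contained sketch of the same pattern outside this suite (the suite name and the AtomicBoolean flag are illustrative only):

    import java.util.concurrent.atomic.AtomicBoolean

    import org.scalatest.FunSuite
    import org.scalatest.concurrent.Eventually
    import org.scalatest.time.SpanSugar._

    class EventuallyExampleSuite extends FunSuite with Eventually {
      test("an asynchronous flag is eventually set") {
        val ready = new AtomicBoolean(false)
        // Simulate asynchronous work that completes after about half a second.
        new Thread(new Runnable {
          override def run(): Unit = { Thread.sleep(500); ready.set(true) }
        }).start()
        // Retry the assertion every 100 ms, giving up only after 10 seconds.
        eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
          assert(ready.get, "flag was not set within the timeout")
        }
      }
    }

This is the same timeout/interval style that the rewritten waitUntilMetadataIsPropagated above and the relocated "Kafka input stream" test below use.
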
@@ -264,3 +195,53 @@ object KafkaTestUtils {
     }
   }
 }
+
+
+class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+  }
+
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    tearDownKafka()
+  }
+
+  test("Kafka input stream") {
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkAddress,
+      "group.id" -> s"test-consumer-${Random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map(_._2).countByValue().foreachRDD { r =>
+      val ret = r.collect()
+      ret.toMap.foreach { kv =>
+        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+        result.put(kv._1, count)
+      }
+    }
+    ssc.start()
+    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+      assert(sent.size === result.size)
+      sent.keys.foreach { k =>
+        assert(sent(k) === result(k).toInt)
+      }
+    }
+    ssc.stop()
+  }
+}
+
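
Since the helpers now live on KafkaStreamSuiteBase itself rather than on the deleted KafkaTestUtils object, a new suite gets an embedded ZooKeeper and Kafka broker just by extending the base class. A hypothetical sketch of such a suite, assuming it sits in the same package as KafkaStreamSuiteBase (the class name, topic, and message counts are illustrative only):

    import org.scalatest.BeforeAndAfter

    class KafkaRoundTripSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
      before { setupKafka() }    // start embedded ZooKeeper and a Kafka broker
      after { tearDownKafka() }  // stop them and clean up temporary directories

      test("produced messages reach the embedded broker") {
        val sent = Map("x" -> 2, "y" -> 1)
        createTopic("roundtrip")            // also waits for metadata propagation
        produceAndSendMessage("roundtrip", sent)
        // A consumer would follow here, asserting receipt inside
        // eventually { ... } as the "Kafka input stream" test above does.
      }
    }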