Skip to content

Commit 5222330

Browse files
committed
Change JavaKafkaStreamSuite to better test it
1 parent 5525f10 commit 5222330

File tree

1 file changed

+21
-13
lines changed

1 file changed

+21
-13
lines changed

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@
2727

2828
import junit.framework.Assert;
2929

30+
import kafka.serializer.StringDecoder;
31+
3032
import org.apache.spark.api.java.JavaPairRDD;
3133
import org.apache.spark.api.java.function.Function;
34+
import org.apache.spark.storage.StorageLevel;
3235
import org.apache.spark.streaming.Duration;
3336
import org.apache.spark.streaming.LocalJavaStreamingContext;
3437
import org.apache.spark.streaming.api.java.JavaDStream;
@@ -65,17 +68,31 @@ public void testKafkaStream() throws InterruptedException {
6568
String topic = "topic1";
6669
HashMap<String, Integer> topics = new HashMap<String, Integer>();
6770
topics.put(topic, 1);
68-
testSuite.createTopic(topic);
6971

7072
HashMap<String, Integer> sent = new HashMap<String, Integer>();
7173
sent.put("a", 5);
7274
sent.put("b", 3);
7375
sent.put("c", 10);
7476

77+
testSuite.createTopic(topic);
78+
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
79+
testSuite.produceAndSendMessage(topic,
80+
JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
81+
Predef.<Tuple2<String, Object>>conforms()));
82+
83+
HashMap<String, String> kafkaParams = new HashMap<String, String>();
84+
kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
85+
kafkaParams.put("group.id", "test-consumer-" + testSuite.random().nextInt(10000));
86+
kafkaParams.put("auto.offset.reset", "smallest");
87+
7588
JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
76-
testSuite.zkConnect(),
77-
"group",
78-
topics);
89+
String.class,
90+
String.class,
91+
StringDecoder.class,
92+
StringDecoder.class,
93+
kafkaParams,
94+
topics,
95+
StorageLevel.MEMORY_ONLY_SER());
7996

8097
final HashMap<String, Long> result = new HashMap<String, Long>();
8198

@@ -107,15 +124,6 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
107124
);
108125

109126
ssc.start();
110-
111-
// Sleep to let Receiver start first
112-
Thread.sleep(3000);
113-
114-
HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
115-
testSuite.produceAndSendMessage(topic,
116-
JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
117-
Predef.<Tuple2<String, Object>>conforms()));
118-
119127
ssc.awaitTermination(3000);
120128

121129
Assert.assertEquals(sent.size(), result.size());

0 commit comments

Comments
 (0)