
Commit b92a450

jerryshao authored and tdas committed
[SPARK-1022][Streaming] Add Kafka real unit test
This PR is an updated version of #557 to actually test sending and receiving data through Kafka, and to fix the previous flaky issues. @tdas, would you mind reviewing this PR? Thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes #1751 from jerryshao/kafka-unit-test and squashes the following commits:

b6a505f [jerryshao] code refactor according to comments
5222330 [jerryshao] Change JavaKafkaStreamSuite to better test it
5525f10 [jerryshao] Fix flaky issue of Kafka real unit test
4559310 [jerryshao] Minor changes for Kafka unit test
860f649 [jerryshao] Minor style changes, and tests ignored due to flakiness
796d4ca [jerryshao] Add real Kafka streaming test
1 parent 075ba67 commit b92a450

File tree

3 files changed: +293, -35 lines


external/kafka/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -70,6 +70,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>3.2</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 106 additions & 19 deletions
@@ -17,31 +17,118 @@

 package org.apache.spark.streaming.kafka;

+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.List;
+
+import scala.Predef;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+import junit.framework.Assert;

-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.junit.Test;
-import com.google.common.collect.Maps;
 import kafka.serializer.StringDecoder;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
 import org.apache.spark.storage.StorageLevel;
+import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
+  private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+
+  @Before
+  @Override
+  public void setUp() {
+    testSuite.beforeFunction();
+    System.clearProperty("spark.driver.port");
+    //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+  }
+
+  @After
+  @Override
+  public void tearDown() {
+    ssc.stop();
+    ssc = null;
+    System.clearProperty("spark.driver.port");
+    testSuite.afterFunction();
+  }

-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
-  public void testKafkaStream() {
-    HashMap<String, Integer> topics = Maps.newHashMap();
-
-    // tests the API, does not actually test data receiving
-    JavaPairReceiverInputDStream<String, String> test1 =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect", "localhost:12345");
-    kafkaParams.put("group.id","consumer-group");
-    JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
-      String.class, String.class, StringDecoder.class, StringDecoder.class,
-      kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+  public void testKafkaStream() throws InterruptedException {
+    String topic = "topic1";
+    HashMap<String, Integer> topics = new HashMap<String, Integer>();
+    topics.put(topic, 1);
+
+    HashMap<String, Integer> sent = new HashMap<String, Integer>();
+    sent.put("a", 5);
+    sent.put("b", 3);
+    sent.put("c", 10);
+
+    testSuite.createTopic(topic);
+    HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
+    testSuite.produceAndSendMessage(topic,
+        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+          Predef.<Tuple2<String, Object>>conforms()));
+
+    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    kafkaParams.put("zookeeper.connect", testSuite.zkConnect());
+    kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
+    kafkaParams.put("auto.offset.reset", "smallest");
+
+    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+        String.class,
+        String.class,
+        StringDecoder.class,
+        StringDecoder.class,
+        kafkaParams,
+        topics,
+        StorageLevel.MEMORY_ONLY_SER());
+
+    final HashMap<String, Long> result = new HashMap<String, Long>();
+
+    JavaDStream<String> words = stream.map(
+      new Function<Tuple2<String, String>, String>() {
+        @Override
+        public String call(Tuple2<String, String> tuple2) throws Exception {
+          return tuple2._2();
+        }
+      }
+    );
+
+    words.countByValue().foreachRDD(
+      new Function<JavaPairRDD<String, Long>, Void>() {
+        @Override
+        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+          List<Tuple2<String, Long>> ret = rdd.collect();
+          for (Tuple2<String, Long> r : ret) {
+            if (result.containsKey(r._1())) {
+              result.put(r._1(), result.get(r._1()) + r._2());
+            } else {
+              result.put(r._1(), r._2());
+            }
+          }
+
+          return null;
+        }
+      }
+    );
+
+    ssc.start();
+    ssc.awaitTermination(3000);
+
+    Assert.assertEquals(sent.size(), result.size());
+    for (String k : sent.keySet()) {
+      Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+    }
   }
 }
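Note: the JavaConverters/Predef.conforms() chain in the Java test above exists only to turn a java.util.HashMap into the immutable scala Map[String, Int] that produceAndSendMessage (defined in the Scala suite below) expects. A minimal, self-contained sketch of the same conversion done from Scala; the object name MapConversionSketch is purely illustrative and not part of this commit:

import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._

object MapConversionSketch {
  def main(args: Array[String]): Unit = {
    val javaMap = new JHashMap[String, Int]()
    javaMap.put("a", 5)
    javaMap.put("b", 3)
    javaMap.put("c", 10)

    // Scala-side equivalent of the Java test's
    // JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(Predef.conforms()):
    // asScala wraps the Java map as a mutable Scala view, toMap copies it into
    // the immutable Map that produceAndSendMessage accepts.
    val scalaMap: Map[String, Int] = javaMap.asScala.toMap
    println(scalaMap) // e.g. Map(a -> 5, b -> 3, c -> 10)
  }
}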

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 181 additions & 16 deletions
@@ -17,28 +17,193 @@

 package org.apache.spark.streaming.kafka

-import kafka.serializer.StringDecoder
+import java.io.File
+import java.net.InetSocketAddress
+import java.util.{Properties, Random}
+
+import scala.collection.mutable
+
+import kafka.admin.CreateTopicCommand
+import kafka.common.TopicAndPartition
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import kafka.utils.ZKStringSerializer
+import kafka.serializer.{StringDecoder, StringEncoder}
+import kafka.server.{KafkaConfig, KafkaServer}
+
+import org.I0Itec.zkclient.ZkClient
+
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxnFactory
+
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.spark.util.Utils

 class KafkaStreamSuite extends TestSuiteBase {
+  import KafkaTestUtils._
+
+  val zkConnect = "localhost:2181"
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerPort = 9092
+  val brokerProps = getBrokerConfig(brokerPort, zkConnect)
+  val brokerConf = new KafkaConfig(brokerProps)
+
+  protected var zookeeper: EmbeddedZookeeper = _
+  protected var zkClient: ZkClient = _
+  protected var server: KafkaServer = _
+  protected var producer: Producer[String, String] = _
+
+  override def useManualClock = false
+
+  override def beforeFunction() {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    logInfo("==================== 0 ====================")
+    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+    logInfo("==================== 1 ====================")

-  test("kafka input stream") {
+    // Kafka broker startup
+    server = new KafkaServer(brokerConf)
+    logInfo("==================== 2 ====================")
+    server.startup()
+    logInfo("==================== 3 ====================")
+    Thread.sleep(2000)
+    logInfo("==================== 4 ====================")
+    super.beforeFunction()
+  }
+
+  override def afterFunction() {
+    producer.close()
+    server.shutdown()
+    brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
+
+    zkClient.close()
+    zookeeper.shutdown()
+
+    super.afterFunction()
+  }
+
+  test("Kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
-    val topics = Map("my-topic" -> 1)
-
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    // TODO: Actually test receiving data
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+    createTopic(topic)
+    produceAndSendMessage(topic, sent)
+
+    val kafkaParams = Map("zookeeper.connect" -> zkConnect,
+      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
+      "auto.offset.reset" -> "smallest")
+
+    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
+      ssc,
+      kafkaParams,
+      Map(topic -> 1),
+      StorageLevel.MEMORY_ONLY)
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }
+      .countByValue()
+      .foreachRDD { r =>
+        val ret = r.collect()
+        ret.toMap.foreach { kv =>
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
+      }
+    ssc.start()
+    ssc.awaitTermination(3000)
+
+    assert(sent.size === result.size)
+    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+
     ssc.stop()
   }
+
+  private def createTestMessage(topic: String, sent: Map[String, Int])
+    : Seq[KeyedMessage[String, String]] = {
+    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
+      new KeyedMessage[String, String](topic, s)
+    }
+    messages.toSeq
+  }
+
+  def createTopic(topic: String) {
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    logInfo("==================== 5 ====================")
+    // wait until metadata is propagated
+    waitUntilMetadataIsPropagated(Seq(server), topic, 0, 1000)
+  }
+
+  def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    producer.send(createTestMessage(topic, sent): _*)
+    logInfo("==================== 6 ====================")
+  }
+}
+
+object KafkaTestUtils {
+  val random = new Random()
+
+  def getBrokerConfig(port: Int, zkConnect: String): Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.put("host.name", "localhost")
+    props.put("port", port.toString)
+    props.put("log.dir", Utils.createTempDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkConnect)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  def getProducerConfig(brokerList: String): Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    props
+  }
+
+  def waitUntilTrue(condition: () => Boolean, waitTime: Long): Boolean = {
+    val startTime = System.currentTimeMillis()
+    while (true) {
+      if (condition())
+        return true
+      if (System.currentTimeMillis() > startTime + waitTime)
+        return false
+      Thread.sleep(waitTime.min(100L))
+    }
+    // Should never go to here
+    throw new RuntimeException("unexpected error")
+  }
+
+  def waitUntilMetadataIsPropagated(servers: Seq[KafkaServer], topic: String, partition: Int,
+      timeout: Long) {
+    assert(waitUntilTrue(() =>
+      servers.foldLeft(true)(_ && _.apis.leaderCache.keySet.contains(
+        TopicAndPartition(topic, partition))), timeout),
+      s"Partition [$topic, $partition] metadata not propagated after timeout")
+  }
+
+  class EmbeddedZookeeper(val zkConnect: String) {
+    val random = new Random()
+    val snapshotDir = Utils.createTempDir()
+    val logDir = Utils.createTempDir()
+
+    val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+    val (ip, port) = {
+      val splits = zkConnect.split(":")
+      (splits(0), splits(1).toInt)
+    }
+    val factory = new NIOServerCnxnFactory()
+    factory.configure(new InetSocketAddress(ip, port), 16)
+    factory.startup(zookeeper)
+
+    def shutdown() {
+      factory.shutdown()
+      Utils.deleteRecursively(snapshotDir)
+      Utils.deleteRecursively(logDir)
+    }
+  }
 }
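Note: KafkaTestUtils above is a plain object, so its helpers can in principle be exercised on their own from this module's test classpath. A small illustrative sketch, not part of the commit, showing the config builders and the polling helper; the object name UtilsSketch, the ports, and the ZooKeeper address are placeholders:

import java.util.Properties
import org.apache.spark.streaming.kafka.KafkaTestUtils

object UtilsSketch {
  def main(args: Array[String]): Unit = {
    // Broker/producer Properties as built for the embedded server and producer above.
    val brokerProps: Properties = KafkaTestUtils.getBrokerConfig(9092, "localhost:2181")
    val producerProps: Properties = KafkaTestUtils.getProducerConfig("localhost:9092")
    println(brokerProps.getProperty("log.dir"))                 // a fresh temp dir
    println(producerProps.getProperty("metadata.broker.list"))  // localhost:9092

    // Poll a condition until it holds or the wait time elapses, the same
    // pattern waitUntilMetadataIsPropagated uses for topic metadata.
    val deadline = System.currentTimeMillis() + 200
    val satisfied = KafkaTestUtils.waitUntilTrue(() => System.currentTimeMillis() >= deadline, 1000L)
    assert(satisfied)
  }
}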
