Commit 796d4ca

Add real Kafka streaming test
Conflicts:
	external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
	external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala
	project/SparkBuild.scala
1 parent 3dc55fd commit 796d4ca

3 files changed: +257 −36 lines changed


external/kafka/pom.xml

Lines changed: 6 additions & 0 deletions
@@ -70,6 +70,12 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>net.sf.jopt-simple</groupId>
+      <artifactId>jopt-simple</artifactId>
+      <version>4.5</version>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 97 additions & 20 deletions
@@ -17,31 +17,108 @@
 
 package org.apache.spark.streaming.kafka;
 
+import java.io.Serializable;
 import java.util.HashMap;
+import java.util.List;
 
-import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
-import org.junit.Test;
-import com.google.common.collect.Maps;
-import kafka.serializer.StringDecoder;
-import org.apache.spark.storage.StorageLevel;
+import scala.Predef;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+
+import junit.framework.Assert;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaStreamSuite;
+
+import org.junit.Test;
+import org.junit.After;
+import org.junit.Before;
+
+public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
+  private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+
+  @Before
+  @Override
+  public void setUp() {
+    testSuite.beforeFunction();
+    System.clearProperty("spark.driver.port");
+    System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
+    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+  }
+
+  @After
+  @Override
+  public void tearDown() {
+    ssc.stop();
+    ssc = null;
+    System.clearProperty("spark.driver.port");
+    testSuite.afterFunction();
+  }
 
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testKafkaStream() {
-    HashMap<String, Integer> topics = Maps.newHashMap();
-
-    // tests the API, does not actually test data receiving
-    JavaPairReceiverInputDStream<String, String> test1 =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics);
-    JavaPairReceiverInputDStream<String, String> test2 = KafkaUtils.createStream(ssc, "localhost:12345", "group", topics,
-      StorageLevel.MEMORY_AND_DISK_SER_2());
-
-    HashMap<String, String> kafkaParams = Maps.newHashMap();
-    kafkaParams.put("zookeeper.connect", "localhost:12345");
-    kafkaParams.put("group.id","consumer-group");
-    JavaPairReceiverInputDStream<String, String> test3 = KafkaUtils.createStream(ssc,
-      String.class, String.class, StringDecoder.class, StringDecoder.class,
-      kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
+    String topic = "topic1";
+    HashMap<String, Integer> topics = new HashMap<String, Integer>();
+    topics.put(topic, 1);
+
+    HashMap<String, Integer> sent = new HashMap<String, Integer>();
+    sent.put("a", 5);
+    sent.put("b", 3);
+    sent.put("c", 10);
+
+    JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
+      testSuite.zkConnect(),
+      "group",
+      topics);
+
+    final HashMap<String, Long> result = new HashMap<String, Long>();
+
+    JavaDStream<String> words = stream.map(
+      new Function<Tuple2<String, String>, String>() {
+        @Override
+        public String call(Tuple2<String, String> tuple2) throws Exception {
+          return tuple2._2();
+        }
+      }
+    );
+
+    words.countByValue().foreachRDD(
+      new Function<JavaPairRDD<String, Long>, Void>() {
+        @Override
+        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+          List<Tuple2<String, Long>> ret = rdd.collect();
+          for (Tuple2<String, Long> r : ret) {
+            if (result.containsKey(r._1())) {
+              result.put(r._1(), result.get(r._1()) + r._2());
+            } else {
+              result.put(r._1(), r._2());
+            }
+          }
+
+          return null;
+        }
+      }
+    );
+
+    ssc.start();
+
+    HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
+    testSuite.produceAndSendTestMessage(topic,
+      JavaConverters.asScalaMapConverter(tmp).asScala().toMap(
+        Predef.<Tuple2<String, Object>>conforms()
+      ));
+
+    ssc.awaitTermination(10000);
+
+    Assert.assertEquals(sent.size(), result.size());
+    for (String k : sent.keySet()) {
+      Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+    }
   }
 }

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 154 additions & 16 deletions
@@ -17,28 +17,166 @@
 
 package org.apache.spark.streaming.kafka
 
-import kafka.serializer.StringDecoder
+import java.io.File
+import java.net.InetSocketAddress
+import java.util.{Properties, Random}
+
+import scala.collection.mutable
+
+import kafka.admin.CreateTopicCommand
+import kafka.common.TopicAndPartition
+import kafka.producer.{KeyedMessage, ProducerConfig, Producer}
+import kafka.utils.ZKStringSerializer
+import kafka.serializer.StringEncoder
+import kafka.server.{KafkaConfig, KafkaServer}
+
 import org.apache.spark.streaming.{StreamingContext, TestSuiteBase}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.dstream.ReceiverInputDStream
+import org.apache.zookeeper.server.ZooKeeperServer
+import org.apache.zookeeper.server.NIOServerCnxnFactory
+
+import org.I0Itec.zkclient.ZkClient
 
 class KafkaStreamSuite extends TestSuiteBase {
+  val zkConnect = "localhost:2181"
+  var zookeeper: EmbeddedZookeeper = _
+  var zkClient: ZkClient = _
+  val zkConnectionTimeout = 6000
+  val zkSessionTimeout = 6000
+
+  val brokerPort = 9092
+  val brokerProps = getBrokerConfig(brokerPort)
+  val brokerConf = new KafkaConfig(brokerProps)
+  var server: KafkaServer = _
+
+  override def beforeFunction() {
+    // Zookeeper server startup
+    zookeeper = new EmbeddedZookeeper(zkConnect)
+    zkClient = new ZkClient(zkConnect, zkSessionTimeout, zkConnectionTimeout, ZKStringSerializer)
+
+    // Kafka broker startup
+    server = new KafkaServer(brokerConf)
+    server.startup()
+
+    super.beforeFunction()
+  }
+
+  override def afterFunction() {
+    server.shutdown()
+    brokerConf.logDirs.foreach { f => KafkaStreamSuite.deleteDir(new File(f)) }
+
+    zkClient.close()
+    zookeeper.shutdown()
+
+    super.afterFunction()
+  }
 
   test("kafka input stream") {
     val ssc = new StreamingContext(master, framework, batchDuration)
-    val topics = Map("my-topic" -> 1)
-
-    // tests the API, does not actually test data receiving
-    val test1: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:1234", "group", topics)
-    val test2: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream(ssc, "localhost:12345", "group", topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-    val kafkaParams = Map("zookeeper.connect"->"localhost:12345","group.id"->"consumer-group")
-    val test3: ReceiverInputDStream[(String, String)] =
-      KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-        ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2)
-
-    // TODO: Actually test receiving data
+    val topic = "topic1"
+    val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
+
+    val stream = KafkaUtils.createStream(ssc, zkConnect, "group", Map(topic -> 1))
+    val result = new mutable.HashMap[String, Long]()
+    stream.map { case (k, v) => v }
+      .countByValue()
+      .foreachRDD { r =>
+        val ret = r.collect()
+        ret.toMap.foreach { kv =>
+          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
+          result.put(kv._1, count)
+        }
+      }
+    ssc.start()
+    produceAndSendTestMessage(topic, sent)
+    ssc.awaitTermination(10000)
+
+    assert(sent.size === result.size)
+    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }
+
     ssc.stop()
   }
+
+  private def getBrokerConfig(port: Int): Properties = {
+    val props = new Properties()
+    props.put("broker.id", "0")
+    props.
+      put("host.name", "localhost")
+    props.put("port", port.toString)
+    props.put("log.dir", KafkaStreamSuite.tmpDir().getAbsolutePath)
+    props.put("zookeeper.connect", zkConnect)
+    props.put("log.flush.interval.messages", "1")
+    props.put("replica.socket.timeout.ms", "1500")
+    props
+  }
+
+  private def getProducerConfig(brokerList: String): Properties = {
+    val props = new Properties()
+    props.put("metadata.broker.list", brokerList)
+    props.put("serializer.class", classOf[StringEncoder].getName)
+    props
+  }
+
+  private def createTestMessage(topic: String, sent: Map[String, Int])
+    : Seq[KeyedMessage[String, String]] = {
+    val messages = for ((s, freq) <- sent; i <- 0 until freq) yield {
+      new KeyedMessage[String, String](topic, s)
+    }
+    messages.toSeq
+  }
+
+  def produceAndSendTestMessage(topic: String, sent: Map[String, Int]) {
+    val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
+    val producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+
+    // wait until metadata is propagated
+    Thread.sleep(1000)
+    assert(server.apis.leaderCache.keySet.contains(TopicAndPartition(topic, 0)))
+
+    producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
+  }
+}
+
+object KafkaStreamSuite {
+  val random = new Random()
+
+  def tmpDir(): File = {
+    val tmp = System.getProperty("java.io.tmpdir")
+    val f = new File(tmp, "spark-kafka-" + random.nextInt(1000))
+    f.mkdirs()
+    f
+  }
+
+  def deleteDir(file: File) {
+    if (file.isFile) {
+      file.delete()
+    } else {
+      for (f <- file.listFiles()) {
+        deleteDir(f)
+      }
+      file.delete()
+    }
+  }
+}
+
+class EmbeddedZookeeper(val zkConnect: String) {
+  val random = new Random()
+  val snapshotDir = KafkaStreamSuite.tmpDir()
+  val logDir = KafkaStreamSuite.tmpDir()
+
+  val zookeeper = new ZooKeeperServer(snapshotDir, logDir, 500)
+  val (ip, port) = {
+    val splits = zkConnect.split(":")
+    (splits(0), splits(1).toInt)
+  }
+  val factory = new NIOServerCnxnFactory()
+  factory.configure(new InetSocketAddress(ip, port), 16)
+  factory.startup(zookeeper)
+
+  def shutdown() {
+    factory.shutdown()
+    KafkaStreamSuite.deleteDir(snapshotDir)
+    KafkaStreamSuite.deleteDir(logDir)
+  }
 }
