
Commit bb65232

Fixed test bug and refactored KafkaStreamSuite
1 parent 50f2b56 commit bb65232

9 files changed, +85 -85 lines changed


examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@ import org.apache.spark.SparkConf
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
  * Usage: DirectKafkaWordCount <brokers> <topics>
- *   <brokers> is a list of one or more zookeeper servers that make quorum
+ *   <brokers> is a list of one or more Kafka brokers
  *   <topics> is a list of one or more kafka topics to consume from
  *
  * Example:
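
The docstring fix above reflects how the direct API works: it connects to the Kafka brokers themselves rather than to a ZooKeeper quorum. For orientation, a minimal sketch loosely modeled on the example file, showing where such a broker list ends up; the broker addresses and topic names are illustrative, not taken from the example's arguments.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object DirectKafkaWordCountSketch {
  def main(args: Array[String]): Unit = {
    // Illustrative values; the real example parses these from args
    val brokers = "broker1:9092,broker2:9092"  // Kafka brokers, not ZooKeeper servers
    val topics = Set("topic1", "topic2")

    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // The direct stream talks to the brokers listed under metadata.broker.list
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    // Plain word count over the message values
    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}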

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -171,7 +171,7 @@ object KafkaUtils {
       K: ClassTag,
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
-      VD <: Decoder[V]: ClassTag] (
+      VD <: Decoder[V]: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange]
@@ -205,7 +205,7 @@ object KafkaUtils {
       V: ClassTag,
       KD <: Decoder[K]: ClassTag,
       VD <: Decoder[V]: ClassTag,
-      R: ClassTag] (
+      R: ClassTag](
       sc: SparkContext,
       kafkaParams: Map[String, String],
       offsetRanges: Array[OffsetRange],
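
Both hunks above only drop a stray space before the parameter list of the two createRDD overloads; the signatures are otherwise unchanged. As context for what these signatures are used for, a minimal sketch of calling the simpler overload; the broker address, topic, and offsets are illustrative and assume a running broker with data in that range.

import kafka.serializer.StringDecoder

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object CreateRddSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("KafkaCreateRDDSketch").setMaster("local[2]"))

    // Illustrative broker address and offset range
    val kafkaParams = Map("metadata.broker.list" -> "localhost:9092")
    val offsetRanges = Array(OffsetRange("topic1", 0, 0L, 100L))

    // Reads exactly the messages covered by the offset ranges as one batch RDD of (key, value)
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(s"read ${rdd.count()} messages")

    sc.stop()
  }
}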

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 10 additions & 22 deletions
@@ -21,14 +21,11 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Random;
-import java.util.List;
-import java.util.ArrayList;
+import java.util.Arrays;
 
 import org.apache.spark.SparkConf;
-import org.apache.spark.streaming.Duration;
-import scala.Predef;
+
 import scala.Tuple2;
-import scala.collection.JavaConverters;
 
 import junit.framework.Assert;
 
@@ -74,12 +71,12 @@ public void testKafkaStream() throws InterruptedException {
     String topic1 = "topic1";
     String topic2 = "topic2";
 
-    List<String> topic1data = createTopicAndSendData(topic1);
-    List<String> topic2data = createTopicAndSendData(topic2);
+    String[] topic1data = createTopicAndSendData(topic1);
+    String[] topic2data = createTopicAndSendData(topic2);
 
     HashSet<String> sent = new HashSet<String>();
-    sent.addAll(topic1data);
-    sent.addAll(topic2data);
+    sent.addAll(Arrays.asList(topic1data));
+    sent.addAll(Arrays.asList(topic2data));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("metadata.broker.list", suiteBase.brokerAddress());
@@ -122,7 +119,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
 
     final HashSet<String> result = new HashSet<String>();
     unifiedStream.foreachRDD(
-        new Function<JavaRDD<String>, Void> () {
+        new Function<JavaRDD<String>, Void>() {
          @Override
          public Void call(org.apache.spark.api.java.JavaRDD<String> rdd) throws Exception {
            result.addAll(rdd.collect());
@@ -153,19 +150,10 @@ private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long off
     return topicMap;
   }
 
-  private List<String> createTopicAndSendData(String topic) {
-    List<String> data = java.util.Arrays.asList(topic+"-1", topic+"-2", topic+"-3");
-    HashMap<String, Integer> sent = new HashMap<String, Integer>();
-    for(String i: data) {
-      sent.put(i, 1);
-    }
-
+  private String[] createTopicAndSendData(String topic) {
+    String[] data = { topic + "-1", topic + "-2", topic + "-3"};
     suiteBase.createTopic(topic);
-
-    HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    suiteBase.produceAndSendMessage(topic,
-        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-            Predef.<Tuple2<String, Object>>conforms()));
+    suiteBase.sendMessages(topic, data);
     return data;
   }
 }
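
The helper now returns a plain String[] and delegates production to suiteBase.sendMessages, so the Java test no longer needs scala.Predef or JavaConverters. The refactored KafkaStreamSuiteBase itself is not part of this excerpt; a hedged sketch of what a sendMessages pair like the one being called might look like, with the producer wiring and names as assumptions rather than the commit's actual code.

import java.util.Properties

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object SendMessagesSketch {
  // Assumed overload taking message frequencies, as the Scala suites use: Map("a" -> 2) becomes "a", "a"
  def sendMessages(producer: Producer[String, String], topic: String,
      messageToFreq: Map[String, Int]): Unit = {
    val messages = messageToFreq.flatMap { case (s, freq) => Seq.fill(freq)(s) }.toArray
    sendMessages(producer, topic, messages)
  }

  // Assumed overload taking the raw strings, as the Java suite above uses
  def sendMessages(producer: Producer[String, String], topic: String,
      messages: Array[String]): Unit = {
    producer.send(messages.map(m => new KeyedMessage[String, String](topic, m)): _*)
  }

  def main(args: Array[String]): Unit = {
    // Illustrative broker address; a running Kafka 0.8 broker is assumed
    val props = new Properties()
    props.put("metadata.broker.list", "localhost:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    sendMessages(producer, "topic1", Map("a" -> 7, "b" -> 9))
    producer.close()
  }
}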

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 3 additions & 2 deletions
@@ -79,9 +79,10 @@ public void testKafkaStream() throws InterruptedException {
 
     suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    suiteBase.produceAndSendMessage(topic,
+    suiteBase.sendMessages(topic,
         JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-            Predef.<Tuple2<String, Object>>conforms()));
+            Predef.<Tuple2<String, Object>>conforms())
+    );
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala

Lines changed: 16 additions & 10 deletions
@@ -27,7 +27,7 @@ import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 import org.scalatest.concurrent.{Eventually, Timeouts}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
 import org.apache.spark.streaming.dstream.{DStream, InputDStream}
@@ -41,7 +41,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     .setMaster("local[4]")
     .setAppName(this.getClass.getSimpleName)
 
-
+  var sc: SparkContext = _
   var ssc: StreamingContext = _
   var testDir: File = _
 
@@ -56,18 +56,23 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   after {
     if (ssc != null) {
       ssc.stop()
+      sc = null
+    }
+    if (sc != null) {
+      sc.stop()
     }
     if (testDir != null) {
       Utils.deleteRecursively(testDir)
     }
   }
 
-  test("basic receiving with multiple topics and smallest starting offset") {
-    val topics = Set("topic1", "topic2", "topic3")
+
+  test("basic stream receiving with multiple topics and smallest starting offset") {
+    val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
-      produceAndSendMessage(t, data)
+      sendMessages(t, data)
     }
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
@@ -107,6 +112,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     }
     ssc.stop()
   }
+
   test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
@@ -122,7 +128,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     }
 
     // Send some initial messages before starting context
-    produceAndSendMessage(topic, data)
+    sendMessages(topic, data)
     eventually(timeout(10 seconds), interval(20 milliseconds)) {
       assert(getLatestOffset() > 3)
     }
@@ -144,7 +150,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     stream.map { _._2 }.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
-    produceAndSendMessage(topic, newData)
+    sendMessages(topic, newData)
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
       collectedData.contains("b")
     }
@@ -167,7 +173,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     }
 
     // Send some initial messages before starting context
-    produceAndSendMessage(topic, data)
+    sendMessages(topic, data)
    eventually(timeout(10 seconds), interval(20 milliseconds)) {
      assert(getLatestOffset() >= 10)
    }
@@ -190,7 +196,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     stream.foreachRDD { rdd => collectedData ++= rdd.collect() }
     ssc.start()
     val newData = Map("b" -> 10)
-    produceAndSendMessage(topic, newData)
+    sendMessages(topic, newData)
     eventually(timeout(10 seconds), interval(50 milliseconds)) {
       collectedData.contains("b")
     }
@@ -211,7 +217,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
     // Send data to Kafka and wait for it to be received
     def sendDataAndWaitForReceive(data: Seq[Int]) {
       val strings = data.map { _.toString}
-      produceAndSendMessage(topic, strings.map { _ -> 1}.toMap)
+      sendMessages(topic, strings.map { _ -> 1}.toMap)
       eventually(timeout(10 seconds), interval(50 milliseconds)) {
         assert(strings.forall { DirectKafkaStreamSuite.collectedData.contains })
       }
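
Every renamed sendMessages call site above is followed by the same idiom: produce to Kafka, then let ScalaTest's Eventually retry an assertion until the streaming job has caught up or a timeout expires. A small standalone sketch of that idiom; collectedData, the background thread, and the timings are illustrative stand-ins for the suite's members.

import scala.collection.mutable

import org.scalatest.Assertions._
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallySketch {
  // Stand-in for the buffer the suite fills from foreachRDD; synchronized for cross-thread visibility
  val collectedData = new mutable.ArrayBuffer[String]() with mutable.SynchronizedBuffer[String]

  def main(args: Array[String]): Unit = {
    // Simulate data arriving from another thread after a short delay
    new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(200); collectedData += "b" }
    }).start()

    // Retry the block every 50 ms; fail if it never passes within 10 s
    eventually(timeout(10.seconds), interval(50.milliseconds)) {
      assert(collectedData.contains("b"))
    }
    println("received b")
  }
}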

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaClusterSuite.scala

Lines changed: 10 additions & 10 deletions
@@ -19,29 +19,29 @@ package org.apache.spark.streaming.kafka
 
 import scala.util.Random
 
-import org.scalatest.BeforeAndAfter
 import kafka.common.TopicAndPartition
+import org.scalatest.BeforeAndAfterAll
 
-class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
-
-  val kafkaParams = Map("metadata.broker.list" -> s"$brokerAddress")
-  val kc = new KafkaCluster(kafkaParams)
+class KafkaClusterSuite extends KafkaStreamSuiteBase with BeforeAndAfterAll {
   val topic = "kcsuitetopic" + Random.nextInt(10000)
   val topicAndPartition = TopicAndPartition(topic, 0)
+  var kc: KafkaCluster = null
 
-  before {
+  override def beforeAll() {
     setupKafka()
     createTopic(topic)
-    produceAndSendMessage(topic, Map("a" -> 1))
+    sendMessages(topic, Map("a" -> 1))
+    kc = new KafkaCluster(Map("metadata.broker.list" -> s"$brokerAddress"))
   }
 
-  after {
+  override def afterAll() {
    tearDownKafka()
  }
 
  test("metadata apis") {
-    val leader = kc.findLeaders(Set(topicAndPartition)).right.get
-    assert(leader(topicAndPartition) === (brokerHost, brokerPort), "didn't get leader")
+    val leader = kc.findLeaders(Set(topicAndPartition)).right.get(topicAndPartition)
+    val leaderAddress = s"${leader._1}:${leader._2}"
+    assert(leaderAddress === brokerAddress, "didn't get leader")
 
     val parts = kc.getPartitions(Set(topic)).right.get
     assert(parts(topicAndPartition), "didn't get partitions")
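
The rewritten assertion works because findLeaders returns an Either whose right side maps each TopicAndPartition to a (host, port) pair, which the test joins into host:port before comparing against brokerAddress. A small sketch under that assumption; since KafkaCluster is package-private at this point, the sketch stays in the same package, and the helper name is made up for illustration.

package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

object FindLeaderSketch {
  // Returns "host:port" of the leader for one partition, if it can be found
  def leaderAddress(kc: KafkaCluster, topic: String, partition: Int): Option[String] = {
    val tp = TopicAndPartition(topic, partition)
    kc.findLeaders(Set(tp)).right.toOption.flatMap(_.get(tp)).map {
      case (host, port) => s"$host:$port"
    }
  }
}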

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala

Lines changed: 4 additions & 4 deletions
@@ -46,9 +46,9 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
-    produceAndSendMessage(topic, sent)
+    sendMessages(topic, sent)
 
-    val kafkaParams = Map("metadata.broker.list" -> s"localhost:$brokerPort",
+    val kafkaParams = Map("metadata.broker.list" -> brokerAddress,
       "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
 
     val kc = new KafkaCluster(kafkaParams)
@@ -65,14 +65,14 @@ class KafkaRDDSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
 
     val rdd2 = getRdd(kc, Set(topic))
     val sent2 = Map("d" -> 1)
-    produceAndSendMessage(topic, sent2)
+    sendMessages(topic, sent2)
     // this is the "0 messages" case
     // make sure we dont get anything, since messages were sent after rdd was defined
     assert(rdd2.isDefined)
     assert(rdd2.get.count === 0)
 
     val rdd3 = getRdd(kc, Set(topic))
-    produceAndSendMessage(topic, Map("extra" -> 22))
+    sendMessages(topic, Map("extra" -> 22))
     // this is the "exactly 1 message" case
     // make sure we get exactly one message, despite there being lots more available
     assert(rdd3.isDefined)
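
The "0 messages" and "exactly 1 message" comments above follow from the offset-range semantics: the RDD is pinned to the offsets known when it is defined, so anything produced afterwards falls outside its range. A tiny sketch of that arithmetic with purely illustrative offsets.

import org.apache.spark.streaming.kafka.OffsetRange

object OffsetRangeSketch {
  def main(args: Array[String]): Unit = {
    // An OffsetRange covers the half-open interval [fromOffset, untilOffset)
    val empty = OffsetRange("topic1", 0, 18L, 18L)  // the "0 messages" case
    val one   = OffsetRange("topic1", 0, 18L, 19L)  // the "exactly 1 message" case
    println(empty.untilOffset - empty.fromOffset)   // 0
    println(one.untilOffset - one.fromOffset)       // 1
  }
}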
