
Commit 77c3e50

Code refactor and add some unit tests
1 parent dd9aeeb commit 77c3e50

4 files changed (+213 -12 lines)


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 2 additions & 2 deletions

@@ -47,12 +47,12 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
-    reliableStoreEnabled: Boolean,
+    reliableReceiveEnabled: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    if (!reliableStoreEnabled) {
+    if (!reliableReceiveEnabled) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
         .asInstanceOf[Receiver[(K, V)]]
     } else {
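
For context, a sketch of the getReceiver() dispatch that the renamed reliableReceiveEnabled flag controls. The else branch is cut off in the diff above, so wiring it to the new ReliableKafkaReceiver (and the constructor arguments used there) is an assumption based on the rest of this commit, not the exact upstream code:

  // Sketch of KafkaInputDStream.getReceiver(); the else branch is assumed, not shown above.
  def getReceiver(): Receiver[(K, V)] = {
    if (!reliableReceiveEnabled) {
      // Default path: the original, non-reliable receiver.
      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
        .asInstanceOf[Receiver[(K, V)]]
    } else {
      // Reliable path (assumed): the receiver added in this commit, which commits
      // offsets to ZooKeeper only after blocks have been stored.
      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
        .asInstanceOf[Receiver[(K, V)]]
    }
  }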

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 5 additions & 5 deletions

@@ -71,7 +71,6 @@ class ReliableKafkaReceiver[
         val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
         topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
       }
-      println(s"offset map: ${topicPartitionOffsetMap.mkString(":")}")
     }
 
     override def onGenerateBlock(blockId: StreamBlockId): Unit = {
@@ -80,7 +79,6 @@ class ReliableKafkaReceiver[
       val offsetSnapshot = topicPartitionOffsetMap.toMap
       blockOffsetMap.put(blockId, offsetSnapshot)
       topicPartitionOffsetMap.clear()
-      println(s"block generated: $blockId, offset snapshot: ${offsetSnapshot.mkString(":")}")
     }
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
@@ -101,8 +99,7 @@ class ReliableKafkaReceiver[
 
   /** Manage the BlockGenerator in receiver itself for better managing block store and offset
     * commit */
-  @volatile private lazy val blockGenerator =
-    new BlockGenerator(blockGeneratorListener, streamId, env.conf)
+  private lazy val blockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
 
   override def onStop(): Unit = {
     if (consumerConnector != null) {
@@ -134,6 +131,9 @@ class ReliableKafkaReceiver[
     props.setProperty(AUTO_OFFSET_COMMIT, "false")
 
     val consumerConfig = new ConsumerConfig(props)
+
+    assert(consumerConfig.autoCommitEnable == false)
+
     logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
     consumerConnector = Consumer.create(consumerConfig)
     logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")
@@ -204,7 +204,7 @@ class ReliableKafkaReceiver[
       s"${topicAndPart.topic}, partition ${topicAndPart.partition}", t)
     }
 
-    println(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
+    logInfo(s"Committed offset ${offset} for topic ${topicAndPart.topic}, " +
       s"partition ${topicAndPart.partition}")
   }
 }
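
The listener callbacks touched above are the core of the reliable receiver's bookkeeping: record the latest offset per topic/partition as records are added, snapshot those offsets per block when a block is generated, and commit them once the block has been pushed. Below is a minimal, self-contained sketch of that flow in plain Scala; the type aliases and the println stand in for the real Kafka/Spark classes and the ZooKeeper commit, and are not the actual implementation:

  import scala.collection.mutable

  object OffsetBookkeepingSketch {
    type TopicAndPartition = (String, Int)   // stand-in for kafka.common.TopicAndPartition
    type StreamBlockId = Long                // stand-in for org.apache.spark.storage.StreamBlockId

    // Latest offset seen per topic/partition since the last block was cut.
    private val topicPartitionOffsetMap = mutable.HashMap.empty[TopicAndPartition, Long]
    // Offsets frozen per generated block, waiting for the block to be stored.
    private val blockOffsetMap = mutable.HashMap.empty[StreamBlockId, Map[TopicAndPartition, Long]]

    // Called for every record: remember the latest offset for its topic/partition.
    def onAddData(tp: TopicAndPartition, offset: Long): Unit =
      topicPartitionOffsetMap.put(tp, offset)

    // Called when a block is cut: snapshot the offsets that belong to it and reset.
    def onGenerateBlock(blockId: StreamBlockId): Unit = {
      blockOffsetMap.put(blockId, topicPartitionOffsetMap.toMap)
      topicPartitionOffsetMap.clear()
    }

    // Called after the block is safely stored: commit its offsets (here, just print).
    def onPushBlock(blockId: StreamBlockId): Unit =
      blockOffsetMap.remove(blockId).foreach { offsets =>
        offsets.foreach { case ((topic, partition), offset) =>
          println(s"committing offset $offset for $topic-$partition")
        }
      }

    def main(args: Array[String]): Unit = {
      onAddData(("test", 0), 9L)
      onGenerateBlock(1L)
      onPushBlock(1L)
    }
  }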

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 22 additions & 5 deletions

@@ -93,12 +93,27 @@ class KafkaStreamSuite extends TestSuiteBase {
   }
 
   override def afterFunction() {
-    producer.close()
-    server.shutdown()
+    if (producer != null) {
+      producer.close()
+      producer = null
+    }
+
+    if (server != null) {
+      server.shutdown()
+      server = null
+    }
+
     brokerConf.logDirs.foreach { f => Utils.deleteRecursively(new File(f)) }
 
-    zkClient.close()
-    zookeeper.shutdown()
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (zookeeper != null) {
+      zookeeper.shutdown()
+      zookeeper = null
+    }
 
     super.afterFunction()
   }
@@ -155,7 +170,9 @@ class KafkaStreamSuite extends TestSuiteBase {
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    if (producer == null) {
+      producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
+    }
     producer.send(createTestMessage(topic, sent): _*)
     logInfo("==================== 6 ====================")
   }
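
The test-harness changes above follow one idiom: create each shared resource only if it does not exist yet, and in teardown close it only if it was created, then null the field so a later run (or a suite that never started the resource) stays safe. A small, self-contained sketch of that pattern, using a hypothetical Resource stand-in rather than the real Producer/KafkaServer/ZkClient types:

  class ResourceFixture {
    final class Resource { def close(): Unit = () }   // hypothetical stand-in

    private var resource: Resource = _

    // Lazily create the resource the first time it is needed (cf. produceAndSendMessage).
    def useResource(): Resource = {
      if (resource == null) {
        resource = new Resource
      }
      resource
    }

    // Null-guarded teardown (cf. afterFunction), safe even if the resource was never created.
    def tearDown(): Unit = {
      if (resource != null) {
        resource.close()
        resource = null
      }
    }
  }
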
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala (new file)

Lines changed: 184 additions & 0 deletions
@@ -0,0 +1,184 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import scala.collection.mutable

import kafka.serializer.StringDecoder
import kafka.utils.{ZkUtils, ZKGroupTopicDirs}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext

class ReliableKafkaStreamSuite extends KafkaStreamSuite {
  import KafkaTestUtils._

  test("Reliable Kafka input stream") {
    val ssc = new StreamingContext(master, framework, batchDuration)
    val topic = "test"
    val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
    createTopic(topic)
    produceAndSendMessage(topic, sent)

    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
      "group.id" -> s"test-consumer-${random.nextInt(10000)}",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Map(topic -> 1),
      StorageLevel.MEMORY_ONLY)
    val result = new mutable.HashMap[String, Long]()
    stream.map { case (k, v) => v }
      .foreachRDD { r =>
        val ret = r.collect()
        ret.foreach { v =>
          val count = result.getOrElseUpdate(v, 0) + 1
          result.put(v, count)
        }
      }
    ssc.start()
    ssc.awaitTermination(3000)

    assert(sent.size === result.size)
    sent.keys.foreach { k => assert(sent(k) === result(k).toInt) }

    ssc.stop()
  }

  test("Verify the offset commit") {
    val ssc = new StreamingContext(master, framework, batchDuration)
    val topic = "test"
    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
    createTopic(topic)
    produceAndSendMessage(topic, sent)

    val groupId = s"test-consumer-${random.nextInt(10000)}"

    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")

    assert(getCommitOffset(groupId, topic, 0) === 0L)

    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      Map(topic -> 1),
      StorageLevel.MEMORY_ONLY)
    stream.foreachRDD(_ => Unit)
    ssc.start()
    ssc.awaitTermination(3000)
    ssc.stop()

    assert(getCommitOffset(groupId, topic, 0) === 29L)
  }

  test("Verify multiple topics offset commit") {
    val ssc = new StreamingContext(master, framework, batchDuration)
    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
    topics.foreach { case (t, _) =>
      createTopic(t)
      produceAndSendMessage(t, sent)
    }

    val groupId = s"test-consumer-${random.nextInt(10000)}"

    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")

    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

    val stream = KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
      StorageLevel.MEMORY_ONLY)
    stream.foreachRDD(_ => Unit)
    ssc.start()
    ssc.awaitTermination(3000)
    ssc.stop()

    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
  }

  test("Verify offset commit when exception is met") {
    val sparkConf = new SparkConf()
      .setMaster(master)
      .setAppName(framework)
    var ssc = new StreamingContext(
      sparkConf.clone.set("spark.streaming.blockInterval", "4000"),
      batchDuration)
    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
    topics.foreach { case (t, _) =>
      createTopic(t)
      produceAndSendMessage(t, sent)
    }

    val groupId = s"test-consumer-${random.nextInt(10000)}"

    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest")

    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
      StorageLevel.MEMORY_ONLY).foreachRDD(_ => throw new Exception)
    try {
      ssc.start()
      ssc.awaitTermination(1000)
    } catch {
      case e: Exception =>
        if (ssc != null) {
          ssc.stop()
          ssc = null
        }
    }
    // Failed before putting to BM, so offset is not updated.
    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }

    // Restart to see if data is consumed from last checkpoint.
    ssc = new StreamingContext(sparkConf, batchDuration)
    KafkaUtils.createReliableStream[String, String, StringDecoder, StringDecoder](
      ssc,
      kafkaParams,
      topics,
      StorageLevel.MEMORY_ONLY).foreachRDD(_ => Unit)
    ssc.start()
    ssc.awaitTermination(3000)
    ssc.stop()

    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
  }

  private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
    assert(zkClient != null, "Zookeeper client is not initialized")

    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"

    ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
  }
}
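
getCommitOffset above reads the consumer's committed offset straight from ZooKeeper. For Kafka 0.8.x, ZKGroupTopicDirs(groupId, topic).consumerOffsetDir should resolve to /consumers/<groupId>/offsets/<topic> (an assumption about Kafka's ZooKeeper layout, not something shown in this diff), so the node the tests poll looks like the sketch below; readDataMaybeNull returns no data when nothing has been committed yet, which is why a missing node counts as offset 0:

  // Hypothetical helper mirroring the path getCommitOffset builds; the layout is an assumption.
  def commitOffsetPath(groupId: String, topic: String, partition: Int): String =
    s"/consumers/$groupId/offsets/$topic/$partition"

  // e.g. commitOffsetPath("test-consumer-42", "topic1", 0) == "/consumers/test-consumer-42/offsets/topic1/0"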
