
Commit 96c7a1d

Update the ReliableKafkaReceiver unit test
1 parent 8135d31 commit 96c7a1d

1 file changed: +18 -55 lines changed


external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 18 additions & 55 deletions
@@ -17,6 +17,8 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.io.File
+
 import scala.collection.mutable
 
 import kafka.serializer.StringDecoder
@@ -25,6 +27,7 @@ import kafka.utils.{ZkUtils, ZKGroupTopicDirs}
 import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
+import org.apache.spark.util.Utils
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuite {
   import KafkaTestUtils._
@@ -35,6 +38,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 1, "b" -> 1, "c" -> 1)
     createTopic(topic)
@@ -73,6 +81,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topic = "test"
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     createTopic(topic)
@@ -105,6 +118,11 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuite {
       .setAppName(framework)
       .set("spark.streaming.receiver.writeAheadLog.enable", "true")
     val ssc = new StreamingContext(sparkConf, batchDuration)
+    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
+      s"test-checkpoint${random.nextInt(10000)}"
+    Utils.registerShutdownDeleteDir(new File(checkpointDir))
+    ssc.checkpoint(checkpointDir)
+
     val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
     val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
     topics.foreach { case (t, _) =>
@@ -133,61 +151,6 @@
     topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
   }
 
-  test("Verify offset commit when exception is met") {
-    val sparkConf = new SparkConf()
-      .setMaster(master)
-      .setAppName(framework)
-      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    var ssc = new StreamingContext(
-      sparkConf.clone.set("spark.streaming.blockInterval", "10000"),
-      batchDuration)
-    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
-    val sent = Map("a" -> 10, "b" -> 10, "c" -> 10)
-    topics.foreach { case (t, _) =>
-      createTopic(t)
-      produceAndSendMessage(t, sent)
-    }
-
-    val groupId = s"test-consumer-${random.nextInt(10000)}"
-
-    val kafkaParams = Map("zookeeper.connect" -> s"$zkHost:$zkPort",
-      "group.id" -> groupId,
-      "auto.offset.reset" -> "smallest")
-
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => throw new Exception)
-    try {
-      ssc.start()
-      ssc.awaitTermination(1000)
-    } catch {
-      case e: Exception =>
-        if (ssc != null) {
-          ssc.stop()
-          ssc = null
-        }
-    }
-    // Failed before putting to BM, so offset is not updated.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
-
-    // Restart to see if data is consumed from last checkpoint.
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
-      .foreachRDD(_ => Unit)
-    ssc.start()
-    ssc.awaitTermination(3000)
-    ssc.stop()
-
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
-  }
-
   private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
     assert(zkClient != null, "Zookeeper client is not initialized")
 
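Note: each remaining test now provisions its own checkpoint directory, which the write-ahead log enabled by these tests stores its data under, and registers it for deletion at JVM shutdown. The same four lines are repeated in all three tests; a minimal sketch of that pattern pulled out as a standalone helper is below (the tempCheckpointDir name and the use of scala.util.Random are illustrative, not part of this patch):

import java.io.File

import scala.util.Random

import org.apache.spark.util.Utils

// Build a unique path under the JVM temp directory (falling back to /tmp)
// so that concurrent or repeated test runs do not collide on the same directory.
def tempCheckpointDir(): String = {
  val dir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
    s"test-checkpoint${Random.nextInt(10000)}"
  // Register the directory for deletion when the JVM exits, mirroring what the
  // patch does with Utils.registerShutdownDeleteDir in each test body.
  Utils.registerShutdownDeleteDir(new File(dir))
  dir
}

// Usage inside a test, given a StreamingContext `ssc`:
//   ssc.checkpoint(tempCheckpointDir())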