
Commit b2b2f84

Refactored Kafka receiver logic and Kafka testsuites
1 parent e501b3c commit b2b2f84

File tree: 5 files changed, +254 -236 lines changed

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 108 additions & 93 deletions
@@ -18,24 +18,23 @@
 package org.apache.spark.streaming.kafka
 
 import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}
 
-import scala.collection.Map
-import scala.collection.mutable
-import scala.reflect.{classTag, ClassTag}
+import scala.collection.{Map, mutable}
+import scala.reflect.{ClassTag, classTag}
 
 import kafka.common.TopicAndPartition
 import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
+import kafka.message.MessageAndMetadata
 import kafka.serializer.Decoder
-import kafka.utils.{ZkUtils, ZKGroupTopicDirs, ZKStringSerializer, VerifiableProperties}
+import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
 import org.I0Itec.zkclient.ZkClient
 
-import org.apache.spark.{SparkEnv, Logging}
-import org.apache.spark.storage.{StreamBlockId, StorageLevel}
-import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
+import org.apache.spark.{Logging, SparkEnv}
+import org.apache.spark.storage.{StorageLevel, StreamBlockId}
+import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
 import org.apache.spark.util.Utils
 
-
 /**
  * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
  * It is turned off by default and will be enabled when
@@ -60,10 +59,8 @@ class ReliableKafkaReceiver[
   extends Receiver[(K, V)](storageLevel) with Logging {
 
   private val groupId = kafkaParams("group.id")
-
   private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-
-  private def conf() = SparkEnv.get.conf
+  private def conf = SparkEnv.get.conf
 
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null
@@ -86,58 +83,8 @@ class ReliableKafkaReceiver[
    */
   private var blockGenerator: BlockGenerator = null
 
-  /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
-  private final class OffsetCheckpointListener extends BlockGeneratorListener {
-
-    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
-      // Get a snapshot of current offset map and store with related block id. Since this hook
-      // function is called in synchronized block, so we can get the snapshot without explicit lock.
-      val offsetSnapshot = topicPartitionOffsetMap.toMap
-      blockOffsetMap.put(blockId, offsetSnapshot)
-      topicPartitionOffsetMap.clear()
-    }
-
-    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
-
-      // Commit and remove the related offsets.
-      Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
-        commitOffset(offsetMap)
-      }
-      blockOffsetMap.remove(blockId)
-    }
-
-    override def onError(message: String, throwable: Throwable): Unit = {
-      reportError(message, throwable)
-    }
-  }
-
-  override def onStop(): Unit = {
-    if (consumerConnector != null) {
-      consumerConnector.shutdown()
-      consumerConnector = null
-    }
-
-    if (zkClient != null) {
-      zkClient.close()
-      zkClient = null
-    }
-
-    if (blockGenerator != null) {
-      blockGenerator.stop()
-      blockGenerator = null
-    }
-
-    if (topicPartitionOffsetMap != null) {
-      topicPartitionOffsetMap.clear()
-      topicPartitionOffsetMap = null
-    }
-
-    if (blockOffsetMap != null) {
-      blockOffsetMap.clear()
-      blockOffsetMap = null
-    }
-  }
+  /** Threadpool running the handlers for receiving message from multiple topics and partitions. */
+  private var messageHandlerThreadPool: ThreadPoolExecutor = null
 
   override def onStart(): Unit = {
     logInfo(s"Starting Kafka Consumer Stream with group: $groupId")
@@ -149,7 +96,7 @@ class ReliableKafkaReceiver[
     blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()
 
     // Initialize the block generator for storing Kafka message.
-    blockGenerator = new BlockGenerator(new OffsetCheckpointListener, streamId, conf())
+    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)
 
     if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
       logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
@@ -174,7 +121,9 @@ class ReliableKafkaReceiver[
     zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
       consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)
 
-    // start BlockGenerator
+    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
+      topics.values.sum, "KafkaMessageHandler")
+
     blockGenerator.start()
 
     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
@@ -188,40 +137,70 @@ class ReliableKafkaReceiver[
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
-
-    try {
-      topicMessageStreams.values.foreach { streams =>
-        streams.foreach { stream =>
-          executorPool.submit(new MessageHandler(stream))
-        }
+    topicMessageStreams.values.foreach { streams =>
+      streams.foreach { stream =>
+        messageHandlerThreadPool.submit(new MessageHandler(stream))
       }
-    } finally {
-      executorPool.shutdown()
     }
+    println("Starting")
   }
 
-  /** A inner class to handle received Kafka message. */
-  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
-    override def run(): Unit = {
-      logInfo(s"Starting message process thread ${Thread.currentThread().getId}.")
-      try {
-        val streamIterator = stream.iterator()
-        while (streamIterator.hasNext()) {
-          val msgAndMetadata = streamIterator.next()
-          val topicAndPartition = TopicAndPartition(
-            msgAndMetadata.topic, msgAndMetadata.partition)
-          blockGenerator.synchronized {
-            blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
-            topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
-          }
-        }
-      } catch {
-        case e: Throwable => logError("Error handling message; existing", e)
-      }
+  override def onStop(): Unit = {
+    if (messageHandlerThreadPool != null) {
+      messageHandlerThreadPool.shutdown()
+      messageHandlerThreadPool = null
+    }
+
+    if (consumerConnector != null) {
+      consumerConnector.shutdown()
+      consumerConnector = null
+    }
+
+    if (zkClient != null) {
+      zkClient.close()
+      zkClient = null
+    }
+
+    if (blockGenerator != null) {
+      blockGenerator.stop()
+      blockGenerator = null
+    }
+
+    if (topicPartitionOffsetMap != null) {
+      topicPartitionOffsetMap.clear()
+      topicPartitionOffsetMap = null
+    }
+
+    if (blockOffsetMap != null) {
+      blockOffsetMap.clear()
+      blockOffsetMap = null
     }
   }
 
+  /** Store a Kafka message and the associated metadata as a tuple */
+  private def storeMessageAndMetadata(
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
+    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
+    blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
+    topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+  }
+
+  /** Remember the current offsets for each topic and partition. This is called when a block is generated */
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
+    // Get a snapshot of current offset map and store with related block id. Since this hook
+    // function is called in synchronized block, so we can get the snapshot without explicit lock.
+    val offsetSnapshot = topicPartitionOffsetMap.toMap
+    blockOffsetMap.put(blockId, offsetSnapshot)
+    topicPartitionOffsetMap.clear()
+  }
+
+  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper */
+  private def storeBlockAndCommitOffset(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
+    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
+    blockOffsetMap.remove(blockId)
+  }
+
   /**
    * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
    * metadata schema in Zookeeper.
@@ -248,4 +227,40 @@ class ReliableKafkaReceiver[
         s"partition ${topicAndPart.partition}")
     }
   }
+
+  /** Class to handle received Kafka message. */
+  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
+    override def run(): Unit = {
+      while (!isStopped) {
+        println(s"Starting message process thread ${Thread.currentThread().getId}.")
+        try {
+          val streamIterator = stream.iterator()
+          while (streamIterator.hasNext) {
+            storeMessageAndMetadata(streamIterator.next)
+          }
+        } catch {
+          case e: Exception =>
+            logError("Error handling message", e)
+        }
+      }
+    }
+  }
+
+  /** Class to handle blocks generated by the block generator. */
+  private final class GeneratedBlockHandler extends BlockGeneratorListener {
+
+    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+      // Remember the offsets of topics/partitions when a block has been generated
+      rememberBlockOffsets(blockId)
+    }
+
+    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+      // Store block and commit the blocks offset
+      storeBlockAndCommitOffset(blockId, arrayBuffer)
+    }
+
+    override def onError(message: String, throwable: Throwable): Unit = {
+      reportError(message, throwable)
+    }
+  }
 }
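As the scaladoc above notes, the reliable receiver is turned off by default and only takes effect when the corresponding configuration is enabled. The driver program below is not part of this commit; it is a minimal, hypothetical sketch of how such a stream would be created from the public KafkaUtils API, assuming that the write-ahead-log setting (spark.streaming.receiver.writeAheadLog.enable) is what routes KafkaUtils.createStream to ReliableKafkaReceiver. It keeps auto.commit.enable at false, since the receiver commits offsets to ZooKeeper itself after each block is stored and logs a warning when auto commit is turned on.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaWordCount {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("ReliableKafkaWordCount")
      .setMaster("local[4]")
      // Assumed switch that selects the reliable receiver; without it the
      // default, non-reliable KafkaReceiver is used.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")

    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint("checkpoint")   // write-ahead-log recovery needs a checkpoint directory

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",   // assumed local ZooKeeper
      "group.id" -> "reliable-example",
      // The receiver commits offsets itself once a block is safely stored,
      // so Kafka's automatic offset commit must stay disabled.
      "auto.commit.enable" -> "false")

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("pageviews" -> 2), StorageLevel.MEMORY_AND_DISK_SER_2)

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}

The topic name, parallelism, batch interval, and ZooKeeper address are placeholders; only the kafkaParams keys shown are taken from this commit.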

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 11 additions & 12 deletions
@@ -20,6 +20,7 @@
 import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Random;
 
 import scala.Predef;
 import scala.Tuple2;
@@ -42,25 +43,23 @@
 import org.junit.After;
 import org.junit.Before;
 
-public class JavaKafkaStreamSuite extends LocalJavaStreamingContext implements Serializable {
-  private transient KafkaStreamSuite testSuite = new KafkaStreamSuite();
+public class JavaKafkaStreamSuite extends KafkaStreamSuiteBase implements Serializable {
+  private transient JavaStreamingContext ssc = null;
+  private Random random = new Random();
 
   @Before
-  @Override
   public void setUp() {
-    testSuite.beforeFunction();
+    beforeFunction();
     System.clearProperty("spark.driver.port");
-    //System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.SystemClock");
-    ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+    ssc = new JavaStreamingContext(sparkConf(), batchDuration());
   }
 
   @After
-  @Override
   public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    testSuite.afterFunction();
+    afterFunction();
   }
 
   @Test
@@ -74,15 +73,15 @@ public void testKafkaStream() throws InterruptedException {
     sent.put("b", 3);
    sent.put("c", 10);
 
-    testSuite.createTopic(topic);
+    createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
-    testSuite.produceAndSendMessage(topic,
+    produceAndSendMessage(topic,
        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
            Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
-    kafkaParams.put("zookeeper.connect", testSuite.zkHost() + ":" + testSuite.zkPort());
-    kafkaParams.put("group.id", "test-consumer-" + KafkaTestUtils.random().nextInt(10000));
+    kafkaParams.put("zookeeper.connect", zkAddress());
+    kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
 
     JavaPairDStream<String, String> stream = KafkaUtils.createStream(ssc,
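The rest of testKafkaStream falls outside the hunks shown above, so how the Java suite verifies the received data is not visible here. The sketch below shows, in Scala, one plausible way to tally the received values and compare them with the sent counts, reusing the consumer parameters the Java test builds; the topic name, ZooKeeper address, timeout, and the countByValue-based check are illustrative assumptions rather than code from this commit.

import scala.collection.mutable

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaStreamCheck {
  def main(args: Array[String]): Unit = {
    // Message counts produced on the sender side of the test.
    val sent = Map("a" -> 5L, "b" -> 3L, "c" -> 10L)

    val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaStreamCheck")
    val ssc = new StreamingContext(conf, Milliseconds(500))

    // Same consumer parameters as the Java test, against an assumed local ZooKeeper.
    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",
      "group.id" -> s"test-consumer-${scala.util.Random.nextInt(10000)}",
      "auto.offset.reset" -> "smallest")

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Map("topic1" -> 1), StorageLevel.MEMORY_ONLY)

    // Tally how many times each message value has been received so far.
    val result = new mutable.HashMap[String, Long]()
    stream.map(_._2).countByValue().foreachRDD { rdd =>
      rdd.collect().foreach { case (value, count) =>
        result.synchronized { result(value) = result.getOrElse(value, 0L) + count }
      }
    }

    ssc.start()
    ssc.awaitTermination(10000)   // give the batches ten seconds to drain
    ssc.stop()

    // Every value should have been seen exactly as many times as it was sent.
    assert(sent.forall { case (value, count) => result.getOrElse(value, 0L) == count })
  }
}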
