
Commit 5930f64

jerryshao authored and tdas committed
[SPARK-4062][Streaming]Add ReliableKafkaReceiver in Spark Streaming Kafka connector
Add ReliableKafkaReceiver in Kafka connector to prevent data loss if WAL in Spark Streaming is enabled. Details and design doc can be seen in [SPARK-4062](https://issues.apache.org/jira/browse/SPARK-4062).

Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Saisai Shao <saisai.shao@intel.com>

Closes apache#2991 from jerryshao/kafka-refactor and squashes the following commits:

5461f1c [Saisai Shao] Merge pull request alteryx#8 from tdas/kafka-refactor3
eae4ad6 [Tathagata Das] Refectored KafkaStreamSuiteBased to eliminate KafkaTestUtils and made Java more robust.
fab14c7 [Tathagata Das] minor update.
149948b [Tathagata Das] Fixed mistake
14630aa [Tathagata Das] Minor updates.
d9a452c [Tathagata Das] Minor updates.
ec2e95e [Tathagata Das] Removed the receiver's locks and essentially reverted to Saisai's original design.
2a20a01 [jerryshao] Address some comments
9f636b3 [Saisai Shao] Merge pull request alteryx#5 from tdas/kafka-refactor
b2b2f84 [Tathagata Das] Refactored Kafka receiver logic and Kafka testsuites
e501b3c [jerryshao] Add Mima excludes
b798535 [jerryshao] Fix the missed issue
e5e21c1 [jerryshao] Change to while loop
ea873e4 [jerryshao] Further address the comments
98f3d07 [jerryshao] Fix comment style
4854ee9 [jerryshao] Address all the comments
96c7a1d [jerryshao] Update the ReliableKafkaReceiver unit test
8135d31 [jerryshao] Fix flaky test
a949741 [jerryshao] Address the comments
16bfe78 [jerryshao] Change the ordering of imports
0894aef [jerryshao] Add some comments
77c3e50 [jerryshao] Code refactor and add some unit tests
dd9aeeb [jerryshao] Initial commit for reliable Kafka receiver
1 parent 0cbdb01 commit 5930f64
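
For context, a minimal sketch of how an application would opt into the reliable receiver added by this commit. The configuration key and the createStream signature come from the diffs below; the application name, checkpoint directory, ZooKeeper address, group id, and topic map are hypothetical placeholders, and enabling the write-ahead log assumes a checkpoint directory is set.

import kafka.serializer.StringDecoder

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")                            // hypothetical app name
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // makes createStream pick ReliableKafkaReceiver
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("hdfs:///tmp/kafka-wal-checkpoint")               // hypothetical dir; WAL data lives under the checkpoint dir

    val kafkaParams = Map(
      "zookeeper.connect" -> "localhost:2181",   // hypothetical ZooKeeper quorum
      "group.id" -> "example-group")             // consumer group id, required by the receiver
    val topics = Map("events" -> 1)              // hypothetical topic -> number of consumer threads

    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER)

    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}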

File tree

10 files changed: +651 additions, -143 deletions

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 20 additions & 13 deletions
@@ -17,13 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
 import scala.reflect.{classTag, ClassTag}
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -32,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -51,12 +51,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
+    useReliableReceiver: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-      .asInstanceOf[Receiver[(K, V)]]
+    if (!useReliableReceiver) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    }
   }
 }
 
@@ -69,14 +73,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends Receiver[Any](storageLevel) with Logging {
+  ) extends Receiver[(K, V)](storageLevel) with Logging {
 
   // Connection to Kafka
-  var consumerConnector : ConsumerConnector = null
+  var consumerConnector: ConsumerConnector = null
 
   def onStop() {
     if (consumerConnector != null) {
       consumerConnector.shutdown()
+      consumerConnector = null
     }
   }
 
@@ -102,11 +107,11 @@ class KafkaReceiver[
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
 
-    // Create Threads for each Topic/Message Stream we are listening
+    // Create threads for each topic/message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)
 
-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
@@ -117,13 +122,15 @@ class KafkaReceiver[
     }
   }
 
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  // Handles Kafka messages
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
       try {
-        for (msgAndMetadata <- stream) {
+        val streamIterator = stream.iterator()
+        while (streamIterator.hasNext()) {
+          val msgAndMetadata = streamIterator.next()
           store((msgAndMetadata.key, msgAndMetadata.message))
         }
       } catch {
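
One of the smaller changes above swaps Executors.newFixedThreadPool for Utils.newDaemonFixedThreadPool named "KafkaMessageHandler". As a rough sketch of what such a helper amounts to (an assumption about its shape, not a copy of Spark's Utils), it is a fixed-size pool whose threads are daemon threads carrying a recognizable name prefix, so handler threads are easy to spot in thread dumps and do not keep the JVM alive on shutdown:

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Sketch only: approximates a daemon fixed thread pool with a name prefix;
// Spark's actual Utils.newDaemonFixedThreadPool may differ in details.
def daemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
  val counter = new AtomicInteger(0)
  val factory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true)  // daemon threads do not block JVM exit
      t
    }
  }
  Executors.newFixedThreadPool(nThreads, factory)
}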

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -70,7 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
 
   /**
@@ -99,7 +100,6 @@
    * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
    *               in its own thread.
    * @param storageLevel RDD storage level.
-   *
    */
   def createStream(
       jssc: JavaStreamingContext,
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 282 additions & 0 deletions (new file)
@@ -0,0 +1,282 @@

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import java.util.Properties
import java.util.concurrent.{ThreadPoolExecutor, ConcurrentHashMap}

import scala.collection.{Map, mutable}
import scala.reflect.{ClassTag, classTag}

import kafka.common.TopicAndPartition
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerConnector, KafkaStream}
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder
import kafka.utils.{VerifiableProperties, ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient

import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.storage.{StorageLevel, StreamBlockId}
import org.apache.spark.streaming.receiver.{BlockGenerator, BlockGeneratorListener, Receiver}
import org.apache.spark.util.Utils

/**
 * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
 * It is turned off by default and will be enabled when
 * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
 * is that this receiver manages topic-partition/offset itself and updates the offset information
 * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
 * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
 *
 * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
 * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
 * will not take effect.
 */
private[streaming]
class ReliableKafkaReceiver[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel)
  extends Receiver[(K, V)](storageLevel) with Logging {

  private val groupId = kafkaParams("group.id")
  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
  private def conf = SparkEnv.get.conf

  /** High level consumer to connect to Kafka. */
  private var consumerConnector: ConsumerConnector = null

  /** zkClient to connect to Zookeeper to commit the offsets. */
  private var zkClient: ZkClient = null

  /**
   * A HashMap to manage the offset for each topic/partition, this HashMap is called in
   * synchronized block, so mutable HashMap will not meet concurrency issue.
   */
  private var topicPartitionOffsetMap: mutable.HashMap[TopicAndPartition, Long] = null

  /** A concurrent HashMap to store the stream block id and related offset snapshot. */
  private var blockOffsetMap: ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]] = null

  /**
   * Manage the BlockGenerator in receiver itself for better managing block store and offset
   * commit.
   */
  private var blockGenerator: BlockGenerator = null

  /** Thread pool running the handlers for receiving message from multiple topics and partitions. */
  private var messageHandlerThreadPool: ThreadPoolExecutor = null

  override def onStart(): Unit = {
    logInfo(s"Starting Kafka Consumer Stream with group: $groupId")

    // Initialize the topic-partition / offset hash map.
    topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]

    // Initialize the stream block id / offset snapshot hash map.
    blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]()

    // Initialize the block generator for storing Kafka message.
    blockGenerator = new BlockGenerator(new GeneratedBlockHandler, streamId, conf)

    if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") {
      logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " +
        "otherwise we will manually set it to false to turn off auto offset commit in Kafka")
    }

    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))
    // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true,
    // we have to make sure this property is set to false to turn off auto commit mechanism in
    // Kafka.
    props.setProperty(AUTO_OFFSET_COMMIT, "false")

    val consumerConfig = new ConsumerConfig(props)

    assert(!consumerConfig.autoCommitEnable)

    logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}")
    consumerConnector = Consumer.create(consumerConfig)
    logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}")

    zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs,
      consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer)

    messageHandlerThreadPool = Utils.newDaemonFixedThreadPool(
      topics.values.sum, "KafkaMessageHandler")

    blockGenerator.start()

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]

    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        messageHandlerThreadPool.submit(new MessageHandler(stream))
      }
    }
  }

  override def onStop(): Unit = {
    if (messageHandlerThreadPool != null) {
      messageHandlerThreadPool.shutdown()
      messageHandlerThreadPool = null
    }

    if (consumerConnector != null) {
      consumerConnector.shutdown()
      consumerConnector = null
    }

    if (zkClient != null) {
      zkClient.close()
      zkClient = null
    }

    if (blockGenerator != null) {
      blockGenerator.stop()
      blockGenerator = null
    }

    if (topicPartitionOffsetMap != null) {
      topicPartitionOffsetMap.clear()
      topicPartitionOffsetMap = null
    }

    if (blockOffsetMap != null) {
      blockOffsetMap.clear()
      blockOffsetMap = null
    }
  }

  /** Store a Kafka message and the associated metadata as a tuple. */
  private def storeMessageAndMetadata(
      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
    val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
    val data = (msgAndMetadata.key, msgAndMetadata.message)
    val metadata = (topicAndPartition, msgAndMetadata.offset)
    blockGenerator.addDataWithCallback(data, metadata)
  }

  /** Update stored offset */
  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
    topicPartitionOffsetMap.put(topicAndPartition, offset)
  }

  /**
   * Remember the current offsets for each topic and partition. This is called when a block is
   * generated.
   */
  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
    // Get a snapshot of current offset map and store with related block id.
    val offsetSnapshot = topicPartitionOffsetMap.toMap
    blockOffsetMap.put(blockId, offsetSnapshot)
    topicPartitionOffsetMap.clear()
  }

  /** Store the ready-to-be-stored block and commit the related offsets to zookeeper. */
  private def storeBlockAndCommitOffset(
      blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
    store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
    Option(blockOffsetMap.get(blockId)).foreach(commitOffset)
    blockOffsetMap.remove(blockId)
  }

  /**
   * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's
   * metadata schema in Zookeeper.
   */
  private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = {
    if (zkClient == null) {
      val thrown = new IllegalStateException("Zookeeper client is unexpectedly null")
      stop("Zookeeper client is not initialized before commit offsets to ZK", thrown)
      return
    }

    for ((topicAndPart, offset) <- offsetMap) {
      try {
        val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic)
        val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}"

        ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
      } catch {
        case e: Exception =>
          logWarning(s"Exception during commit offset $offset for topic" +
            s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e)
      }

      logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " +
        s"partition ${topicAndPart.partition}")
    }
  }

  /** Class to handle received Kafka message. */
  private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
    override def run(): Unit = {
      while (!isStopped) {
        try {
          val streamIterator = stream.iterator()
          while (streamIterator.hasNext) {
            storeMessageAndMetadata(streamIterator.next)
          }
        } catch {
          case e: Exception =>
            logError("Error handling message", e)
        }
      }
    }
  }

  /** Class to handle blocks generated by the block generator. */
  private final class GeneratedBlockHandler extends BlockGeneratorListener {

    def onAddData(data: Any, metadata: Any): Unit = {
      // Update the offset of the data that was added to the generator
      if (metadata != null) {
        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
        updateOffset(topicAndPartition, offset)
      }
    }

    def onGenerateBlock(blockId: StreamBlockId): Unit = {
      // Remember the offsets of topics/partitions when a block has been generated
      rememberBlockOffsets(blockId)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
      // Store block and commit the blocks offset
      storeBlockAndCommitOffset(blockId, arrayBuffer)
    }

    def onError(message: String, throwable: Throwable): Unit = {
      reportError(message, throwable)
    }
  }
}
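
To make the offset hand-off in the new receiver easier to follow, here is a small self-contained sketch of the three-stage flow that the GeneratedBlockHandler callbacks implement: record the latest offset per partition as data arrives, snapshot those offsets when a block is generated, and commit them only after the block has been stored. All type and object names below are toy stand-ins, not Spark or Kafka classes.

import scala.collection.mutable

// Toy stand-ins for TopicAndPartition and StreamBlockId.
case class TopicPartition(topic: String, partition: Int)
case class BlockId(id: Long)

object OffsetFlowDemo {
  // Stage 1: latest offset seen per partition since the last block boundary (cf. updateOffset).
  private val currentOffsets = mutable.HashMap.empty[TopicPartition, Long]
  // Stage 2: offsets frozen per generated block, awaiting a successful store (cf. rememberBlockOffsets).
  private val blockOffsets = mutable.HashMap.empty[BlockId, Map[TopicPartition, Long]]
  // Stage 3: what would be written to ZooKeeper once the block is durably stored (cf. commitOffset).
  private val committed = mutable.HashMap.empty[TopicPartition, Long]

  def onAddData(tp: TopicPartition, offset: Long): Unit =
    currentOffsets(tp) = offset

  def onGenerateBlock(blockId: BlockId): Unit = {
    blockOffsets(blockId) = currentOffsets.toMap
    currentOffsets.clear()
  }

  def onPushBlock(blockId: BlockId): Unit = {
    // In the real receiver, store() writes the block (to the WAL when enabled) first;
    // only then are the remembered offsets committed.
    blockOffsets.remove(blockId).foreach(_.foreach { case (tp, o) => committed(tp) = o })
  }

  def main(args: Array[String]): Unit = {
    val tp = TopicPartition("events", 0)
    onAddData(tp, 41L)
    onAddData(tp, 42L)
    onGenerateBlock(BlockId(1))
    onPushBlock(BlockId(1))
    println(committed)  // TopicPartition(events,0) -> 42
  }
}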
