Commit dd9aeeb

Initial commit for reliable Kafka receiver
1 parent 6e03de3 commit dd9aeeb

File tree

7 files changed (+441, -8 lines)


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 7 additions & 6 deletions
@@ -18,12 +18,8 @@
 package org.apache.spark.streaming.kafka
 
 import scala.collection.Map
-import scala.reflect.{classTag, ClassTag}
+import scala.reflect.ClassTag
 
-import java.util.Properties
-import java.util.concurrent.Executors
-
-import kafka.consumer._
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -51,11 +47,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
+    reliableStoreEnabled: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+    if (!reliableStoreEnabled) {
+      new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
+        .asInstanceOf[Receiver[(K, V)]]
+    } else {
+      new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
         .asInstanceOf[Receiver[(K, V)]]
     }
   }
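The new reliableStoreEnabled flag is not exposed to users directly; it is driven by the two KafkaUtils entry points later in this commit. Condensed from the KafkaUtils.scala diff below, the existing createStream keeps the old behaviour and the new createReliableStream opts in to the reliable receiver:

// createStream: unchanged semantics, backed by the plain KafkaReceiver
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)

// createReliableStream: selects ReliableKafkaReceiver via the new flag
new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)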
Lines changed: 135 additions & 0 deletions (new file)
@@ -0,0 +1,135 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka

import scala.collection.Map
import scala.reflect.{classTag, ClassTag}

import java.util.Properties
import java.util.concurrent.Executors

import kafka.consumer._
import kafka.serializer.Decoder
import kafka.utils.VerifiableProperties
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient._

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

private[streaming]
class KafkaReceiver[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag](
    kafkaParams: Map[String, String],
    topics: Map[String, Int],
    storageLevel: StorageLevel
  ) extends Receiver[Any](storageLevel) with Logging {

  // Connection to Kafka
  var consumerConnector: ConsumerConnector = null

  def onStop() {
    if (consumerConnector != null) {
      consumerConnector.shutdown()
    }
  }

  def onStart() {

    logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))

    // Kafka connection properties
    val props = new Properties()
    kafkaParams.foreach(param => props.put(param._1, param._2))

    val zkConnect = kafkaParams("zookeeper.connect")
    // Create the connection to the cluster
    logInfo("Connecting to Zookeeper: " + zkConnect)
    val consumerConfig = new ConsumerConfig(props)
    consumerConnector = Consumer.create(consumerConfig)
    logInfo("Connected to " + zkConnect)

    // When auto.offset.reset is defined, it is our responsibility to try and whack the
    // consumer group zk node.
    if (kafkaParams.contains("auto.offset.reset")) {
      tryZookeeperConsumerGroupCleanup(zkConnect, kafkaParams("group.id"))
    }

    val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[K]]
    val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
      .newInstance(consumerConfig.props)
      .asInstanceOf[Decoder[V]]

    // Create Threads for each Topic/Message Stream we are listening
    val topicMessageStreams = consumerConnector.createMessageStreams(
      topics, keyDecoder, valueDecoder)

    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
    try {
      // Start the messages handler for each partition
      topicMessageStreams.values.foreach { streams =>
        streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }
      }
    } finally {
      executorPool.shutdown() // Just causes threads to terminate after work is done
    }
  }

  // Handles Kafka Messages
  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
    extends Runnable {
    def run() {
      logInfo("Starting MessageHandler.")
      try {
        for (msgAndMetadata <- stream) {
          store((msgAndMetadata.key, msgAndMetadata.message))
        }
      } catch {
        case e: Throwable => logError("Error handling message; exiting", e)
      }
    }
  }

  // It is our responsibility to delete the consumer group when specifying auto.offset.reset. This
  // is because Kafka 0.7.2 only honors this param when the group is not in zookeeper.
  //
  // The kafka high level consumer doesn't expose setting offsets currently, this is a trick copied
  // from Kafka's ConsoleConsumer. See code related to 'auto.offset.reset' when it is set to
  // 'smallest'/'largest':
  // scalastyle:off
  // https://github.com/apache/kafka/blob/0.7.2/core/src/main/scala/kafka/consumer/ConsoleConsumer.scala
  // scalastyle:on
  private def tryZookeeperConsumerGroupCleanup(zkUrl: String, groupId: String) {
    val dir = "/consumers/" + groupId
    logInfo("Cleaning up temporary Zookeeper data under " + dir + ".")
    val zk = new ZkClient(zkUrl, 30*1000, 30*1000, ZKStringSerializer)
    try {
      zk.deleteRecursive(dir)
    } catch {
      case e: Throwable => logWarning("Error cleaning up temporary Zookeeper data", e)
    } finally {
      zk.close()
    }
  }
}
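In onStart the key and value decoders are built reflectively from the U and T class tags, by looking up each decoder class's constructor that takes a VerifiableProperties. A minimal standalone sketch of the same pattern (e.g. for the Scala REPL), assuming Kafka 0.8's StringDecoder; the makeDecoder helper is illustrative and not part of this commit:

import java.util.Properties

import scala.reflect.{classTag, ClassTag}

import kafka.serializer.{Decoder, StringDecoder}
import kafka.utils.VerifiableProperties

// Illustrative helper: builds a Decoder[V] the same way KafkaReceiver.onStart does,
// via the decoder class's (VerifiableProperties) constructor.
def makeDecoder[V, T <: Decoder[V]: ClassTag](props: Properties): Decoder[V] =
  classTag[T].runtimeClass
    .getConstructor(classOf[VerifiableProperties])
    .newInstance(new VerifiableProperties(props))
    .asInstanceOf[Decoder[V]]

// For example, a StringDecoder built from empty properties:
val stringDecoder: Decoder[String] = makeDecoder[String, StringDecoder](new Properties())
val decoded: String = stringDecoder.fromBytes("hello".getBytes("UTF-8"))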

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 68 additions & 1 deletion
@@ -70,7 +70,7 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, storageLevel)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, false, storageLevel)
   }
 
   /**
@@ -144,4 +144,71 @@ object KafkaUtils {
     createStream[K, V, U, T](
       jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
   }
+
+  def createReliableStream(
+      ssc: StreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: Map[String, Int],
+      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2)
+    : ReceiverInputDStream[(String, String)] = {
+    val kafkaParams = Map[String, String](
+      "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,
+      "zookeeper.connection.timeout.ms" -> "10000")
+    createReliableStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topics, storageLevel)
+  }
+
+  def createReliableStream[
+    K: ClassTag,
+    V: ClassTag,
+    U <: Decoder[_]: ClassTag,
+    T <: Decoder[_]: ClassTag](
+      ssc: StreamingContext,
+      kafkaParams: Map[String, String],
+      topics: Map[String, Int],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[(K, V)] = {
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, true, storageLevel)
+  }
+
+  def createReliableStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt]
+    ): JavaPairReceiverInputDStream[String, String] = {
+    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+  }
+
+  def createReliableStream(
+      jssc: JavaStreamingContext,
+      zkQuorum: String,
+      groupId: String,
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[String, String] = {
+    createReliableStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*),
+      storageLevel)
+  }
+
+  def createReliableStream[K, V, U <: Decoder[_], T <: Decoder[_]](
+      jssc: JavaStreamingContext,
+      keyTypeClass: Class[K],
+      valueTypeClass: Class[V],
+      keyDecoderClass: Class[U],
+      valueDecoderClass: Class[T],
+      kafkaParams: JMap[String, String],
+      topics: JMap[String, JInt],
+      storageLevel: StorageLevel
+    ): JavaPairReceiverInputDStream[K, V] = {
+    implicit val keyCmt: ClassTag[K] = ClassTag(keyTypeClass)
+    implicit val valueCmt: ClassTag[V] = ClassTag(valueTypeClass)
+
+    implicit val keyCmd: ClassTag[U] = ClassTag(keyDecoderClass)
+    implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass)
+
+    createReliableStream[K, V, U, T](
+      jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+  }
 }
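As a usage sketch, a driver could consume the new API as follows. The app name, ZooKeeper quorum, group id and topic below are placeholder values, not part of this commit; the call returns a stream backed by ReliableKafkaReceiver because createReliableStream passes reliableStoreEnabled = true:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReliableKafkaExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // topic name -> number of consumer threads reading that topic
    val topics = Map("events" -> 2)

    // DStream of (key, message) pairs, stored with the replicated
    // MEMORY_AND_DISK_SER_2 level (also the default in the signature above).
    val stream = KafkaUtils.createReliableStream(
      ssc, "zk-host:2181", "my-consumer-group", topics,
      StorageLevel.MEMORY_AND_DISK_SER_2)

    // Count the messages in each batch, ignoring the keys.
    stream.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}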
