
Commit ea873e4

Further address the comments
1 parent 98f3d07 commit ea873e4

File tree

7 files changed, +42 -175 lines changed


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaInputDStream.scala

Lines changed: 15 additions & 11 deletions
@@ -17,9 +17,12 @@
 
 package org.apache.spark.streaming.kafka
 
+import java.util.Properties
+
 import scala.collection.Map
-import scala.reflect.ClassTag
+import scala.reflect.{classTag, ClassTag}
 
+import kafka.consumer.{KafkaStream, Consumer, ConsumerConfig, ConsumerConnector}
 import kafka.serializer.Decoder
 import kafka.utils.VerifiableProperties
 
@@ -28,6 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.util.Utils
 
 /**
  * Input stream that pulls messages from a Kafka Broker.
@@ -47,17 +51,16 @@ class KafkaInputDStream[
     @transient ssc_ : StreamingContext,
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
-    reliableReceiveEnabled: Boolean,
+    useReliableReceiver: Boolean,
     storageLevel: StorageLevel
   ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging {
 
   def getReceiver(): Receiver[(K, V)] = {
-    if (!reliableReceiveEnabled) {
+    if (!useReliableReceiver) {
       new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[Receiver[(K, V)]]
     } else {
       new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel)
-        .asInstanceOf[Receiver[(K, V)]]
+    }
   }
 }
 
@@ -70,14 +73,15 @@ class KafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel
-  ) extends Receiver[Any](storageLevel) with Logging {
+  ) extends Receiver[(K, V)](storageLevel) with Logging {
 
   // Connection to Kafka
-  var consumerConnector : ConsumerConnector = null
+  var consumerConnector: ConsumerConnector = null
 
   def onStop() {
     if (consumerConnector != null) {
       consumerConnector.shutdown()
+      consumerConnector = null
     }
   }
 
@@ -103,11 +107,11 @@ class KafkaReceiver[
       .newInstance(consumerConfig.props)
       .asInstanceOf[Decoder[V]]
 
-    // Create Threads for each Topic/Message Stream we are listening
+    // Create threads for each topic/message Stream we are listening
     val topicMessageStreams = consumerConnector.createMessageStreams(
       topics, keyDecoder, valueDecoder)
 
-    val executorPool = Executors.newFixedThreadPool(topics.values.sum)
+    val executorPool = Utils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")
     try {
       // Start the messages handler for each partition
       topicMessageStreams.values.foreach { streams =>
@@ -118,8 +122,8 @@ class KafkaReceiver[
       }
     }
 
-  // Handles Kafka Messages
-  private class MessageHandler[K: ClassTag, V: ClassTag](stream: KafkaStream[K, V])
+  // Handles Kafka messages
+  private class MessageHandler(stream: KafkaStream[K, V])
     extends Runnable {
     def run() {
       logInfo("Starting MessageHandler.")
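The switch to Utils.newDaemonFixedThreadPool makes the per-stream handler threads daemon threads, so a stuck handler cannot keep the JVM alive after the receiver stops. Below is a minimal, self-contained sketch of that threading model; the pool-building helper and the toy message source are illustrative stand-ins, not the Spark or Kafka APIs.

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object MessageHandlerSketch {
  // Roughly what a daemon fixed thread pool provides: worker threads marked as daemon,
  // so they cannot keep the JVM alive once the receiver shuts down.
  private def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    Executors.newFixedThreadPool(nThreads, new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
        t.setDaemon(true)
        t
      }
    })
  }

  def main(args: Array[String]): Unit = {
    // Stand-in for the per-topic Kafka streams: each "stream" is just an iterator of (key, value).
    val streams = Seq(Iterator("k1" -> "v1", "k2" -> "v2"), Iterator("k3" -> "v3"))
    val pool = newDaemonFixedThreadPool(streams.size, "KafkaMessageHandler")
    streams.foreach { stream =>
      pool.submit(new Runnable {
        def run(): Unit = stream.foreach { case (k, v) => println(s"store(($k, $v))") }
      })
    }
    pool.shutdown()
  }
}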

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaReceiver.scala

Lines changed: 0 additions & 135 deletions
This file was deleted.

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 2 additions & 2 deletions
@@ -70,8 +70,8 @@ object KafkaUtils {
       topics: Map[String, Int],
       storageLevel: StorageLevel
     ): ReceiverInputDStream[(K, V)] = {
-    val WALEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
-    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, WALEnabled, storageLevel)
+    val walEnabled = ssc.conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)
+    new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel)
   }
 
   /**
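Nothing else selects the reliable path: enabling the existing write-ahead-log flag is what makes this createStream overload build a ReliableKafkaReceiver instead of a KafkaReceiver. A hedged usage sketch follows; the application name, ZooKeeper quorum, group id, topic map, and checkpoint directory are placeholder values.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object ReliableKafkaExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("ReliableKafkaExample")
      // With this flag on, KafkaUtils.createStream wires up the ReliableKafkaReceiver.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(2))
    ssc.checkpoint("checkpoint-dir") // the write-ahead log lives under the checkpoint directory

    // zkQuorum, groupId, and the topic -> thread-count map are placeholders.
    val stream = KafkaUtils.createStream(ssc, "zk-host:2181", "example-group", Map("topic" -> 1))
    stream.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}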

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 25 additions & 17 deletions
@@ -35,6 +35,19 @@ import org.apache.spark.storage.{StreamBlockId, StorageLevel}
 import org.apache.spark.streaming.receiver.{BlockGeneratorListener, BlockGenerator, Receiver}
 import org.apache.spark.util.Utils
 
+
+/**
+ * ReliableKafkaReceiver offers the ability to reliably store data into BlockManager without loss.
+ * It is turned off by default and will be enabled when
+ * spark.streaming.receiver.writeAheadLog.enable is true. The difference compared to KafkaReceiver
+ * is that this receiver manages topic-partition/offset itself and updates the offset information
+ * after data is reliably stored as write-ahead log. Offsets will only be updated when data is
+ * reliably stored, so the potential data loss problem of KafkaReceiver can be eliminated.
+ *
+ * Note: ReliableKafkaReceiver will set auto.commit.enable to false to turn off automatic offset
+ * commit mechanism in Kafka consumer. So setting this configuration manually within kafkaParams
+ * will not take effect.
+ */
 private[streaming]
 class ReliableKafkaReceiver[
     K: ClassTag,
@@ -44,20 +57,20 @@ class ReliableKafkaReceiver[
     kafkaParams: Map[String, String],
     topics: Map[String, Int],
     storageLevel: StorageLevel)
-  extends Receiver[Any](storageLevel) with Logging {
+  extends Receiver[(K, V)](storageLevel) with Logging {
+
+  private val groupId = kafkaParams("group.id")
+
+  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
+
+  private def conf() = SparkEnv.get.conf
 
   /** High level consumer to connect to Kafka. */
   private var consumerConnector: ConsumerConnector = null
 
   /** zkClient to connect to Zookeeper to commit the offsets. */
   private var zkClient: ZkClient = null
 
-  private val groupId = kafkaParams("group.id")
-
-  private def conf() = SparkEnv.get.conf
-
-  private val AUTO_OFFSET_COMMIT = "auto.commit.enable"
-
   /**
    * A HashMap to manage the offset for each topic/partition, this HashMap is called in
    * synchronized block, so mutable HashMap will not meet concurrency issue.
@@ -75,12 +88,6 @@ class ReliableKafkaReceiver[
 
   /** Kafka offsets checkpoint listener to register into BlockGenerator for offsets checkpoint. */
   private final class OffsetCheckpointListener extends BlockGeneratorListener {
-    override def onStoreData(data: Any, metadata: Any): Unit = {
-      if (metadata != null) {
-        val kafkaMetadata = metadata.asInstanceOf[(TopicAndPartition, Long)]
-        topicPartitionOffsetMap.put(kafkaMetadata._1, kafkaMetadata._2)
-      }
-    }
 
     override def onGenerateBlock(blockId: StreamBlockId): Unit = {
       // Get a snapshot of current offset map and store with related block id. Since this hook
@@ -91,7 +98,7 @@ class ReliableKafkaReceiver[
     }
 
     override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
-      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[Any]])
+      store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]])
 
       // Commit and remove the related offsets.
       Option(blockOffsetMap.get(blockId)).foreach { offsetMap =>
@@ -202,9 +209,10 @@ class ReliableKafkaReceiver[
         for (msgAndMetadata <- stream) {
           val topicAndPartition = TopicAndPartition(
             msgAndMetadata.topic, msgAndMetadata.partition)
-          val metadata = (topicAndPartition, msgAndMetadata.offset)
-
-          blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message), metadata)
+          blockGenerator.synchronized {
+            blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
+            topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+          }
         }
       } catch {
        case e: Throwable => logError("Error handling message; existing", e)
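With onStoreData gone, this receiver updates topicPartitionOffsetMap itself on the receive path, inside a block synchronized on the BlockGenerator, then relies on onGenerateBlock to snapshot the map per block id and on onPushBlock to commit only after the block has been stored. The following is a self-contained sketch of that bookkeeping pattern, with toy types standing in for TopicAndPartition and StreamBlockId; it is not the Spark code itself.

import scala.collection.mutable

case class TopicPartition(topic: String, partition: Int)

class OffsetTracker {
  // Offsets seen since the last block cut (updated under the same lock as the block buffer).
  private val currentOffsets = mutable.HashMap[TopicPartition, Long]()
  // Per-block snapshots, kept until the block is reliably stored.
  private val blockOffsets = mutable.HashMap[String, Map[TopicPartition, Long]]()

  // Receive path: remember the latest offset for the partition this record came from.
  def record(tp: TopicPartition, offset: Long): Unit = synchronized {
    currentOffsets.put(tp, offset)
  }

  // onGenerateBlock: snapshot the offsets belonging to the block just cut, then reset.
  def snapshot(blockId: String): Unit = synchronized {
    blockOffsets.put(blockId, currentOffsets.toMap)
    currentOffsets.clear()
  }

  // onPushBlock: the block is durably stored, so its offsets may now be committed.
  def commit(blockId: String)(doCommit: Map[TopicPartition, Long] => Unit): Unit = {
    blockOffsets.remove(blockId).foreach(doCommit)
  }
}

object OffsetTrackerSketch {
  def main(args: Array[String]): Unit = {
    val tracker = new OffsetTracker
    tracker.record(TopicPartition("topic", 0), 7L)
    tracker.snapshot("block-1")
    tracker.commit("block-1")(offsets => println(s"committing $offsets"))
  }
}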

streaming/src/main/scala/org/apache/spark/streaming/receiver/BlockGenerator.scala

Lines changed: 0 additions & 6 deletions
@@ -27,11 +27,6 @@ import org.apache.spark.streaming.util.{RecurringTimer, SystemClock}
 
 /** Listener object for BlockGenerator events */
 private[streaming] trait BlockGeneratorListener {
-  /** Called when new data is added into BlockGenerator, this hook function will be called each
-   * time new data is added into BlockGenerator, any heavy or blocking operation will hurt the
-   * throughput */
-  def onStoreData(data: Any, metadata: Any)
-
   /** Called when a new block is generated */
   def onGenerateBlock(blockId: StreamBlockId)
 
@@ -92,7 +87,6 @@ private[streaming] class BlockGenerator(
   def += (data: Any, metadata: Any = null): Unit = synchronized {
     waitToPush()
     currentBuffer += data
-    listener.onStoreData(data, metadata)
   }
 
   /** Change the buffer to which single records are added to. */
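Removing the per-record onStoreData hook shifts atomicity to the caller: since += is synchronized on the generator, a caller can hold the same monitor around the append and its own bookkeeping so a block cut cannot fall between them, which is exactly what ReliableKafkaReceiver does above. A self-contained sketch with toy types, not Spark's BlockGenerator:

import scala.collection.mutable

class ToyBlockGenerator {
  private val currentBuffer = mutable.ArrayBuffer[Any]()

  // Mirrors the synchronized += above: appends under the generator's own monitor.
  def += (data: Any): Unit = synchronized { currentBuffer += data }

  // Stand-in for the timer-driven block cut; it takes the same monitor.
  def cutBlock(): Seq[Any] = synchronized {
    val block = currentBuffer.toList
    currentBuffer.clear()
    block
  }
}

object CallerSideSketch {
  def main(args: Array[String]): Unit = {
    val generator = new ToyBlockGenerator
    val offsets = mutable.HashMap[Int, Long]()

    // Append the record and record its offset under one monitor, so a concurrent
    // cutBlock() observes either both updates or neither.
    def handle(partition: Int, offset: Long, record: Any): Unit = generator.synchronized {
      generator += record
      offsets.put(partition, offset)
    }

    handle(0, 42L, "payload")
    println(s"block = ${generator.cutBlock()}, offsets = $offsets")
  }
}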

streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala

Lines changed: 0 additions & 2 deletions
@@ -99,8 +99,6 @@ private[streaming] class ReceiverSupervisorImpl(
 
   /** Divides received data records into data blocks for pushing in BlockManager. */
   private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
-    def onStoreData(data: Any, metadata: Any): Unit = { }
-
     def onGenerateBlock(blockId: StreamBlockId): Unit = { }
 
     def onError(message: String, throwable: Throwable) {

streaming/src/test/scala/org/apache/spark/streaming/ReceiverSuite.scala

Lines changed: 0 additions & 2 deletions
@@ -299,8 +299,6 @@ class ReceiverSuite extends FunSuite with Timeouts {
     val arrayBuffers = new ArrayBuffer[ArrayBuffer[Int]]
     val errors = new ArrayBuffer[Throwable]
 
-    def onStoreData(data: Any, metadata: Any) { }
-
     def onGenerateBlock(blockId: StreamBlockId) { }
 
     def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
