Commit ec2e95e

Removed the receiver's locks and essentially reverted to Saisai's original design.
1 parent 2a20a01 commit ec2e95e

7 files changed (+157, -98 lines)


external/kafka/src/main/scala/org/apache/spark/streaming/kafka/ReliableKafkaReceiver.scala

Lines changed: 21 additions & 7 deletions
@@ -178,17 +178,23 @@ class ReliableKafkaReceiver[
 
   /** Store a Kafka message and the associated metadata as a tuple. */
   private def storeMessageAndMetadata(
-      msgAndMetadata: MessageAndMetadata[K, V]): Unit = synchronized {
+      msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
     val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
-    blockGenerator += ((msgAndMetadata.key, msgAndMetadata.message))
-    topicPartitionOffsetMap.put(topicAndPartition, msgAndMetadata.offset)
+    val data = (msgAndMetadata.key, msgAndMetadata.message)
+    val metadata = (topicAndPartition, msgAndMetadata.offset)
+    blockGenerator.addDataWithCallback(data, metadata)
+  }
+
+  /** Update stored offset */
+  private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
+    topicPartitionOffsetMap.put(topicAndPartition, offset)
   }
 
   /**
    * Remember the current offsets for each topic and partition. This is called when a block is
    * generated.
    */
-  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = synchronized {
+  private def rememberBlockOffsets(blockId: StreamBlockId): Unit = {
     // Get a snapshot of current offset map and store with related block id.
     val offsetSnapshot = topicPartitionOffsetMap.toMap
     blockOffsetMap.put(blockId, offsetSnapshot)
@@ -250,17 +256,25 @@ class ReliableKafkaReceiver[
   /** Class to handle blocks generated by the block generator. */
   private final class GeneratedBlockHandler extends BlockGeneratorListener {
 
-    override def onGenerateBlock(blockId: StreamBlockId): Unit = {
+    def onAddData(data: Any, metadata: Any): Unit = {
+      // Update the offset of the data that was added to the generator
+      if (metadata != null) {
+        val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
+        updateOffset(topicAndPartition, offset)
+      }
+    }
+
+    def onGenerateBlock(blockId: StreamBlockId): Unit = {
       // Remember the offsets of topics/partitions when a block has been generated
       rememberBlockOffsets(blockId)
     }
 
-    override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
+    def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
       // Store block and commit the blocks offset
       storeBlockAndCommitOffset(blockId, arrayBuffer)
     }
 
-    override def onError(message: String, throwable: Throwable): Unit = {
+    def onError(message: String, throwable: Throwable): Unit = {
       reportError(message, throwable)
     }
   }
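The receiver change above drops the synchronized wrappers and instead hands every record, together with its (TopicAndPartition, offset) metadata, to blockGenerator.addDataWithCallback, so the generator invokes onAddData, onGenerateBlock and onPushBlock at well-defined points and does the offset bookkeeping for the receiver. Below is a minimal, self-contained Scala sketch of that callback pattern; GeneratorListener, TinyBlockGenerator and CallbackSketch are illustrative stand-ins, not Spark's internal BlockGenerator API.

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

// Illustrative stand-in for a BlockGeneratorListener-style callback interface.
trait GeneratorListener {
  def onAddData(data: Any, metadata: Any): Unit
  def onGenerateBlock(blockId: Long): Unit
  def onPushBlock(blockId: Long, buffer: ArrayBuffer[Any]): Unit
}

// Illustrative stand-in for a block generator: it owns the only lock, so the
// caller of addDataWithCallback needs no synchronization of its own.
class TinyBlockGenerator(listener: GeneratorListener, blockSize: Int = 3) {
  private val buffer = new ArrayBuffer[Any]()
  private var nextBlockId = 0L

  def addDataWithCallback(data: Any, metadata: Any): Unit = synchronized {
    buffer += data
    listener.onAddData(data, metadata)             // record the offset for this record
    if (buffer.size >= blockSize) generateBlock()
  }

  private def generateBlock(): Unit = {
    val blockId = nextBlockId
    nextBlockId += 1
    listener.onGenerateBlock(blockId)              // snapshot offsets for this block
    listener.onPushBlock(blockId, buffer.clone())  // store the block, then commit
    buffer.clear()
  }
}

object CallbackSketch {
  def main(args: Array[String]): Unit = {
    val offsets = mutable.Map.empty[(String, Int), Long]
    val generator = new TinyBlockGenerator(new GeneratorListener {
      def onAddData(data: Any, metadata: Any): Unit = metadata match {
        case ((topic: String, partition: Int), offset: Long) =>
          offsets((topic, partition)) = offset
        case _ => // no metadata attached; nothing to track
      }
      def onGenerateBlock(blockId: Long): Unit =
        println(s"block $blockId sees offsets ${offsets.toMap}")
      def onPushBlock(blockId: Long, buffer: ArrayBuffer[Any]): Unit =
        println(s"pushing block $blockId with ${buffer.size} records")
    })
    (0L until 6L).foreach { o => generator.addDataWithCallback(s"msg-$o", (("topic", 0), o)) }
  }
}

In the real receiver, onGenerateBlock maps the block id to the current offset snapshot (rememberBlockOffsets) and onPushBlock stores the block before committing those offsets (storeBlockAndCommitOffset); the sketch only prints to stay short.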

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 10 additions & 6 deletions
@@ -22,6 +22,8 @@
 import java.util.List;
 import java.util.Random;
 
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
 import scala.Predef;
 import scala.Tuple2;
 import scala.collection.JavaConverters;
@@ -43,23 +45,25 @@
 
 public class JavaKafkaStreamSuite implements Serializable {
   private transient JavaStreamingContext ssc = null;
-  private Random random = new Random();
+  private transient Random random = new Random();
   private transient KafkaStreamSuiteBase suiteBase = null;
 
   @Before
   public void setUp() {
     suiteBase = new KafkaStreamSuiteBase() { };
-    suiteBase.beforeFunction();
+    suiteBase.setupKafka();
     System.clearProperty("spark.driver.port");
-    ssc = new JavaStreamingContext(suiteBase.sparkConf(), suiteBase.batchDuration());
+    SparkConf sparkConf = new SparkConf()
+      .setMaster("local[4]").setAppName(this.getClass().getSimpleName());
+    ssc = new JavaStreamingContext(sparkConf, new Duration(500));
   }
 
   @After
   public void tearDown() {
     ssc.stop();
     ssc = null;
     System.clearProperty("spark.driver.port");
-    suiteBase.afterFunction();
+    suiteBase.tearDownKafka();
   }
 
   @Test
@@ -76,8 +80,8 @@ public void testKafkaStream() throws InterruptedException {
     suiteBase.createTopic(topic);
     HashMap<String, Object> tmp = new HashMap<String, Object>(sent);
     suiteBase.produceAndSendMessage(topic,
-      JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
-        Predef.<Tuple2<String, Object>>conforms()));
+        JavaConverters.mapAsScalaMapConverter(tmp).asScala().toMap(
+            Predef.<Tuple2<String, Object>>conforms()));
 
     HashMap<String, String> kafkaParams = new HashMap<String, String>();
     kafkaParams.put("zookeeper.connect", suiteBase.zkAddress());

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala

Lines changed: 22 additions & 19 deletions
@@ -42,15 +42,13 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 import org.apache.spark.util.Utils
 
+/**
+ * This is an abstract base class for Kafka testsuites. This has the functionality to set up
+ * and tear down local Kafka servers, and to push data using Kafka producers.
+ */
 abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   import KafkaTestUtils._
 
-  val sparkConf = new SparkConf()
-    .setMaster("local[4]")
-    .setAppName(this.getClass.getSimpleName)
-  val batchDuration = Milliseconds(500)
-  var ssc: StreamingContext = _
-
   var zkAddress: String = _
   var zkClient: ZkClient = _
 
@@ -64,7 +62,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
   private var server: KafkaServer = _
   private var producer: Producer[String, String] = _
 
-  def beforeFunction() {
+  def setupKafka() {
     // Zookeeper server startup
     zookeeper = new EmbeddedZookeeper(s"$zkHost:$zkPort")
     // Get the actual zookeeper binding port
@@ -100,12 +98,7 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
     logInfo("==================== 4 ====================")
   }
 
-  def afterFunction() {
-    if (ssc != null) {
-      ssc.stop()
-      ssc = null
-    }
-
+  def tearDownKafka() {
     if (producer != null) {
       producer.close()
       producer = null
@@ -146,21 +139,31 @@ abstract class KafkaStreamSuiteBase extends FunSuite with Logging {
 
   def produceAndSendMessage(topic: String, sent: Map[String, Int]) {
     val brokerAddr = brokerConf.hostName + ":" + brokerConf.port
-    if (producer == null) {
-      producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
-    }
+    producer = new Producer[String, String](new ProducerConfig(getProducerConfig(brokerAddr)))
     producer.send(createTestMessage(topic, sent): _*)
+    producer.close()
     logInfo("==================== 6 ====================")
   }
 }
 
 class KafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
+  var ssc: StreamingContext = _
+
+  before {
+    setupKafka()
+  }
 
-  before { beforeFunction() }
-  after { afterFunction() }
+  after {
+    if (ssc != null) {
+      ssc.stop()
+      ssc = null
+    }
+    tearDownKafka()
+  }
 
   test("Kafka input stream") {
-    ssc = new StreamingContext(sparkConf, batchDuration)
+    val sparkConf = new SparkConf().setMaster("local[4]").setAppName(this.getClass.getSimpleName)
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
     val topic = "topic1"
     val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
     createTopic(topic)
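The suite refactoring above splits responsibilities: the abstract base class now only starts and stops the embedded ZooKeeper/Kafka servers (setupKafka/tearDownKafka), while each concrete suite owns its own SparkConf and StreamingContext. A condensed, hypothetical Scala suite showing the same lifecycle shape; ExampleKafkaSuite and its test body are illustrative, not part of the patch, and it assumes the KafkaStreamSuiteBase defined in this file is on the classpath.

import org.scalatest.BeforeAndAfter

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

// Hypothetical suite mirroring the pattern in the diff: Kafka/ZK lifecycle in
// before/after hooks, StreamingContext created fresh inside each test.
class ExampleKafkaSuite extends KafkaStreamSuiteBase with BeforeAndAfter {
  private var ssc: StreamingContext = _

  before {
    setupKafka()          // base class starts embedded ZooKeeper + Kafka
  }

  after {
    if (ssc != null) {
      ssc.stop()
      ssc = null          // never reuse a stopped context across tests
    }
    tearDownKafka()       // base class shuts the brokers down
  }

  test("per-test StreamingContext") {
    // Building the conf here, instead of sharing a val in the base class, means
    // settings such as the write-ahead-log flag cannot leak between suites.
    val conf = new SparkConf().setMaster("local[4]").setAppName("ExampleKafkaSuite")
    ssc = new StreamingContext(conf, Milliseconds(500))
    assert(ssc.sparkContext != null)
  }
}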

external/kafka/src/test/scala/org/apache/spark/streaming/kafka/ReliableKafkaStreamSuite.scala

Lines changed: 52 additions & 54 deletions
@@ -17,57 +17,75 @@
 
 package org.apache.spark.streaming.kafka
 
+
 import java.io.File
 
 import scala.collection.mutable
 import scala.concurrent.duration._
 import scala.language.postfixOps
 import scala.util.Random
 
+import com.google.common.io.Files
 import kafka.serializer.StringDecoder
 import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
+import org.apache.commons.io.FileUtils
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.Eventually
 
+import org.apache.spark.SparkConf
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.util.Utils
+import org.apache.spark.streaming.{Milliseconds, StreamingContext}
 
 class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter with Eventually {
-  val topic = "topic"
+
+  val sparkConf = new SparkConf()
+    .setMaster("local[4]")
+    .setAppName(this.getClass.getSimpleName)
+    .set("spark.streaming.receiver.writeAheadLog.enable", "true")
   val data = Map("a" -> 10, "b" -> 10, "c" -> 10)
+
+  var topic: String = _
   var groupId: String = _
   var kafkaParams: Map[String, String] = _
+  var ssc: StreamingContext = _
+  var tempDirectory: File = null
 
   before {
-    beforeFunction() // call this first to start ZK and Kafka
+    setupKafka()
+    topic = s"test-topic-${Random.nextInt(10000)}"
     groupId = s"test-consumer-${Random.nextInt(10000)}"
     kafkaParams = Map(
      "zookeeper.connect" -> zkAddress,
      "group.id" -> groupId,
      "auto.offset.reset" -> "smallest"
    )
+
+    ssc = new StreamingContext(sparkConf, Milliseconds(500))
+    tempDirectory = Files.createTempDir()
+    ssc.checkpoint(tempDirectory.getAbsolutePath)
   }
 
   after {
-    afterFunction()
+    if (ssc != null) {
+      ssc.stop()
+    }
+    if (tempDirectory != null && tempDirectory.exists()) {
+      FileUtils.deleteDirectory(tempDirectory)
+      tempDirectory = null
+    }
+    tearDownKafka()
   }
 
-  test("Reliable Kafka input stream") {
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
+
+  test("Reliable Kafka input stream with single topic") {
     createTopic(topic)
     produceAndSendMessage(topic, data)
 
+    // Verify whether the offset of this group/topic/partition is 0 before starting.
+    assert(getCommitOffset(groupId, topic, 0) === None)
+
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
     val result = new mutable.HashMap[String, Long]()
     stream.map { case (k, v) => v }.foreachRDD { r =>
       val ret = r.collect()
@@ -77,84 +95,64 @@ class ReliableKafkaStreamSuite extends KafkaStreamSuiteBase with BeforeAndAfter
       }
     }
     ssc.start()
-    eventually(timeout(10000 milliseconds), interval(100 milliseconds)) {
+
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
       // A basic process verification for ReliableKafkaReceiver.
       // Verify whether received message number is equal to the sent message number.
       assert(data.size === result.size)
       // Verify whether each message is the same as the data to be verified.
       data.keys.foreach { k => assert(data(k) === result(k).toInt) }
+      // Verify the offset number whether it is equal to the total message number.
+      assert(getCommitOffset(groupId, topic, 0) === Some(29L))
+
     }
     ssc.stop()
   }
+  /*
   test("Verify the offset commit") {
     // Verify the correctness of offset commit mechanism.
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
-
     createTopic(topic)
     produceAndSendMessage(topic, data)
 
-    // Verify whether the offset of this group/topic/partition is 0 before starting.
-    assert(getCommitOffset(groupId, topic, 0) === 0L)
-
     // Do this to consume all the message of this group/topic.
     val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      Map(topic -> 1),
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
     stream.foreachRDD(_ => Unit)
     ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
-      // Verify the offset number whether it is equal to the total message number.
-      assert(getCommitOffset(groupId, topic, 0) === 29L)
+    eventually(timeout(20000 milliseconds), interval(200 milliseconds)) {
    }
    ssc.stop()
  }
-
-  test("Verify multiple topics offset commit") {
-    sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
-    ssc = new StreamingContext(sparkConf, batchDuration)
-    val checkpointDir = s"${System.getProperty("java.io.tmpdir", "/tmp")}/" +
-      s"test-checkpoint${Random.nextInt(10000)}"
-    Utils.registerShutdownDeleteDir(new File(checkpointDir))
-    ssc.checkpoint(checkpointDir)
-
+  */
+  test("Reliable Kafka input stream with multiple topics") {
    val topics = Map("topic1" -> 1, "topic2" -> 1, "topic3" -> 1)
    topics.foreach { case (t, _) =>
      createTopic(t)
      produceAndSendMessage(t, data)
    }
 
    // Before started, verify all the group/topic/partition offsets are 0.
-    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 0L) }
+    topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === None) }
 
    // Consuming all the data sent to the broker which will potential commit the offsets internally.
    val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
-      ssc,
-      kafkaParams,
-      topics,
-      StorageLevel.MEMORY_ONLY)
+      ssc, kafkaParams, topics, StorageLevel.MEMORY_ONLY)
    stream.foreachRDD(_ => Unit)
    ssc.start()
-    eventually(timeout(3000 milliseconds), interval(100 milliseconds)) {
+    eventually(timeout(20000 milliseconds), interval(100 milliseconds)) {
      // Verify the offset for each group/topic to see whether they are equal to the expected one.
-      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === 29L) }
+      topics.foreach { case (t, _) => assert(getCommitOffset(groupId, t, 0) === Some(29L)) }
    }
    ssc.stop()
  }
 
+
  /** Getting partition offset from Zookeeper. */
-  private def getCommitOffset(groupId: String, topic: String, partition: Int): Long = {
+  private def getCommitOffset(groupId: String, topic: String, partition: Int): Option[Long] = {
    assert(zkClient != null, "Zookeeper client is not initialized")
-
    val topicDirs = new ZKGroupTopicDirs(groupId, topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/$partition"
-
-    ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong).getOrElse(0L)
+    val offset = ZkUtils.readDataMaybeNull(zkClient, zkPath)._1.map(_.toLong)
+    offset
  }
 }
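Note that getCommitOffset now returns Option[Long] instead of a Long defaulting to 0L, so the tests can distinguish "nothing committed yet" (None) from "offset 0 committed" (Some(0L)), and then check for Some(29L) once all 30 test messages (offsets 0 through 29) have been consumed. A small sketch of that distinction follows; describeOffset and OffsetCheckSketch are illustrative only, not part of the patch.

// Why Option[Long] beats Long-with-a-0L-default: the "no offset" case stays
// distinct from the "offset 0 was committed" case.
object OffsetCheckSketch {
  def describeOffset(offset: Option[Long]): String = offset match {
    case None     => "no offset committed for this group/topic/partition"
    case Some(0L) => "offset 0 has been committed"
    case Some(n)  => s"committed offset is $n"
  }

  def main(args: Array[String]): Unit = {
    // Before the change, getOrElse(0L) collapsed the first two cases into one,
    // so a pre-start assertion against 0L could pass for the wrong reason.
    println(describeOffset(None))        // nothing committed yet
    println(describeOffset(Some(0L)))    // first record committed at offset 0
    println(describeOffset(Some(29L)))   // all 30 test messages consumed
  }
}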
