@@ -20,20 +20,21 @@ package org.apache.spark.streaming.kafka
 import java.io.File
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 import scala.concurrent.duration._
 import scala.language.postfixOps
 
+import kafka.common.TopicAndPartition
+import kafka.message.MessageAndMetadata
 import kafka.serializer.StringDecoder
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
-import org.scalatest.concurrent.{Eventually, Timeouts}
+import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time}
-import org.apache.spark.streaming.dstream.{DStream, InputDStream}
+import org.apache.spark.streaming.dstream.DStream
 import org.apache.spark.util.Utils
-import kafka.common.TopicAndPartition
-import kafka.message.MessageAndMetadata
 
 class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   with BeforeAndAfter with BeforeAndAfterAll with Eventually {
@@ -67,13 +68,14 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("basic stream receiving with multiple topics and smallest starting offset") {
+  test("basic stream receiving with multiple topics and smallest starting offset") {
     val topics = Set("basic1", "basic2", "basic3")
     val data = Map("a" -> 7, "b" -> 9)
     topics.foreach { t =>
       createTopic(t)
       sendMessages(t, data)
     }
+    val totalSent = data.values.sum * topics.size
     val kafkaParams = Map(
       "metadata.broker.list" -> s"$brokerAddress",
       "auto.offset.reset" -> "smallest"
@@ -84,7 +86,8 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
         ssc, kafkaParams, topics)
     }
-    var total = 0L
+
+    val allReceived = new ArrayBuffer[(String, String)]
 
     stream.foreachRDD { rdd =>
       // Get the offset ranges in the RDD
@@ -104,16 +107,17 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
       collected.foreach { case (partSize, rangeSize) =>
         assert(partSize === rangeSize, "offset ranges are wrong")
       }
-      total += collected.size  // Add up all the collected items
     }
+    stream.foreachRDD { rdd => allReceived ++= rdd.collect() }
     ssc.start()
     eventually(timeout(20000.milliseconds), interval(200.milliseconds)) {
-      assert(total === data.values.sum * topics.size, "didn't get all messages")
+      assert(allReceived.size === totalSent,
+        "didn't get expected number of messages, messages:\n" + allReceived.mkString("\n"))
     }
     ssc.stop()
   }
 
-  ignore("receiving from largest starting offset") {
+  test("receiving from largest starting offset") {
     val topic = "largest"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -158,7 +162,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
 
-  ignore("creating stream by offset") {
+  test("creating stream by offset") {
     val topic = "offset"
     val topicPartition = TopicAndPartition(topic, 0)
     val data = Map("a" -> 10)
@@ -204,7 +208,7 @@ class DirectKafkaStreamSuite extends KafkaStreamSuiteBase
   }
 
   // Test to verify the offset ranges can be recovered from the checkpoints
-  ignore("offset recovery") {
+  test("offset recovery") {
     val topic = "recovery"
     createTopic(topic)
     testDir = Utils.createTempDir()
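
Below is a minimal, self-contained sketch (assuming only ScalaTest on the classpath) of the verification pattern the re-enabled test now uses: received records are appended to a shared buffer, and the count assertion is retried with Eventually until the expected total arrives. The Spark/Kafka setup is omitted; the EventuallyBufferSketch object, the background producer thread, and the totalSent value of 48 are illustrative placeholders, not part of the patch.

import scala.collection.mutable.ArrayBuffer

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

object EventuallyBufferSketch extends App {
  val totalSent = 48                                    // hypothetical expected record count
  val allReceived = new ArrayBuffer[(String, String)]   // shared buffer, guarded by its own monitor

  // Stand-in for the streaming side, which in the test is
  // stream.foreachRDD { rdd => allReceived ++= rdd.collect() }.
  val producer = new Thread(new Runnable {
    def run(): Unit = {
      (1 to totalSent).grouped(8).foreach { batch =>
        allReceived.synchronized { allReceived ++= batch.map(i => (s"key-$i", s"value-$i")) }
        Thread.sleep(50)
      }
    }
  })
  producer.start()

  // Retry the assertion until all records have arrived, dumping the buffer contents on
  // failure, mirroring the rewritten assert in the test.
  eventually(timeout(10.seconds), interval(100.millis)) {
    val received = allReceived.synchronized(allReceived.toList)
    assert(received.size == totalSent,
      "didn't get expected number of messages, messages:\n" + received.mkString("\n"))
  }
  producer.join()
  println(s"received all ${allReceived.size} records")
}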