Skip to content

Commit a906743

Browse files
committed
Using 0.9.0 consumer in code and tests - related to #17.
1 parent 4b435d2 commit a906743

File tree

3 files changed

+55
-66
lines changed

3 files changed

+55
-66
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,22 @@ import java.util.Properties
55
import java.util.concurrent.Executors
66

77
import kafka.admin.AdminUtils
8-
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
9-
import kafka.serializer.{Decoder, StringDecoder}
108
import kafka.server.{KafkaConfig, KafkaServer}
119
import kafka.utils.ZkUtils
10+
import org.apache.kafka.clients.consumer.KafkaConsumer
1211
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
13-
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
12+
import org.apache.kafka.common.KafkaException
13+
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
1414
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515
import org.scalatest.Suite
1616

17+
import scala.collection.JavaConversions
1718
import scala.collection.JavaConversions.mapAsJavaMap
18-
import scala.concurrent._
1919
import scala.concurrent.duration._
20+
import scala.concurrent.{ExecutionContext, TimeoutException}
2021
import scala.language.{higherKinds, postfixOps}
2122
import scala.reflect.io.Directory
2223
import scala.util.Try
23-
import scala.util.control.NonFatal
2424

2525
trait EmbeddedKafka extends EmbeddedKafkaSupport {
2626
this: Suite =>
@@ -77,14 +77,14 @@ sealed trait EmbeddedKafkaSupport {
7777
val executorService = Executors.newFixedThreadPool(2)
7878
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
7979

80-
val zkSessionTimeoutMs = 10000
80+
val zkSessionTimeoutMs = 10000
8181
val zkConnectionTimeoutMs = 10000
8282
val zkSecurityEnabled = false
8383

8484
/**
8585
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
8686
*
87-
* @param body the function to execute
87+
* @param body the function to execute
8888
* @param config an implicit [[EmbeddedKafkaConfig]]
8989
*/
9090
def withRunningKafka(body: => Unit)(implicit config: EmbeddedKafkaConfig) = {
@@ -104,9 +104,9 @@ sealed trait EmbeddedKafkaSupport {
104104
* Publishes synchronously a message of type [[String]] to the running Kafka broker.
105105
*
106106
* @see [[EmbeddedKafka#publishToKafka]]
107-
* @param topic the topic to which publish the message (it will be auto-created)
107+
* @param topic the topic to which publish the message (it will be auto-created)
108108
* @param message the [[String]] message to publish
109-
* @param config an implicit [[EmbeddedKafkaConfig]]
109+
* @param config an implicit [[EmbeddedKafkaConfig]]
110110
* @throws KafkaUnavailableException if unable to connect to Kafka
111111
*/
112112
def publishStringMessageToKafka(topic: String, message: String)(implicit config: EmbeddedKafkaConfig): Unit =
@@ -115,9 +115,9 @@ sealed trait EmbeddedKafkaSupport {
115115
/**
116116
* Publishes synchronously a message to the running Kafka broker.
117117
*
118-
* @param topic the topic to which publish the message (it will be auto-created)
119-
* @param message the message of type [[T]] to publish
120-
* @param config an implicit [[EmbeddedKafkaConfig]]
118+
* @param topic the topic to which publish the message (it will be auto-created)
119+
* @param message the message of type [[T]] to publish
120+
* @param config an implicit [[EmbeddedKafkaConfig]]
121121
* @param serializer an implicit [[Serializer]] for the type [[T]]
122122
* @throws KafkaUnavailableException if unable to connect to Kafka
123123
*/
@@ -144,48 +144,46 @@ sealed trait EmbeddedKafkaSupport {
144144
}
145145

146146
def consumeFirstStringMessageFrom(topic: String)(implicit config: EmbeddedKafkaConfig): String =
147-
consumeFirstMessageFrom(topic)(config, new StringDecoder())
147+
consumeFirstMessageFrom(topic)(config, new StringDeserializer())
148148

149149

150150
/**
151151
* Consumes the first message available in a given topic, deserializing it as a String.
152152
*
153-
* @param topic the topic to consume a message from
154-
* @param config an implicit [[EmbeddedKafkaConfig]]
155-
* @param decoder an implicit [[Decoder]] for the type [[T]]
153+
* @param topic the topic to consume a message from
154+
* @param config an implicit [[EmbeddedKafkaConfig]]
155+
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
156156
* @return the first message consumed from the given topic, with a type [[T]]
157157
* @throws TimeoutException if unable to consume a message within 5 seconds
158158
* @throws KafkaUnavailableException if unable to connect to Kafka
159159
*/
160160
@throws(classOf[TimeoutException])
161161
@throws(classOf[KafkaUnavailableException])
162-
def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, decoder: Decoder[T]): T = {
162+
def consumeFirstMessageFrom[T](topic: String)(implicit config: EmbeddedKafkaConfig, deserializer: Deserializer[T]): T = {
163+
164+
import scala.collection.JavaConversions._
165+
163166
val props = new Properties()
164167
props.put("group.id", s"embedded-kafka-spec")
165-
props.put("zookeeper.connect", s"localhost:${config.zooKeeperPort}")
166-
props.put("auto.offset.reset", "smallest")
167-
props.put("zookeeper.connection.timeout.ms", "6000")
168-
169-
val consumer =
170-
try Consumer.create(new ConsumerConfig(props))
171-
catch {
172-
case NonFatal(e) =>
173-
throw new KafkaUnavailableException(e)
174-
}
168+
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
169+
props.put("auto.offset.reset", "earliest")
175170

176-
val messageStreams =
177-
consumer.createMessageStreamsByFilter(Whitelist(topic), keyDecoder = new StringDecoder, valueDecoder = decoder)
171+
val consumer = new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
178172

179-
val messageFuture = Future {
180-
messageStreams.headOption
181-
.getOrElse(throw new KafkaSpecException("Unable to find a message stream")).iterator().next().message()
173+
val message = Try {
174+
consumer.subscribe(List(topic))
175+
consumer.partitionsFor(topic) // as poll doesn't honour the timeout, forcing the consumer to fail here.
176+
val records = consumer.poll(5000)
177+
if (records.isEmpty) {
178+
throw new TimeoutException("Unable to retrieve a message from Kafka in 5000ms")
179+
}
180+
records.iterator().next().value()
182181
}
183182

184-
try {
185-
Await.result(messageFuture, 5 seconds)
186-
} finally {
187-
consumer.shutdown()
188-
}
183+
consumer.close()
184+
message.recover {
185+
case ex: KafkaException => throw new KafkaUnavailableException(ex)
186+
}.get
189187
}
190188

191189
object aKafkaProducer {
Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package net.manub.embeddedkafka
22

3-
class KafkaUnavailableException(cause: Throwable) extends RuntimeException(cause)
3+
class KafkaUnavailableException(msg: String, cause: Throwable) extends RuntimeException(msg, cause) {
4+
def this(msg: String) = this(msg, null)
5+
def this(cause: Throwable) = this(null, cause)
6+
}
47

58
class KafkaSpecException(msg: String) extends RuntimeException(msg)

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 16 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,14 @@ import java.util.Properties
44
import java.util.concurrent.TimeoutException
55

66
import kafka.admin.AdminUtils
7-
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
8-
import kafka.serializer.StringDecoder
97
import kafka.utils.ZkUtils
8+
import org.apache.kafka.clients.consumer.KafkaConsumer
109
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
11-
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
10+
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringDeserializer, StringSerializer}
1211
import org.scalatest.exceptions.TestFailedException
1312
import org.scalatest.time.{Milliseconds, Seconds, Span}
1413

1514
import scala.collection.JavaConversions._
16-
import scala.concurrent.ExecutionContext.Implicits.global
17-
import scala.concurrent.Future
1815
import scala.language.postfixOps
1916

2017
class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
@@ -85,27 +82,17 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
8582

8683
publishStringMessageToKafka(topic, message)
8784

88-
val consumer = Consumer.create(consumerConfigForEmbeddedKafka)
85+
val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
86+
consumer.subscribe(List("test_topic"))
8987

90-
val filter = new Whitelist("test_topic")
91-
val stringDecoder = new StringDecoder
88+
val records = consumer.poll(ConsumerPollTimeout)
9289

93-
val messageStreams = consumer.createMessageStreamsByFilter(filter, 1, stringDecoder, stringDecoder)
90+
records.iterator().hasNext shouldBe true
91+
val record = records.iterator().next()
9492

95-
val eventualMessage = Future {
96-
messageStreams
97-
.headOption
98-
.getOrElse(throw new RuntimeException("Unable to retrieve message streams"))
99-
.iterator()
100-
.next()
101-
.message()
102-
}
103-
104-
whenReady(eventualMessage) { msg =>
105-
msg shouldBe message
106-
}
93+
record.value() shouldBe message
10794

108-
consumer.shutdown()
95+
consumer.close()
10996
}
11097
}
11198

@@ -184,7 +171,7 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
184171

185172
val message = TestAvroClass("name")
186173
val topic = "test_topic"
187-
implicit val testAvroClassDecoder = specificAvroDecoder[TestAvroClass](TestAvroClass.SCHEMA$)
174+
implicit val testAvroClassDecoder = specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)
188175

189176
val producer = new KafkaProducer[String, TestAvroClass](Map(
190177
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001"
@@ -243,12 +230,13 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
243230
}
244231
}
245232

246-
lazy val consumerConfigForEmbeddedKafka: ConsumerConfig = {
233+
lazy val consumerProps: Properties = {
247234
val props = new Properties()
248235
props.put("group.id", "test")
249-
props.put("zookeeper.connect", "localhost:6000")
250-
props.put("auto.offset.reset", "smallest")
251-
252-
new ConsumerConfig(props)
236+
props.put("bootstrap.servers", "localhost:6001")
237+
props.put("auto.offset.reset", "earliest")
238+
props
253239
}
240+
241+
val ConsumerPollTimeout = 3000
254242
}

0 commit comments

Comments
 (0)