Skip to content

Commit 96251d7

Browse files
author
Alex Adriaanse
committed
Allow consumeFirstStringMessageFrom to be called multiple times
In v0.5.0 you were able to call consumeFirstStringMessageFrom multiple times, with each invocation causing the next message in a topic to be consumed. In v0.10.0 this was no longer the case: the first invocation works fine, but subsequent invocations would result in exceptions as the first invocation would cause *all* messages to be committed even though only the first one is returned, leaving no messages remaining to be consumed by subsequent calls. This regression was likely introduced by commit a906743. This commit fixes this by disabling auto-commit by default and simply committing only the message that consumeFirstStringMessageFrom returns.
1 parent 4925ecd commit 96251d7

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ import java.util.concurrent.Executors
77
import kafka.admin.AdminUtils
88
import kafka.server.{KafkaConfig, KafkaServer}
99
import kafka.utils.ZkUtils
10-
import org.apache.kafka.clients.consumer.KafkaConsumer
10+
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
1111
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
12-
import org.apache.kafka.common.KafkaException
12+
import org.apache.kafka.common.{KafkaException, TopicPartition}
1313
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
1414
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
1515
import org.scalatest.Suite
@@ -217,6 +217,8 @@ sealed trait EmbeddedKafkaSupport {
217217
/**
218218
* Consumes the first message available in a given topic, deserializing it as a String.
219219
*
220+
* Only the message that is returned is committed if config.autoCommit is false. If config.autoCommit is true then all messages that were polled will be committed.
221+
*
220222
* @param topic the topic to consume a message from
221223
* @param config an implicit [[EmbeddedKafkaConfig]]
222224
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
@@ -236,6 +238,7 @@ sealed trait EmbeddedKafkaSupport {
236238
props.put("group.id", s"embedded-kafka-spec")
237239
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
238240
props.put("auto.offset.reset", "earliest")
241+
props.put("enable.auto.commit", s"${config.autoCommit}")
239242

240243
val consumer =
241244
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
@@ -248,7 +251,14 @@ sealed trait EmbeddedKafkaSupport {
248251
throw new TimeoutException(
249252
"Unable to retrieve a message from Kafka in 5000ms")
250253
}
251-
records.iterator().next().value()
254+
255+
val record = records.iterator().next()
256+
257+
val tp = new TopicPartition(record.topic(), record.partition())
258+
val om = new OffsetAndMetadata(record.offset() + 1)
259+
consumer.commitSync(Map(tp -> om))
260+
261+
record.value()
252262
}
253263

254264
consumer.close()

embedded-kafka/src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@ package net.manub.embeddedkafka
22

33
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
44
zooKeeperPort: Int = 6000,
5-
customBrokerProperties: Map[String, String] = Map.empty)
5+
customBrokerProperties: Map[String, String] = Map.empty,
6+
autoCommit: Boolean = false)
67

78
object EmbeddedKafkaConfig {
89
implicit val defaultConfig = EmbeddedKafkaConfig()

embedded-kafka/src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaMethodsSpec.scala

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,12 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
2828
"publish synchronously a String message to Kafka" in {
2929
implicit val serializer = new StringSerializer()
3030
val message = "hello world!"
31-
val topic = "test_topic"
31+
val topic = "publish_test_topic"
3232

3333
publishToKafka(topic, message)
3434

3535
val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
36-
consumer.subscribe(List("test_topic"))
36+
consumer.subscribe(List(topic))
3737

3838
val records = consumer.poll(ConsumerPollTimeout)
3939

@@ -50,12 +50,12 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
5050
implicit val serializer = new StringSerializer()
5151
val key = "key"
5252
val message = "hello world!"
53-
val topic = "test_topic"
53+
val topic = "publish_test_topic"
5454

5555
publishToKafka(topic, key, message)
5656

5757
val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
58-
consumer.subscribe(List("test_topic"))
58+
consumer.subscribe(List(topic))
5959

6060
val records = consumer.poll(ConsumerPollTimeout)
6161

@@ -111,7 +111,7 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
111111
"the consumeFirstStringMessageFrom method" should {
112112
"return a message published to a topic" in {
113113
val message = "hello world!"
114-
val topic = "test_topic"
114+
val topic = "consume_test_topic"
115115

116116
val producer = new KafkaProducer[String, String](Map(
117117
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
@@ -126,9 +126,34 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
126126
producer.close()
127127
}
128128

129+
"consume only a single message when multiple messages have been published to a topic" in {
130+
val messages = Set("message 1", "message 2", "message 3")
131+
val topic = "consume_test_topic"
132+
133+
val producer = new KafkaProducer[String, String](Map(
134+
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
135+
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
136+
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
137+
))
138+
139+
messages.foreach { message =>
140+
producer.send(new ProducerRecord[String, String](topic, message))
141+
}
142+
143+
producer.flush()
144+
145+
val consumedMessages = for (i <- 1 to messages.size) yield {
146+
consumeFirstStringMessageFrom(topic)
147+
}
148+
149+
consumedMessages.toSet shouldEqual messages
150+
151+
producer.close()
152+
}
153+
129154
"return a message published to a topic with implicit decoder" in {
130155
val message = "hello world!"
131-
val topic = "test_topic"
156+
val topic = "consume_test_topic"
132157

133158
val producer = new KafkaProducer[String, String](Map(
134159
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
@@ -149,7 +174,7 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
149174
import avro._
150175

151176
val message = TestAvroClass("name")
152-
val topic = "test_topic"
177+
val topic = "consume_test_topic"
153178
implicit val testAvroClassDecoder = specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)
154179

155180
val producer = new KafkaProducer[String, TestAvroClass](Map(

0 commit comments

Comments
 (0)