Skip to content

Allow consumeFirstStringMessageFrom to be called multiple times #61

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import java.util.concurrent.Executors
import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.utils.ZkUtils
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.serialization.{Deserializer, Serializer, StringDeserializer, StringSerializer}
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
import org.scalatest.Suite
Expand Down Expand Up @@ -217,6 +217,8 @@ sealed trait EmbeddedKafkaSupport {
/**
* Consumes the first message available in a given topic, deserializing it as a String.
*
* Only the message that is returned is committed if config.autoCommit is false. If config.autoCommit is true, then all messages that were polled will be committed.
*
* @param topic the topic to consume a message from
* @param config an implicit [[EmbeddedKafkaConfig]]
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
Expand All @@ -236,6 +238,7 @@ sealed trait EmbeddedKafkaSupport {
props.put("group.id", s"embedded-kafka-spec")
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", s"${config.autoCommit}")

val consumer =
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
Expand All @@ -248,7 +251,14 @@ sealed trait EmbeddedKafkaSupport {
throw new TimeoutException(
"Unable to retrieve a message from Kafka in 5000ms")
}
records.iterator().next().value()

val record = records.iterator().next()

val tp = new TopicPartition(record.topic(), record.partition())
val om = new OffsetAndMetadata(record.offset() + 1)
consumer.commitSync(Map(tp -> om))

record.value()
}

consumer.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ package net.manub.embeddedkafka

case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
zooKeeperPort: Int = 6000,
customBrokerProperties: Map[String, String] = Map.empty)
customBrokerProperties: Map[String, String] = Map.empty,
autoCommit: Boolean = false)

object EmbeddedKafkaConfig {
implicit val defaultConfig = EmbeddedKafkaConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
"publish synchronously a String message to Kafka" in {
implicit val serializer = new StringSerializer()
val message = "hello world!"
val topic = "test_topic"
val topic = "publish_test_topic"

publishToKafka(topic, message)

val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
consumer.subscribe(List("test_topic"))
consumer.subscribe(List(topic))

val records = consumer.poll(ConsumerPollTimeout)

Expand All @@ -50,12 +50,12 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
implicit val serializer = new StringSerializer()
val key = "key"
val message = "hello world!"
val topic = "test_topic"
val topic = "publish_test_topic"

publishToKafka(topic, key, message)

val consumer = new KafkaConsumer[String, String](consumerProps, new StringDeserializer, new StringDeserializer)
consumer.subscribe(List("test_topic"))
consumer.subscribe(List(topic))

val records = consumer.poll(ConsumerPollTimeout)

Expand Down Expand Up @@ -111,7 +111,7 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
"the consumeFirstStringMessageFrom method" should {
"return a message published to a topic" in {
val message = "hello world!"
val topic = "test_topic"
val topic = "consume_test_topic"

val producer = new KafkaProducer[String, String](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
Expand All @@ -126,9 +126,34 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
producer.close()
}

"consume only a single message when multiple messages have been published to a topic" in {
val messages = Set("message 1", "message 2", "message 3")
val topic = "consume_test_topic"

val producer = new KafkaProducer[String, String](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer].getName
))

messages.foreach { message =>
producer.send(new ProducerRecord[String, String](topic, message))
}

producer.flush()

val consumedMessages = for (i <- 1 to messages.size) yield {
consumeFirstStringMessageFrom(topic)
}

consumedMessages.toSet shouldEqual messages

producer.close()
}

"return a message published to a topic with implicit decoder" in {
val message = "hello world!"
val topic = "test_topic"
val topic = "consume_test_topic"

val producer = new KafkaProducer[String, String](Map(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> s"localhost:6001",
Expand All @@ -149,7 +174,7 @@ class EmbeddedKafkaMethodsSpec extends EmbeddedKafkaSpecSupport with EmbeddedKaf
import avro._

val message = TestAvroClass("name")
val topic = "test_topic"
val topic = "consume_test_topic"
implicit val testAvroClassDecoder = specificAvroDeserializer[TestAvroClass](TestAvroClass.SCHEMA$)

val producer = new KafkaProducer[String, TestAvroClass](Map(
Expand Down