Skip to content

Refactored autoCommit for consumeFirstMessage methods. #62

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ This works for both `withRunningKafka` and `EmbeddedKafka.start()`
Also, it is now possible to provide custom properties to the broker while starting Kafka. `EmbeddedKafkaConfig` has a
`customBrokerProperties` field which can be used to provide extra properties contained in a `Map[String, String]`.
Those properties will be added to the broker configuration, be careful some properties are set by the library itself and
in case of conflict your values will take precedence. Please look at the source code to see what these properties
in case of conflict the `customBrokerProperties` values will take precedence. Please look at the source code to see what these properties
are.

## Utility methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,16 +210,20 @@ sealed trait EmbeddedKafkaSupport {
ProducerConfig.RETRY_BACKOFF_MS_CONFIG -> 1000.toString
)

def consumeFirstStringMessageFrom(topic: String)(
def consumeFirstStringMessageFrom(topic: String, autoCommit: Boolean = false)(
implicit config: EmbeddedKafkaConfig): String =
consumeFirstMessageFrom(topic)(config, new StringDeserializer())
consumeFirstMessageFrom(topic, autoCommit)(config, new StringDeserializer())

/**
* Consumes the first message available in a given topic, deserializing it as a String.
*
* Only the messsage that is returned is committed if config.autoCommit is false. If config.autoCommit is true then all messages that were polled will be committed.
* Only the messsage that is returned is committed if autoCommit is false.
* If autoCommit is true then all messages that were polled will be committed.
*
* @param topic the topic to consume a message from
* @param autoCommit if false, only the offset for the consumed message will be commited.
* if true, the offset for the last polled message will be committed instead.
* Defaulted to false.
* @param config an implicit [[EmbeddedKafkaConfig]]
* @param deserializer an implicit [[org.apache.kafka.common.serialization.Deserializer]] for the type [[T]]
* @return the first message consumed from the given topic, with a type [[T]]
Expand All @@ -228,7 +232,7 @@ sealed trait EmbeddedKafkaSupport {
*/
@throws(classOf[TimeoutException])
@throws(classOf[KafkaUnavailableException])
def consumeFirstMessageFrom[T](topic: String)(
def consumeFirstMessageFrom[T](topic: String, autoCommit: Boolean = false)(
implicit config: EmbeddedKafkaConfig,
deserializer: Deserializer[T]): T = {

Expand All @@ -238,7 +242,7 @@ sealed trait EmbeddedKafkaSupport {
props.put("group.id", s"embedded-kafka-spec")
props.put("bootstrap.servers", s"localhost:${config.kafkaPort}")
props.put("auto.offset.reset", "earliest")
props.put("enable.auto.commit", s"${config.autoCommit}")
props.put("enable.auto.commit", autoCommit.toString)

val consumer =
new KafkaConsumer[String, T](props, new StringDeserializer, deserializer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package net.manub.embeddedkafka

case class EmbeddedKafkaConfig(kafkaPort: Int = 6001,
zooKeeperPort: Int = 6000,
customBrokerProperties: Map[String, String] = Map.empty,
autoCommit: Boolean = false)
customBrokerProperties: Map[String, String] = Map.empty)

object EmbeddedKafkaConfig {
implicit val defaultConfig = EmbeddedKafkaConfig()
Expand Down