Skip to content

Commit 4b435d2

Browse files
lucapertilemanub
authored andcommitted
Added a way to create topics and brokers by passing a custom configuration (#15)
* Added a way to create topics and brokers by passing a custom configuration. * moved the local vals to the main trait to avoid multiple instantiation when whe create custom topics
1 parent 3885f74 commit 4b435d2

File tree

3 files changed

+35
-1
lines changed

3 files changed

+35
-1
lines changed

src/main/scala/net/manub/embeddedkafka/EmbeddedKafka.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import java.net.InetSocketAddress
44
import java.util.Properties
55
import java.util.concurrent.Executors
66

7+
import kafka.admin.AdminUtils
78
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
89
import kafka.serializer.{Decoder, StringDecoder}
910
import kafka.server.{KafkaConfig, KafkaServer}
11+
import kafka.utils.ZkUtils
1012
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
1113
import org.apache.kafka.common.serialization.{Serializer, StringSerializer}
1214
import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer}
@@ -75,6 +77,10 @@ sealed trait EmbeddedKafkaSupport {
7577
val executorService = Executors.newFixedThreadPool(2)
7678
implicit val executionContext = ExecutionContext.fromExecutorService(executorService)
7779

80+
val zkSessionTimeoutMs = 10000
81+
val zkConnectionTimeoutMs = 10000
82+
val zkSecurityEnabled = false
83+
7884
/**
7985
* Starts a ZooKeeper instance and a Kafka broker, then executes the body passed as a parameter.
8086
*
@@ -226,6 +232,7 @@ sealed trait EmbeddedKafkaSupport {
226232
val zkAddress = s"localhost:${config.zooKeeperPort}"
227233

228234
val properties: Properties = new Properties
235+
config.customBrokerProperties.foreach { case (key, value) => properties.setProperty(key, value) }
229236
properties.setProperty("zookeeper.connect", zkAddress)
230237
properties.setProperty("broker.id", "0")
231238
properties.setProperty("host.name", "localhost")
@@ -239,4 +246,8 @@ sealed trait EmbeddedKafkaSupport {
239246
broker.startup()
240247
broker
241248
}
249+
250+
def createCustomTopic(topic: String, topicConfig: Properties)(implicit config: EmbeddedKafkaConfig): Unit = {
251+
AdminUtils.createTopic(ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled), topic, 1, 1, topicConfig)
252+
}
242253
}

src/main/scala/net/manub/embeddedkafka/EmbeddedKafkaConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package net.manub.embeddedkafka
22

3-
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001, zooKeeperPort: Int = 6000)
3+
case class EmbeddedKafkaConfig(kafkaPort: Int = 6001, zooKeeperPort: Int = 6000, customBrokerProperties: Map[String, String] = Map.empty)
44

55
object EmbeddedKafkaConfig {
66
implicit val defaultConfig = EmbeddedKafkaConfig()

src/test/scala/net/manub/embeddedkafka/EmbeddedKafkaSpec.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package net.manub.embeddedkafka
33
import java.util.Properties
44
import java.util.concurrent.TimeoutException
55

6+
import kafka.admin.AdminUtils
67
import kafka.consumer.{Consumer, ConsumerConfig, Whitelist}
78
import kafka.serializer.StringDecoder
9+
import kafka.utils.ZkUtils
810
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
911
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
1012
import org.scalatest.exceptions.TestFailedException
@@ -114,6 +116,27 @@ class EmbeddedKafkaSpec extends EmbeddedKafkaSpecSupport with EmbeddedKafka {
114116
}
115117
}
116118

119+
"the createCustomTopic method" should {
120+
"create a topic with a custom configuration" in {
121+
implicit val config = EmbeddedKafkaConfig(customBrokerProperties = Map("log.cleaner.dedupe.buffer.size" -> "2000000"))
122+
val topic = "test_custom_topic"
123+
124+
withRunningKafka {
125+
val properties: Properties = new Properties
126+
properties.put("cleanup.policy", "compact")
127+
128+
createCustomTopic(topic, properties)
129+
130+
val zkSessionTimeoutMs = 10000
131+
val zkConnectionTimeoutMs = 10000
132+
val zkSecurityEnabled = false
133+
134+
AdminUtils.topicExists(ZkUtils(s"localhost:${config.zooKeeperPort}", zkSessionTimeoutMs, zkConnectionTimeoutMs, zkSecurityEnabled), topic) shouldBe true
135+
136+
}
137+
}
138+
}
139+
117140
"the consumeFirstStringMessageFrom method" should {
118141
"return a message published to a topic" in {
119142
withRunningKafka {

0 commit comments

Comments
 (0)