Skip to content

Commit a4f16c9

Browse files
committed
Support transactions, better listener names in config
Transactions are now possible as only a single broker is enough. The listener names in the config can be anything. What most people do (use the protocol name) is very confusing. Instead, we use the role of the listener.
1 parent c8bca4c commit a4f16c9

File tree

1 file changed

+10
-6
lines changed
  • embedded-kafka/src/main/scala/io/github/embeddedkafka/ops

1 file changed

+10
-6
lines changed

embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ package io.github.embeddedkafka.ops
33
import io.github.embeddedkafka.{EmbeddedK, EmbeddedKafkaConfig, EmbeddedServer}
44
import kafka.server._
55
import org.apache.kafka.common.Uuid
6-
import org.apache.kafka.common.security.auth.SecurityProtocol
76
import org.apache.kafka.common.utils.Time
87
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
8+
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
99
import org.apache.kafka.metadata.properties.{
1010
MetaProperties,
1111
MetaPropertiesEnsemble,
@@ -17,6 +17,7 @@ import org.apache.kafka.raft.QuorumConfig
1717
import org.apache.kafka.server.ServerSocketFactory
1818
import org.apache.kafka.server.config.{
1919
KRaftConfigs,
20+
ReplicationConfigs,
2021
ServerConfigs,
2122
ServerLogConfigs
2223
}
@@ -52,23 +53,26 @@ trait KafkaOps {
5253
// Without this the controller starts correctly on a random port but it's too late to use this port in the configs for the broker
5354
val actualControllerPort = findPortForControllerOrFail(controllerPort)
5455

55-
val brokerListener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort"
56+
val brokerListener = s"BROKER://localhost:$kafkaPort"
5657
val controllerListener = s"CONTROLLER://localhost:$actualControllerPort"
5758

5859
val configProperties = Map[String, Object](
59-
KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller",
60-
KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString,
61-
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER",
60+
KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller",
61+
KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString,
62+
ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG -> "BROKER",
63+
KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER",
6264
QuorumConfig.QUORUM_VOTERS_CONFIG -> s"$nodeId@localhost:$actualControllerPort",
6365
ServerConfigs.BROKER_ID_CONFIG -> nodeId.toString,
6466
SocketServerConfigs.LISTENERS_CONFIG -> s"$brokerListener,$controllerListener",
6567
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> brokerListener,
66-
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL",
68+
SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT",
6769
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString,
6870
ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString,
6971
ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString,
7072
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
7173
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString,
74+
TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
75+
TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString,
7276
// The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
7377
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString
7478
) ++ customBrokerProperties

0 commit comments

Comments
 (0)