Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions contrib/storage-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@
<name>Drill : Contrib : Storage : Kafka</name>

<properties>
<kafka.version>2.8.2</kafka.version>
<kafka.version>3.9.0</kafka.version>
<kafka_scala.version>3.9.0</kafka_scala.version>
<kafka.TestSuite>**/TestKafkaSuite.class</kafka.TestSuite>
</properties>

Expand Down Expand Up @@ -80,7 +81,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.13</artifactId>
<version>${kafka.version}</version>
<version>${kafka_scala.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.junit.runners.Suite.SuiteClasses;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -83,8 +82,8 @@ public static void initKafka() throws Exception {
embeddedKafkaCluster = new EmbeddedKafkaCluster();
zkClient = KafkaZkClient.apply(embeddedKafkaCluster.getZkServer().getConnectionString(),
false, SESSION_TIMEOUT, CONN_TIMEOUT, 0, Time.SYSTEM,
"kafka.server", "SessionExpireListener",
Option.<String>empty(), Option.<ZKClientConfig>empty());
"kafka.server", new ZKClientConfig(),
"kafka.server", "SessionExpireListener", false, false);
createTopicHelper(TestQueryConstants.JSON_TOPIC, 1);
createTopicHelper(TestQueryConstants.AVRO_TOPIC, 1);
KafkaMessageGenerator generator = new KafkaMessageGenerator(embeddedKafkaCluster.getKafkaBrokerList(), StringSerializer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,28 @@ public EmbeddedKafkaCluster(Properties baseProps, int numberOfBrokers) throws IO
}

this.props.put("metadata.broker.list", sb.toString());
this.props.put(KafkaConfig.ZkConnectProp(), this.zkHelper.getConnectionString());
this.props.put("zookeeper.connect", this.zkHelper.getConnectionString());
logger.info("Initialized Kafka Server");
this.closer = new KafkaAsyncCloser();
}

/**
 * Configures and starts one embedded broker, layering single-broker test
 * defaults on top of the caller-supplied base properties.
 *
 * @param props                base properties shared by every broker in the cluster
 * @param brokerID             zero-based index; the broker id written is {@code brokerID + 1}
 * @param ephemeralBrokerPort  free local port the broker listens on
 */
private void addBroker(Properties props, int brokerID, int ephemeralBrokerPort) {
  Properties brokerProps = new Properties();
  brokerProps.putAll(props);

  String endpoint = "PLAINTEXT://" + LOCAL_HOST + ":" + ephemeralBrokerPort;

  // Aggressive timings and replication factor 1 keep a single-broker test cluster fast.
  brokerProps.put("leader.imbalance.check.interval.seconds", "1");
  brokerProps.put("offsets.topic.num.partitions", "1");
  brokerProps.put("offsets.topic.replication.factor", "1");
  brokerProps.put("default.replication.factor", "1");
  brokerProps.put("group.min.session.timeout.ms", "100");
  brokerProps.put("auto.create.topics.enable", Boolean.FALSE);

  // Identity and connectivity for this broker instance.
  brokerProps.put("zookeeper.connect", zkHelper.getConnectionString());
  brokerProps.put("broker.id", Integer.toString(brokerID + 1));
  brokerProps.put("listeners", endpoint);
  brokerProps.put("advertised.listeners", endpoint);
  brokerProps.put("port", Integer.toString(ephemeralBrokerPort));

  // Allow topic deletion and flush every message so tests observe data immediately.
  brokerProps.put("delete.topic.enable", Boolean.TRUE);
  brokerProps.put("log.dirs", getTemporaryDir().getAbsolutePath());
  brokerProps.put("log.flush.interval.messages", "1");

  brokers.add(getBroker(brokerProps));
}

Expand All @@ -119,7 +118,7 @@ public void shutDownCluster() {

/**
 * Shuts down the broker whose {@code broker.id} matches the given id, if any.
 *
 * @param brokerId the configured broker id (note: {@code addBroker} assigns ids starting at 1)
 */
public void shutDownBroker(int brokerId) {
  brokers.stream()
      // broker.id is declared as an INT config in Kafka's AbstractConfig, so it must be
      // read with getInt(); getString() would attempt (String) cast of an Integer and
      // throw ClassCastException at runtime.
      .filter(broker -> broker.config().getInt("broker.id") == brokerId)
      .findAny()
      .ifPresent(KafkaServer::shutdown);
}
Expand All @@ -145,7 +144,19 @@ public ZookeeperHelper getZkServer() {
/**
 * Builds a comma-separated {@code host:port} list for every running broker,
 * suitable for a Kafka client's bootstrap-servers setting.
 *
 * @return broker endpoints joined with commas, e.g. {@code "127.0.0.1:53412"}
 */
public String getKafkaBrokerList() {
  return brokers.stream()
      .map(KafkaServer::config)
      .map(config -> {
        String endpoint;
        try {
          // Modern brokers publish their endpoint via the listeners config,
          // formatted as PROTOCOL://host:port — strip the protocol prefix.
          String listeners = config.getString("listeners");
          endpoint = listeners.replaceAll("^[A-Z]+://", "");
        } catch (Exception e) {
          // Legacy fallback for configs without listeners: combine the known
          // local host with the deprecated port property.
          endpoint = LOCAL_HOST + ":" + config.getInt("port");
        }
        return endpoint;
      })
      .collect(Collectors.joining(","));
}

Expand Down
Loading