Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Test #24097] UT fix: verify kafka-clients CVE fix #24269

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Fix CVE-2022-34917 about kafka-client dependency
Upgraded kafka-clients version to 3.7.1

Upgraded to 3.9.0

Fixed presto-kafka test cases

Excluded io.netty

Combined exclusions
  • Loading branch information
adkharat committed Dec 17, 2024
commit 87c8bbb1c8df913e2b00e560fec890a668d1b5e8
30 changes: 27 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
<dep.gcs.version>1.9.17</dep.gcs.version>
<dep.alluxio.version>313</dep.alluxio.version>
<dep.slf4j.version>1.7.32</dep.slf4j.version>
<dep.kafka.version>2.3.1</dep.kafka.version>
<dep.kafka.version>3.9.0</dep.kafka.version>
<dep.pinot.version>0.11.0</dep.pinot.version>
<dep.druid.version>30.0.1</dep.druid.version>
<dep.jaxb.version>2.3.1</dep.jaxb.version>
Expand Down Expand Up @@ -1294,7 +1294,7 @@
<dependency>
<groupId>org.pcollections</groupId>
<artifactId>pcollections</artifactId>
<version>2.1.2</version>
<version>4.0.1</version>
</dependency>

<dependency>
Expand Down Expand Up @@ -1811,6 +1811,18 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-csv</artifactId>
</exclusion>
<exclusion>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -1921,7 +1933,7 @@
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.14</version>
<version>3.8.4</version>
<exclusions>
<exclusion>
<artifactId>jline</artifactId>
Expand All @@ -1935,6 +1947,14 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down Expand Up @@ -2003,6 +2023,10 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
</exclusion>
</exclusions>
</dependency>

Expand Down
28 changes: 28 additions & 0 deletions presto-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@
</properties>

<dependencies>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.facebook.airlift</groupId>
<artifactId>bootstrap</artifactId>
Expand Down Expand Up @@ -234,6 +239,19 @@
<version>${scala.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>5.6.0</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
Expand All @@ -248,6 +266,16 @@
</excludes>
</configuration>
</plugin>
<plugin>
<groupId>org.basepom.maven</groupId>
<artifactId>duplicate-finder-maven-plugin</artifactId>
<configuration>
<ignoredResourcePatterns>
<ignoredResourcePattern>kafka/kafka-version.properties</ignoredResourcePattern>
<ignoredResourcePattern>about.html</ignoredResourcePattern>
</ignoredResourcePatterns>
</configuration>
</plugin>
</plugins>
</build>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,21 @@
package com.facebook.presto.kafka;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;
import com.facebook.presto.kafka.schema.file.FileTableDescriptionSupplier;
import com.facebook.presto.kafka.server.file.FileKafkaClusterMetadataSupplier;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.validation.constraints.NotNull;

import java.io.File;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;

public class KafkaConnectorConfig
{
/**
Expand Down Expand Up @@ -58,6 +66,8 @@ public class KafkaConnectorConfig
*/
private String clusterMetadataSupplier = FileKafkaClusterMetadataSupplier.NAME;

private List<File> resourceConfigFiles = ImmutableList.of();

@NotNull
public String getDefaultSchema()
{
Expand Down Expand Up @@ -145,4 +155,20 @@ public KafkaConnectorConfig setHideInternalColumns(boolean hideInternalColumns)
this.hideInternalColumns = hideInternalColumns;
return this;
}

/**
 * Returns the optional Kafka configuration resource files supplied via the
 * {@code kafka.config.resources} property. Defaults to an empty list when
 * the property is not set.
 */
@NotNull
public List<File> getResourceConfigFiles()
{
    return resourceConfigFiles;
}

/**
 * Sets the optional Kafka configuration resource files from a comma-separated
 * list of paths. Surrounding whitespace is trimmed from each entry and blank
 * entries are ignored, so {@code ""} yields an empty list.
 *
 * @param files comma-separated file paths; may be empty
 * @return this instance, for fluent chaining
 */
@Config("kafka.config.resources")
@ConfigDescription("Optional config files")
public KafkaConnectorConfig setResourceConfigFiles(String files)
{
    ImmutableList.Builder<File> parsed = ImmutableList.builder();
    for (String path : Splitter.on(',').trimResults().omitEmptyStrings().split(files)) {
        parsed.add(new File(path));
    }
    this.resourceConfigFiles = parsed.build();
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public void testDefaults()
.setTableDescriptionSupplier(FileTableDescriptionSupplier.NAME)
.setHideInternalColumns(true)
.setMaxPartitionFetchBytes(1048576)
.setMaxPollRecords(500));
.setMaxPollRecords(500)
.setResourceConfigFiles(""));
}

@Test
Expand All @@ -46,6 +47,7 @@ public void testExplicitPropertyMappings()
.put("kafka.hide-internal-columns", "false")
.put("kafka.max-partition-fetch-bytes", "1024")
.put("kafka.max-poll-records", "1000")
.put("kafka.config.resources", "test-config.file")
.build();

KafkaConnectorConfig expected = new KafkaConnectorConfig()
Expand All @@ -55,7 +57,8 @@ public void testExplicitPropertyMappings()
.setKafkaConnectTimeout("1h")
.setHideInternalColumns(false)
.setMaxPartitionFetchBytes(1024)
.setMaxPollRecords(1000);
.setMaxPollRecords(1000)
.setResourceConfigFiles("test-config.file");

ConfigAssertions.assertFullMapping(properties, expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import kafka.server.KafkaServer;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.utils.Time;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.presto.kafka.util.TestUtils.findUnusedPort;
Expand All @@ -45,27 +48,28 @@
public class EmbeddedKafka
implements Closeable
{
private final EmbeddedZookeeper zookeeper;
private final ZooKeeperEmbedded zookeeper;
private final int port;
private final File kafkaDataDir;
private final KafkaServerStartable kafka;
private final KafkaServer kafka;
private final AdminClient adminClient;

private final AtomicBoolean started = new AtomicBoolean();
private final AtomicBoolean stopped = new AtomicBoolean();

public static EmbeddedKafka createEmbeddedKafka()
throws IOException
throws Exception
{
return new EmbeddedKafka(new EmbeddedZookeeper(), new Properties());
return new EmbeddedKafka(new ZooKeeperEmbedded(findUnusedPort()), new Properties());
}

public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
throws IOException
throws Exception
{
return new EmbeddedKafka(new EmbeddedZookeeper(), overrideProperties);
return new EmbeddedKafka(new ZooKeeperEmbedded(findUnusedPort()), overrideProperties);
}

EmbeddedKafka(EmbeddedZookeeper zookeeper, Properties overrideProperties)
EmbeddedKafka(ZooKeeperEmbedded zookeeper, Properties overrideProperties)
throws IOException
{
this.zookeeper = requireNonNull(zookeeper, "zookeeper is null");
Expand All @@ -77,6 +81,7 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
Map<String, String> properties = ImmutableMap.<String, String>builder()
.put("broker.id", "0")
.put("host.name", "localhost")
.put("listeners", "PLAINTEXT://localhost:" + getPort())
.put("num.partitions", "2")
.put("log.flush.interval.messages", "10000")
.put("log.flush.interval.ms", "1000")
Expand All @@ -85,20 +90,24 @@ public static EmbeddedKafka createEmbeddedKafka(Properties overrideProperties)
.put("zookeeper.connection.timeout.ms", "1000000")
.put("port", Integer.toString(port))
.put("log.dirs", kafkaDataDir.getAbsolutePath())
.put("zookeeper.connect", zookeeper.getConnectString())
.put("zookeeper.connect", zookeeper.connectString())
.put("offsets.topic.replication.factor", "1")
.putAll(Maps.fromProperties(overrideProperties))
.build();

KafkaConfig config = new KafkaConfig(toProperties(properties));
this.kafka = new KafkaServerStartable(config);
Time time = Time.SYSTEM;
this.kafka = new KafkaServer(config, time, scala.Option.empty(), false);
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getConnectString());
adminProps.put("log.level", "DEBUG");
this.adminClient = AdminClient.create(adminProps);
}

public void start()
throws InterruptedException, IOException
{
if (!started.getAndSet(true)) {
zookeeper.start();
kafka.startup();
}
}
Expand All @@ -110,8 +119,9 @@ public void close()
if (started.get() && !stopped.getAndSet(true)) {
kafka.shutdown();
kafka.awaitShutdown();
zookeeper.close();
zookeeper.stop();
deleteRecursively(kafkaDataDir.toPath(), ALLOW_INSECURE);
adminClient.close();
}
}

Expand All @@ -123,15 +133,15 @@ public void createTopics(String... topics)
public void createTopics(int partitions, int replication, Properties topicProperties, String... topics)
{
checkState(started.get() && !stopped.get(), "not started!");

ZkUtils zkUtils = ZkUtils.apply(getZookeeperConnectString(), 30_000, 30_000, false);
try {
for (String topic : topics) {
AdminUtils.createTopic(zkUtils, topic, partitions, replication, topicProperties, RackAwareMode.Disabled$.MODULE$);
NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
newTopic.configs(Maps.fromProperties(topicProperties));
adminClient.createTopics(Collections.singleton(newTopic)).all().get();
}
}
finally {
zkUtils.close();
catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Failed to create topics", e);
}
}

Expand All @@ -146,11 +156,6 @@ public KafkaProducer<Long, Object> createProducer()
return new KafkaProducer<>(properties);
}

public int getZookeeperPort()
{
return zookeeper.getPort();
}

public int getPort()
{
return port;
Expand All @@ -163,6 +168,6 @@ public String getConnectString()

public String getZookeeperConnectString()
{
return zookeeper.getConnectString();
return zookeeper.connectString();
}
}
Loading