Skip to content

Commit

Permalink
fix: Update kafka-clients to 3.8.0 past snappy CVEs and implement dum…
Browse files Browse the repository at this point in the history
…my `clientInstanceId` (#482)
  • Loading branch information
bcol-google authored Oct 2, 2024
1 parent 63a590e commit 06062a4
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.4.0</version>
<version>3.8.0</version>
</dependency>
</dependencies>
</dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;

/**
* A class that uses a SingleSubscriptionConsumer to remove the duplicate methods from the kafka
Expand Down Expand Up @@ -121,6 +122,12 @@ private SingleSubscriptionConsumer requireValidConsumer() {
return consumer.get();
}

@Override
public Uuid clientInstanceId(Duration timeout) {
// https://javadoc.io/static/org.apache.kafka/kafka-clients/3.8.0/org/apache/kafka/clients/consumer/KafkaConsumer.html#clientInstanceId-java.time.Duration-
throw new IllegalStateException("Pub/Sub Lite Kafka Connector does not support telemetry");
}

@Override
public Set<TopicPartition> assignment() {
return requireValidConsumer().assignment().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnsupportedVersionException;

class PubsubLiteProducer implements Producer<byte[], byte[]> {
Expand Down Expand Up @@ -77,6 +78,12 @@ public void failed(State from, Throwable failure) {
this.publisher.startAsync().awaitRunning();
}

@Override
public Uuid clientInstanceId(Duration timeout) {
// https://javadoc.io/static/org.apache.kafka/kafka-clients/3.8.0/org/apache/kafka/clients/consumer/KafkaConsumer.html#clientInstanceId-java.time.Duration-
throw new IllegalStateException("Pub/Sub Lite Kafka Connector does not support telemetry");
}

@Override
public void initTransactions() {
throw NO_TRANSACTIONS_EXCEPTION;
Expand Down

0 comments on commit 06062a4

Please sign in to comment.