From 245ce26c127c7f10feb05671b891afd91de6fa6c Mon Sep 17 00:00:00 2001 From: Graeme Rocher Date: Wed, 16 Nov 2022 08:30:46 +0100 Subject: [PATCH] KAFKA-527 FEAT Allow overriding of producer creation (#619) --- .../kafka/DefaultProducerFactory.java | 52 +++++++++++++++++ .../kafka/KafkaProducerFactory.java | 57 ++++++++++--------- .../configuration/kafka/ProducerFactory.java | 46 +++++++++++++++ 3 files changed, 128 insertions(+), 27 deletions(-) create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/DefaultProducerFactory.java create mode 100644 kafka/src/main/java/io/micronaut/configuration/kafka/ProducerFactory.java diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/DefaultProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/DefaultProducerFactory.java new file mode 100644 index 000000000..6979240e5 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/DefaultProducerFactory.java @@ -0,0 +1,52 @@ +/* + * Copyright 2017-2022 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka; + +import java.util.Properties; + +import io.micronaut.context.annotation.Factory; +import io.micronaut.core.annotation.NonNull; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Serializer; + +/** + * A default implementation of {@link ProducerFactory} used for creating producer. + * + * @author milanspre + * @since 5.0.0 + */ +@Factory +public class DefaultProducerFactory implements ProducerFactory { + + /** + * + * Creates kafka producer, could be overridden for further control. + * + * @param config properties for producer + * @param ks key serializer + * @param vs value serializer + * @param key type + * @param value type + * @since 5.0.0 + * @return new instance of producer + */ + @Override + @NonNull + public Producer createProducer(Properties config, Serializer ks, Serializer vs) { + return new KafkaProducer<>(config, ks, vs); + } +} diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java index b77b263c1..db4970645 100644 --- a/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/KafkaProducerFactory.java @@ -15,6 +15,25 @@ */ package io.micronaut.configuration.kafka; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; + +import javax.annotation.PreDestroy; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.Serializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.micronaut.configuration.kafka.annotation.KafkaClient; import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration; import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration; @@ -34,26 +53,9 @@ import io.micronaut.inject.FieldInjectionPoint; import io.micronaut.inject.InjectionPoint; import io.micronaut.inject.qualifiers.Qualifiers; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.Producer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.Serializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.HashMap; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.ConcurrentHashMap; -import java.util.function.Supplier; /** - * A factory class for creating Kafka {@link org.apache.kafka.clients.producer.Producer} instances. + * A registry class for Kafka {@link org.apache.kafka.clients.producer.Producer} instances. * * @author Graeme Rocher * @since 1.0 @@ -64,15 +66,22 @@ public class KafkaProducerFactory implements ProducerRegistry, TransactionalProd private final Map clients = new ConcurrentHashMap<>(); private final BeanContext beanContext; private final SerdeRegistry serdeRegistry; + private final ProducerFactory producerFactory; /** * Default constructor. * @param beanContext The bean context * @param serdeRegistry The serde registry + * @param producerFactory The producer factory */ - public KafkaProducerFactory(BeanContext beanContext, SerdeRegistry serdeRegistry) { + public KafkaProducerFactory( + BeanContext beanContext, + SerdeRegistry serdeRegistry, + ProducerFactory producerFactory + ) { this.beanContext = beanContext; this.serdeRegistry = serdeRegistry; + this.producerFactory = producerFactory; } /** @@ -98,17 +107,11 @@ public Producer getProducer( if (keySerializer.isPresent() && valueSerializer.isPresent()) { Serializer ks = keySerializer.get(); Serializer vs = valueSerializer.get(); - return new KafkaProducer<>( - config, - ks, - vs - ); + return producerFactory.createProducer(config, ks, vs); } else if (keySerializer.isPresent() || valueSerializer.isPresent()) { throw new ConfigurationException("Both the [keySerializer] and [valueSerializer] must be set when setting either"); } else { - return new KafkaProducer<>( - config - ); + return producerFactory.createProducer(config, null, null); } } else { throw new ConfigurationException("No Kafka configuration specified when using direct instantiation"); diff --git a/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerFactory.java b/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerFactory.java new file mode 100644 index 000000000..40e7e0b76 --- /dev/null +++ b/kafka/src/main/java/io/micronaut/configuration/kafka/ProducerFactory.java @@ -0,0 +1,46 @@ +/* + * Copyright 2017-2022 original authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.micronaut.configuration.kafka; + +import java.util.Properties; + +import io.micronaut.core.annotation.NonNull; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.common.serialization.Serializer; + +/** + * A factory class for creating Kafka {@link org.apache.kafka.clients.producer.Producer} instances. + * + * @author milanspre + * @since 5.0.0 + */ +public interface ProducerFactory { + + /** + * + * Creates kafka producer, could be overridden for further control. + * + * @param config properties for producer + * @param ks key serializer + * @param vs value serializer + * @param key type + * @param value type + * @since 5.0.0 + * @return new instance of producer + */ + @NonNull + Producer createProducer(Properties config, Serializer ks, Serializer vs); +}