Skip to content

Commit

Permalink
KAFKA-527 FEAT Allow overriding of producer creation (#619)
Browse files Browse the repository at this point in the history
  • Loading branch information
graemerocher authored Nov 16, 2022
1 parent 4b3b1b7 commit 245ce26
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 27 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka;

import java.util.Properties;

import io.micronaut.context.annotation.Factory;
import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

/**
* A default implementation of {@link ProducerFactory} used for creating producer.
*
* @author milanspre
* @since 5.0.0
*/
@Factory
public class DefaultProducerFactory implements ProducerFactory {

/**
*
* Creates kafka producer, could be overridden for further control.
*
* @param config properties for producer
* @param ks key serializer
* @param vs value serializer
* @param <K> key type
* @param <V> value type
* @since 5.0.0
* @return new instance of producer
*/
@Override
@NonNull
public <K, V> Producer<K, V> createProducer(Properties config, Serializer<K> ks, Serializer<V> vs) {
return new KafkaProducer<>(config, ks, vs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,25 @@
*/
package io.micronaut.configuration.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

import javax.annotation.PreDestroy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.micronaut.configuration.kafka.annotation.KafkaClient;
import io.micronaut.configuration.kafka.config.AbstractKafkaProducerConfiguration;
import io.micronaut.configuration.kafka.config.DefaultKafkaProducerConfiguration;
Expand All @@ -34,26 +53,9 @@
import io.micronaut.inject.FieldInjectionPoint;
import io.micronaut.inject.InjectionPoint;
import io.micronaut.inject.qualifiers.Qualifiers;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PreDestroy;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/**
* A factory class for creating Kafka {@link org.apache.kafka.clients.producer.Producer} instances.
* A registry class for Kafka {@link org.apache.kafka.clients.producer.Producer} instances.
*
* @author Graeme Rocher
* @since 1.0
Expand All @@ -64,15 +66,22 @@ public class KafkaProducerFactory implements ProducerRegistry, TransactionalProd
private final Map<ClientKey, Producer> clients = new ConcurrentHashMap<>();
private final BeanContext beanContext;
private final SerdeRegistry serdeRegistry;
private final ProducerFactory producerFactory;

/**
* Default constructor.
* @param beanContext The bean context
* @param serdeRegistry The serde registry
* @param producerFactory The producer factory
*/
public KafkaProducerFactory(BeanContext beanContext, SerdeRegistry serdeRegistry) {
public KafkaProducerFactory(
BeanContext beanContext,
SerdeRegistry serdeRegistry,
ProducerFactory producerFactory
) {
this.beanContext = beanContext;
this.serdeRegistry = serdeRegistry;
this.producerFactory = producerFactory;
}

/**
Expand All @@ -98,17 +107,11 @@ public <K, V> Producer<K, V> getProducer(
if (keySerializer.isPresent() && valueSerializer.isPresent()) {
Serializer<K> ks = keySerializer.get();
Serializer<V> vs = valueSerializer.get();
return new KafkaProducer<>(
config,
ks,
vs
);
return producerFactory.createProducer(config, ks, vs);
} else if (keySerializer.isPresent() || valueSerializer.isPresent()) {
throw new ConfigurationException("Both the [keySerializer] and [valueSerializer] must be set when setting either");
} else {
return new KafkaProducer<>(
config
);
return producerFactory.createProducer(config, null, null);
}
} else {
throw new ConfigurationException("No Kafka configuration specified when using direct instantiation");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2017-2022 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka;

import java.util.Properties;

import io.micronaut.core.annotation.NonNull;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

/**
* A factory class for creating Kafka {@link org.apache.kafka.clients.producer.Producer} instances.
*
* @author milanspre
* @since 5.0.0
*/
public interface ProducerFactory {

/**
*
* Creates kafka producer, could be overridden for further control.
*
* @param config properties for producer
* @param ks key serializer
* @param vs value serializer
* @param <K> key type
* @param <V> value type
* @since 5.0.0
* @return new instance of producer
*/
@NonNull
<K, V> Producer<K, V> createProducer(Properties config, Serializer<K> ks, Serializer<V> vs);
}

0 comments on commit 245ce26

Please sign in to comment.