Closed
Description
My DI factory class:
/**
 * Dependency-injection factory that supplies the Kafka producer bean.
 */
@Factory
public class DIKafka {

  /**
   * Builds a {@code KafkaProducer<String, String>} from the supplied configuration.
   *
   * <p>The bootstrap server list and the key/value serializer settings are written first;
   * every entry found under {@code Constant.Text.KAFKA_PRODUCER_PROPS} is then copied on
   * top, so an extra property replaces a default that uses the same key.
   *
   * @param vertx  externally provided Vert.x instance passed to {@code KafkaProducer.create}
   * @param config externally provided configuration holding the server array and extra properties
   * @return the producer returned by {@code KafkaProducer.create}
   */
  @Bean
  KafkaProducer<String, String> provideKafkaProducer(@External Vertx vertx, @External YmlJsonObject config) {
    // Comma-separated list of every configured server, each element stringified.
    final String bootstrapServers = config.getJsonArray(Constant.Text.KAFKA_SERVERS).stream()
        .map(String::valueOf)
        .collect(Collectors.joining(","));

    final Map<String, String> producerSettings = new HashMap<>();
    producerSettings.put(Constant.Text.BOOTSTRAP_SERVERS, bootstrapServers);
    producerSettings.put(Constant.Text.KEY_SERIALIZER, Constant.Text.KAFKA_STRING_SERIALIZER);
    producerSettings.put(Constant.Text.VALUE_SERIALIZER, Constant.Text.KAFKA_STRING_SERIALIZER);

    // Extra properties are stringified and applied last so they take precedence over the defaults.
    final Map<String, String> overrides = config.getJsonObject(Constant.Text.KAFKA_PRODUCER_PROPS).stream()
        .map(prop -> Map.entry(prop.getKey(), String.valueOf(prop.getValue())))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    producerSettings.putAll(overrides);

    return KafkaProducer.create(vertx, producerSettings);
  }
}
Class generated by the DI annotation processor:
/**
 * Generated wiring for {@link DIKafka}: registers the factory itself and the
 * {@code KafkaProducer<String, String>} bean it provides.
 *
 * <p>NOTE(review): this class carries {@code @Generated("io.avaje.inject.generator")}, so a
 * hand edit here is presumably overwritten on the next build; the generator needs the same
 * correction to resolve type variables of inherited interfaces.
 */
@Generated("io.avaje.inject.generator")
public final class DIKafka$DI {

  /** Generic type token for {@code KafkaProducer<String, String>}. */
  public static final Type TYPE_KafkaProducerStringString =
      new GenericType<KafkaProducer<String, String>>(){}.type();

  /**
   * Generic type token for the {@code WriteStream} super-interface of the producer bean.
   *
   * <p>The type arguments are the concrete {@code String, String} of the bean. The type
   * variables {@code K, V} are not declared anywhere in this class, so using them here
   * does not compile.
   */
  public static final Type TYPE_WriteStreamKafkaProducerRecord =
      new GenericType<WriteStream<KafkaProducerRecord<String, String>>>(){}.type();

  /**
   * Registers a new {@link DIKafka} instance when the builder reports none is present.
   *
   * @param builder the bean builder to query and register with
   */
  public static void build(Builder builder) {
    if (builder.isBeanAbsent(DIKafka.class)) {
      var bean = new DIKafka();
      builder.register(bean);
    }
  }

  /**
   * Create and register KafkaProducer via factory bean method DIKafka#provideKafkaProducer().
   *
   * @param builder the bean builder used to look up the factory and its two arguments
   */
  public static void build_provideKafkaProducer(Builder builder) {
    if (builder.isBeanAbsent(TYPE_KafkaProducerStringString, TYPE_WriteStreamKafkaProducerRecord, StreamBase.class)) {
      var factory = builder.get(DIKafka.class);
      var bean = factory.provideKafkaProducer(builder.get(Vertx.class,"!vertx"), builder.get(YmlJsonObject.class,"!config"));
      builder.register(bean);
    }
  }
}
public interface KafkaProducer<K, V> extends WriteStream<KafkaProducerRecord<K, V>> {
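The generated code fails to compile on this line: `public static final Type TYPE_WriteStreamKafkaProducerRecord = new GenericType<WriteStream<KafkaProducerRecord<K, V>>>(){}.type();`
`K` and `V` are the type parameters of the `KafkaProducer<K, V>` declaration and are not defined in the generated class; they must be resolved to `String, String`, as is already done in `public static final Type TYPE_KafkaProducerStringString = new GenericType<KafkaProducer<String, String>>(){}.type();`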