Skip to content

Commit

Permalink
Handle subclasses for Kafka Serde autodetection
Browse files Browse the repository at this point in the history
Added provided serde classes to the jandex index
  • Loading branch information
ozangunalp authored and melloware committed Sep 17, 2024
1 parent c3e7ee5 commit 280f66d
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,21 @@ void addSaslProvidersToNativeImage(BuildProducer<NativeImageSecurityProviderBuil

@BuildStep
void contributeClassesToIndex(BuildProducer<AdditionalIndexedClassesBuildItem> additionalIndexedClasses,
BuildProducer<IndexDependencyBuildItem> indexDependency) {
BuildProducer<IndexDependencyBuildItem> indexDependency, Capabilities capabilities) {
indexDependency.produce(new IndexDependencyBuildItem("org.apache.kafka", "kafka-clients"));
additionalIndexedClasses.produce(new AdditionalIndexedClassesBuildItem(
JsonObjectSerializer.class.getName(),
JsonObjectDeserializer.class.getName(),
JsonArraySerializer.class.getName(),
JsonArrayDeserializer.class.getName(),
BufferSerializer.class.getName(),
BufferDeserializer.class.getName(),
ObjectMapperSerializer.class.getName(),
ObjectMapperDeserializer.class.getName()));
if (capabilities.isPresent(Capability.JSONB)) {
additionalIndexedClasses.produce(new AdditionalIndexedClassesBuildItem(
JsonbSerializer.class.getName(), JsonbDeserializer.class.getName()));
}
}

@BuildStep
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.quarkus.smallrye.reactivemessaging.kafka.deployment.SmallRyeReactiveMessagingKafkaProcessor.getChannelPropertyKey;

import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -17,8 +18,8 @@
import org.jboss.jandex.ClassInfo;
import org.jboss.jandex.DotName;
import org.jboss.jandex.IndexView;
import org.jboss.jandex.Type;

import io.quarkus.deployment.util.JandexUtil;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ChannelDirection;
import io.quarkus.smallrye.reactivemessaging.deployment.items.ConnectorManagedChannelBuildItem;
import io.smallrye.reactive.messaging.kafka.KafkaConnector;
Expand Down Expand Up @@ -184,24 +185,20 @@ boolean hasJsonb() {
}

ClassInfo getSubclassOfWithTypeArgument(DotName superclass, DotName expectedTypeArgument) {
return index.getKnownDirectSubclasses(superclass)
return index.getAllKnownSubclasses(superclass)
.stream()
.filter(it -> it.superClassType().kind() == Type.Kind.PARAMETERIZED_TYPE
&& it.superClassType().asParameterizedType().arguments().size() == 1
&& it.superClassType().asParameterizedType().arguments().get(0).name().equals(expectedTypeArgument))
.findAny()
.filter(ci -> !ci.isAbstract() && JandexUtil.resolveTypeParameters(ci.name(), superclass, index)
.stream().anyMatch(t -> t.name().equals(expectedTypeArgument)))
.min(Comparator.comparing(ClassInfo::name))
.orElse(null);
}

ClassInfo getImplementorOfWithTypeArgument(DotName implementedInterface, DotName expectedTypeArgument) {
return index.getKnownDirectImplementors(implementedInterface)
return index.getAllKnownImplementors(implementedInterface)
.stream()
.filter(ci -> ci.interfaceTypes().stream()
.anyMatch(it -> it.name().equals(implementedInterface)
&& it.kind() == Type.Kind.PARAMETERIZED_TYPE
&& it.asParameterizedType().arguments().size() == 1
&& it.asParameterizedType().arguments().get(0).name().equals(expectedTypeArgument)))
.findAny()
.filter(ci -> !ci.isAbstract() && JandexUtil.resolveTypeParameters(ci.name(), implementedInterface, index)
.stream().anyMatch(t -> t.name().equals(expectedTypeArgument)))
.min(Comparator.comparing(ClassInfo::name))
.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ private static void doTest(Config customConfig, Tuple[] expectations,
List<Class<?>> classes = new ArrayList<>(Arrays.asList(classesToIndex));
classes.add(Incoming.class);
classes.add(Outgoing.class);
classes.add(Serializer.class);
classes.add(Deserializer.class);
classes.add(io.quarkus.kafka.client.serialization.ObjectMapperDeserializer.class);
classes.add(io.quarkus.kafka.client.serialization.ObjectMapperSerializer.class);
classes.add(io.quarkus.kafka.client.serialization.JsonbSerializer.class);
classes.add(io.quarkus.kafka.client.serialization.JsonbDeserializer.class);
DefaultSerdeDiscoveryState discovery = new DefaultSerdeDiscoveryState(index(classes)) {
@Override
Config getConfig() {
Expand Down Expand Up @@ -3001,5 +3007,46 @@ Multi<GenericPayload<ProducerRecord<String, Long>>> method4() {
}
}

@Test
void inheritingSerdeClass() {
Tuple[] expectations = {
tuple("mp.messaging.outgoing.channel1.value.serializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MyChildSerializer"),
tuple("mp.messaging.incoming.channel2.value.deserializer", "io.quarkus.smallrye.reactivemessaging.kafka.deployment.DefaultSerdeConfigTest$MyChildDeserializer"),
};
doTest(expectations, ChannelChildSerializer.class, MyChildSerializer.class, ParentSerializer.class, MyChildDeserializer.class, ParentDeserializer.class);
}

private static class MyChildSerializer extends ParentSerializer<JsonbDto> {

}

private static abstract class ParentSerializer<T> implements Serializer<T> {

@Override
public byte[] serialize(String topic, T data) {
return new byte[0];
}
}

private static class MyChildDeserializer extends ParentDeserializer {

}

private static abstract class ParentDeserializer implements Deserializer<JsonbDto> {

@Override
public JsonbDto deserialize(String topic, byte[] data) {
return null;
}
}

private static class ChannelChildSerializer {
@Channel("channel1")
Emitter<JsonbDto> emitter1;

@Channel("channel2")
Multi<JsonbDto> channel2;
}


}
17 changes: 17 additions & 0 deletions integration-tests/reactive-messaging-kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb</artifactId>
</dependency>

<!-- Health -->
<dependency>
Expand Down Expand Up @@ -140,6 +144,19 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-jsonb-deployment</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>*</groupId>
<artifactId>*</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health-deployment</artifactId>
Expand Down

0 comments on commit 280f66d

Please sign in to comment.