Closed
Description
The @PulsarListener
is not able to consume records that have a key but have no value (aka Tombstone records).
Reproduce
Create the following producer and consumer:
private static final String TOPIC = "test-topic";
@Bean
ApplicationRunner sendComplexMessagesToPulsarTopic(PulsarTemplate<Foo> template) {
return (args) -> {
template.newMessage(null)
.withTopic(TOPIC)
.withSchema(Schema.JSON(Foo.class))
.withMessageCustomizer((mc) -> mc.key("key0"))
.send();
};
}
@PulsarListener(
subscriptionName = TOPIC + "-sub",
topics = TOPIC,
schemaType = SchemaType.JSON
)
public void consumeEntireTopic(
@Payload(required = false) Foo foo,
@Header(PulsarHeaders.TOPIC_NAME) String topic,
@Header(PulsarHeaders.KEY) String key,
@Header(PulsarHeaders.PUBLISH_TIME) String ts
) {
System.out.println("******* " + foo);
}
Cause
The problem lies in the fact that currently if the listener accepts a Spring Message or uses Spring Headers, it will attempt to convert it into a Spring Message
which does not accept null values - resulting in the following:
java.lang.IllegalArgumentException: Payload must not be null
at org.springframework.util.Assert.notNull(Assert.java:172)
at org.springframework.messaging.support.MessageBuilder.createMessage(MessageBuilder.java:205)
at org.springframework.pulsar.support.converter.PulsarRecordMessageConverter.toMessage(PulsarRecordMessageConverter.java:49)
at org.springframework.pulsar.listener.adapter.AbstractPulsarMessageToSpringMessageAdapter.toMessagingMessage(AbstractPulsarMessageToSpringMessageAdapter.java:140)
at org.springframework.pulsar.listener.adapter.PulsarRecordMessageToSpringMessageListenerAdapter.received(PulsarRecordMessageToSpringMessageListenerAdapter.java:49)
at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.dispatchMessageToListener(DefaultPulsarMessageListenerContainer.java:507)
at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.lambda$run$9(DefaultPulsarMessageListenerContainer.java:464)
at io.micrometer.observation.Observation.observe(Observation.java:499)
at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.run(DefaultPulsarMessageListenerContainer.java:464)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804)
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
at java.base/java.lang.Thread.run(Thread.java:833)
Solution
Use a special marker value for null to indicate tombstone record.
Spring for Apache Kafka uses the KafkaNull for this purpose.