Tombstone records not supported in @PulsarListener #506

Closed
@onobc

Description

@PulsarListener cannot consume records that have a key but no value (also known as tombstone records).

Reproduce

Create the following producer and consumer:

private static final String TOPIC = "test-topic";

@Bean
ApplicationRunner sendComplexMessagesToPulsarTopic(PulsarTemplate<Foo> template) {
	return (args) -> {
		template.newMessage(null)
				.withTopic(TOPIC)
				.withSchema(Schema.JSON(Foo.class))
				.withMessageCustomizer((mc) -> mc.key("key0"))
				.send();
	};
}

@PulsarListener(
		subscriptionName = TOPIC + "-sub",
		topics = TOPIC,
		schemaType = SchemaType.JSON
)
public void consumeEntireTopic(
		@Payload(required = false) Foo foo,
		@Header(PulsarHeaders.TOPIC_NAME) String topic,
		@Header(PulsarHeaders.KEY) String key,
		@Header(PulsarHeaders.PUBLISH_TIME) String ts
) {
	System.out.println("******* " + foo);
}

Cause

When the listener method accepts a Spring Message or uses Spring headers, the listener adapter converts the incoming Pulsar message into a Spring Message. A Spring Message does not accept a null payload, so converting a tombstone record fails with the following exception:

java.lang.IllegalArgumentException: Payload must not be null
	at org.springframework.util.Assert.notNull(Assert.java:172)
	at org.springframework.messaging.support.MessageBuilder.createMessage(MessageBuilder.java:205)
	at org.springframework.pulsar.support.converter.PulsarRecordMessageConverter.toMessage(PulsarRecordMessageConverter.java:49)
	at org.springframework.pulsar.listener.adapter.AbstractPulsarMessageToSpringMessageAdapter.toMessagingMessage(AbstractPulsarMessageToSpringMessageAdapter.java:140)
	at org.springframework.pulsar.listener.adapter.PulsarRecordMessageToSpringMessageListenerAdapter.received(PulsarRecordMessageToSpringMessageListenerAdapter.java:49)
	at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.dispatchMessageToListener(DefaultPulsarMessageListenerContainer.java:507)
	at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.lambda$run$9(DefaultPulsarMessageListenerContainer.java:464)
	at io.micrometer.observation.Observation.observe(Observation.java:499)
	at org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener.run(DefaultPulsarMessageListenerContainer.java:464)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804)
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java)
	at java.base/java.lang.Thread.run(Thread.java:833)

Solution

Use a special marker value in place of null to indicate a tombstone record.
Spring for Apache Kafka uses KafkaNull for this purpose.
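
To make the idea concrete, here is a minimal sketch of the marker-value approach in plain Java. Every name in it (TombstoneSketch, NullMarker, Envelope, toPayload, fromPayload) is hypothetical and chosen only for illustration; none of them is an existing Spring for Apache Pulsar API, and Envelope merely stands in for a message type that rejects null payloads.

```java
import java.util.Objects;

// Hypothetical sketch: these types are NOT part of Spring for Apache Pulsar.
final class TombstoneSketch {

	/** Singleton placeholder that stands in for a null (tombstone) payload. */
	static final class NullMarker {
		static final NullMarker INSTANCE = new NullMarker();

		private NullMarker() {
		}

		@Override
		public String toString() {
			return "NullMarker";
		}
	}

	/** Stand-in for a message type that refuses null payloads. */
	static final class Envelope {
		final String key;
		final Object payload;

		Envelope(String key, Object payload) {
			this.key = key;
			this.payload = Objects.requireNonNull(payload, "Payload must not be null");
		}
	}

	/** Inbound side: substitute the marker for a null record value. */
	static Object toPayload(Object value) {
		return (value != null) ? value : NullMarker.INSTANCE;
	}

	/** Listener side: turn the marker back into null before invoking the method. */
	static Object fromPayload(Object payload) {
		return (payload == NullMarker.INSTANCE) ? null : payload;
	}

	/** Wraps a record; a null value no longer trips the null check. */
	static Envelope wrap(String key, Object value) {
		return new Envelope(key, toPayload(value));
	}

	private TombstoneSketch() {
	}
}
```

With this in place, TombstoneSketch.wrap("key0", null) succeeds instead of throwing, and the code that resolves the listener's payload argument can call fromPayload so that a parameter declared as @Payload(required = false) receives null.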

Labels: type: improvement