Skip to content

Listener docs cleanup #128

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 19, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 200 additions & 26 deletions spring-pulsar-docs/src/main/asciidoc/pulsar.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -212,13 +212,13 @@ In addition, in order to enable batch consumption at the `PulsarListener` level,
Based on the actual type that the `List` holds, the framework tries to infer the schema to use.
If the `List` contains a complex type, then the `schemaType` still needs to be provided on `PulsarListener`.

The following also should work in which we use the `Messages` holder type provided by the Pulsar Java client.
The following also should work in which we use the `Message` envelope provided by the Pulsar Java client.

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(Messages<Foo> messages) {
public void listen4(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
Expand All @@ -244,6 +244,10 @@ Note that the properties used are direct Pulsar consumer properties.
[[pulsar-message-listener-container]]
==== Pulsar Message Listener Container

Now that we saw the basic interactions on the consumer side through `PulsarListener`, let us now dive into the inner workings of how `PulsarListener` interacts with the underlying Pulsar consumer.
Keep in mind that, for end-user applications, in most scenarios, we recommend using `PulsarListener` annotation directly for consuming from a Pulsar topic when using Spring for Apache Pulsar, as that model covers a broad set of application use cases.
However, it is important to understand how `PulsarListener` works internally and this section will go through those details.

As briefly mentioned above, the message listener container is at the heart of message consumption when using Spring for Apache Pulsar.
`PulsarListener` uses the message listener container infrastructure behind the scenes to create and manage the Pulsar consumer.
Spring for Apache Pulsar provides the contract for this message listener container through `PulsarMessageListenerContainer`.
Expand Down Expand Up @@ -336,6 +340,47 @@ public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T>
Concurrency of more than `1` is only allowed on non-exclusive subscriptions (`failover`, `shared` and `key-shared`).
You can only have the default `1` for concurrency when you have an exclusive subscription mode.

Here is an example of enabling `concurrency` through the `PulsarListener` annotation for a `failover` subscription.

====
[source, java]
----
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
----
====

In the above listener, it is assumed that the topic `my-topic` has 3 partitions.
If it is a non-partitioned topic, then having concurrency set to `3`, will not do anything, you will simply get two idle consumers in addition to the main active one.
If the topic has more than 3 partitions, then messages will be load-balanced across the consumers that the container creates.
If you run this `PulsarListener`, you will see that messages from different partitions will be consumed through different consumers as implied by the thread name and consumer names printouts in the example code above.

**Note: When using the `Failover` subscription this way on partitioned topics, Pulsar guarantees message ordering.**

Here is another example of `PulsarListener`, but with `Shared` subscription and `concurrency` enabled.

====
[source, java]
----
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
----
====

In the example above, the `PulsarListener` creates 5 different consumers (once again, we are assuming that the topic has 5 partitions).

**Keep in mind that, in this version, there is no message ordering as `Shared` subscriptions do not guarantee any message ordering in Pulsar**

If you need message ordering and still want a shared subscription types, then you need to use the `Key_Shared` subscription type.

==== Consuming the Records

In this section, we are going to see how the message listener container enables both single record and batch based message consumption.
Expand Down Expand Up @@ -396,31 +441,159 @@ MANUAL;
```

`BATCH` acknowledgment mode is the default, but you can change it on the message listener container.
In the following sections, we will see how acknowledgment works when using both single and batch versions of `PulsarListener` and how they translate to the backing message listener container (and of course ultimately to the Pulsar consumer).

==== Automatic Message Ack in Single Record Mode

==== Message Ack in Single Record Mode
Let us revisit our basic single message based `PulsarListener`.

When consuming single records using `PulsarRecordMessageListener` and the default ack mode of `BATCH` is used, the framework waits for all the record received from the `batchReceive` call to process successfully and then call the `acknowledge` method on the Pulsar Consumer.
====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
----
====

It is natural to wonder, how acknowledgment works when using `PulsarListener`, espcially if you are familiar with Pulsar consumer directly.
The answer comes down to the message listener container as that is the central place in Spring for Apache Pulsar which coordinates all the consumer related activities.

Assuming you are not overriding the default behavior, this is what happens behind the scenes when using the above `PulsarListener`.

. First, the Listener container receives messages as batch from the Pulsar consumer.
. The received messages are handed down to `PulsarListener` one message at a time
. When all the records are handed down to the listener method and successfully processed, the container will acknowledge all the messages from the original bach receive.

This is the normal flow. If any record from the original batch received, throws an exception, Spring for Apache Pulsar will track them separately.
When all the records from the batch are processed, then Spring for Apache Pulsar will acknowledge all the succesful messages and negatively acknowledge (nack) all the failed messages.
In other words, when consuming single records using `PulsarRecordMessageListener` and the default ack mode of `BATCH` is used, the framework waits for all the record received from the `batchReceive` call to process successfully and then call the `acknowledge` method on the Pulsar Consumer.
If any particular record throws an exception when invoking the handler method, Spring for Apache Pulsar tracks those records and separately call `negativeAcknowledge` on those records after the entire batch is processed.

If the application wants the acknowledgment or negative acknowledgment to occur per record, then the `RECORD` ack mode can be enabled.
In that case, after handling each record the message is acknowledged if no error or negatively acknowledged if there was an error.
In that case, after handling each record, the message is acknowledged if no error and negatively acknowledged if there was an error.
Here is an example of enabling `RECORD` ack mode on Pulsar Listener.

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
----
====

You can also set the listener property, `spring.pulsar.listner.ack-mode` to set the ack mode application wide.
When doing this, you do not need to set this on the `PulsarListener` annotation.
In that case, all the `PulsarListener` methods in the application acquires that property.

==== Manual Message Ack in Single Record Mode

There are situations in which you might not want the framework to do any acknowledgments, but rather do that directly from the application itself.
Spring for Apache Pulsar provides a couple of ways to enable manual message acknowledgments.
Let us look at a few examples.

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
----
====

Few things merit explanation here - First we are enabling manual ack mode by setting `ackMode` on `PulsarListener`.
When enabling manual ack mode, Spring for Apache Pulsar allows the application to inject an `Acknowledgment` object as you can see in the above `PulsarListener` method.
The framework achieves this by selecting a compatible message listener container - `PulsarAcknowledgingMessageListener` for single record based consumption which gives you access to an `Acknowledgment` object.

The `Acknowledgment` object provides the following API methods.

====
[source, java]
----
void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);
----
====

You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode and then call one of the corresponding methods above.

In the above `PulsarListener` example, we are calling a parameter-less `acknowledge` method.
This is because the framework knows which `Message` it is operating under currently.
When calling `acknowledge()`, you do not need to receive the payload with the `Message` enveloper`, but rather simply using the target type - `String` in this example.
You can also call a different variant of `acknowledge` by providing the message id - `acknowledge.acknowledge(message.getMessageId());`
When using `acknowledge(messageId)`, you must receive the payload using the `Message<?>` envelope.

Similar to what is possible for acknowledging, the `Acknowledgment` API also provides options for negatively acknowledging - see the nack methods above.

You can also call `acknowledge` directly on the Pulsar consumer as below.

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
----
====

As you can see, when calling `acknowledge` directly on the underlying consumer, then you need to do error handling by yourself.
Using the `Acknowledgment` does not require that as the framework can do that for you.
Therefore, it is recommended to use the `Acknowledgment` object approach when using manual acknowledgment.

When using manual acknowledgment, it is important to understand that the framework completely stay from any acknowledgment at all.
Hence, it is extremely important for the end-users to think through the right acknowledgment strategies when designing applications.

==== Message Ack in Batch Consumption

When records are consumed in batches (See the section above), then if the default ack mode of `BATCH` is used, then when the entire batch is processed successfully, it will be acknowledged.
If any records throw an exception, then the entire batch is negatively acknowledged.
Note that this may not be the same batch that was batched on the producer side, rather this is the batch that returned from calling `batchReceive` on the consumer

Let us look at the following batch listener:

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
----
====

When all the messages in the incoming collection (`messages` in this example) are processed, the framework will acknowledge all of them.

When consuming in batch mode, `RECORD` is not an allowed ack mode.
This might cause an issue as application does not want the entire batch to be re-delivered again.
This might cause an issue as application may not want the entire batch to be re-delivered again.
For such situations, you need to use the `MANUAL` acknowledgement mode.

==== Manual Acknowledgment
==== Manual Messge Acknowledgment in Batch Consumption

When `MANUAL` ack mode is set on the message listener container, then the framework will not do any acknowledgment - positive or negative.
As seen in the previous section, when `MANUAL` ack mode is set on the message listener container, then the framework will not do any acknowledgment - positive or negative.
It is entirely up to the application to take care of such concerns.
When `MANUAL` ack mode is set, Spring for Apache Pulsar selects a compatible message listener container - `PulsarAcknowledgingMessageListener` when in record consumption and `PulsarBatchAcknowledgingMessageListener` for batch consumption.
These interfaces provide you access to an `Acknowledgment` object.
The `Acknowledgment` object provides the following API methods.

When `MANUAL` ack mode is set, Spring for Apache Pulsar selects a compatible message listener container - `PulsarBatchAcknowledgingMessageListener` for batch consumption which gives you access to an `Acknowledgment` object.
Once again, the following are the methods availble in the `Acknowledgment` API.
====
[source, java]
----
Expand All @@ -436,22 +609,27 @@ void nack(MessageId messageId);
----
====

You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode and then call the corresponding method.
Here is a basic example for a record based listener.
You can inject this `Acknowledgment` object to your `PulsarListener` while using `MANUAL` ack mode.
Here is a basic example for a batch based listener.

====
[source, java]
----
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, Acknowlegement acknowledgment) {
System.out.println("Message Received: " + message);
acknowledgment.acknowledge();
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
----
====

You can also call `acknowledgment.nack()` to negatively acknowledge in which case the record will be re-delivered.

When using a batch listener, the message listener container cannot know which record it is currently operating upon.
Therefore, in order to manually acknowledge, you need to use one of the overloaded `acknowledge` method that takes a `MessageId` or a `List<MessageId>`.
You can also negatively acknowledge with the `MessageId` for the batch listener.
Expand Down Expand Up @@ -588,20 +766,16 @@ public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println("Data Received: " + message.getValue());
}
----
====

==== Accessing the Pulsar Messages Object

When consuming messages in batch mode using `PulsarListener`, instead of receiving them as a `List, you can receive them as Pulsar Messages type.
Here is an example.
====
or in batch receiver:

====
[source, java]
----
@PulsarListener(subscriptionName = "batch-subscription", topics = "hello-pulsar", batch = "true")
public void listen(org.apache.pulsar.client.api.Messages<String> messages) {
public void listen(List<org.apache.pulsar.client.api.Message<String>> messages) {
// Iterate on the messages
// Each iteration gives access to a org.apache.pulsar.client.api.Message object
}
----
====
Expand Down