Skip to content

Fix references to invalid 'ackTimeout' property #1015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -625,12 +625,12 @@ Apache Pulsar provides various native strategies for message redelivery and erro
By default, Pulsar consumers do not redeliver messages unless the consumer crashes, but you can change this behavior by setting an ack timeout on the Pulsar consumer.
If the ack timeout property has a value above zero and if the Pulsar consumer does not acknowledge a message within that timeout period, the message is redelivered.

When you use Spring for Apache Pulsar, you can set this property via a <<_consumer_customization_on_pulsarlistener,consumer customizer>> or with the native Pulsar `ackTimeout` property in the `properties` attribute of `@PulsarListener`:
When you use Spring for Apache Pulsar, you can set this property via a <<_consumer_customization_on_pulsarlistener,consumer customizer>> or with the native Pulsar `ackTimeoutMillis` property in the `properties` attribute of `@PulsarListener`:

[source, java]
----
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeout=60s"})
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
Expand All @@ -649,7 +649,7 @@ class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeout=60s" })
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
Expand Down Expand Up @@ -720,7 +720,7 @@ class DeadLetterPolicyConfig {

@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeout=1s" })
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
Expand All @@ -743,7 +743,7 @@ This bean specifies a number of things, such as the max delivery (10, in this ca
If you do not specify a DLQ topic name, it defaults to `<topicname>-<subscriptionname>-DLQ` in Pulsar.
Next, we provide this bean name to `PulsarListener` by setting the `deadLetterPolicy` property.
Note that the `PulsarListener` has a subscription type of `Shared`, as the DLQ feature only works with shared subscriptions.
This code is primarily for demonstration purposes, so we provide an `ackTimeout` value of 1 second.
This code is primarily for demonstration purposes, so we provide an `ackTimeoutMillis` value of 1000.
The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry.
If that cycle continues ten times (as that is our max redelivery count in the `DeadLetterPolicy`), the Pulsar consumer publishes the messages to the DLQ topic.
We have another `PulsarListener` that listens on the DLQ topic to receive data as it is published to the DLQ topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<Strin
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
}
----
Here we receive the records as a `Flux` of Pulsar messages.
In addition, to enable stream consumption at the `ReactivePulsarListener` level, you need to set the `stream` property on the annotation to `true`.
Expand Down Expand Up @@ -293,7 +294,7 @@ You can specify this property directly as a Pulsar consumer property via a <<rea
----
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeout", "60s");
return b -> b.property("ackTimeoutMillis", "60000");
}
----

Expand Down Expand Up @@ -342,7 +343,7 @@ class DeadLetterPolicyConfig {

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeout", "1s");
return b -> b.property("ackTimeoutMillis", "1000");
}
}
----
Expand All @@ -352,7 +353,7 @@ This bean specifies a number of things, such as the max delivery (10, in this ca
If you do not specify a DLQ topic name, it defaults to `<topicname>-<subscriptionname>-DLQ` in Pulsar.
Next, we provide this bean name to `ReactivePulsarListener` by setting the `deadLetterPolicy` property.
Note that the `ReactivePulsarListener` has a subscription type of `Shared`, as the DLQ feature only works with shared subscriptions.
This code is primarily for demonstration purposes, so we provide an `ackTimeout` value of 1 second.
This code is primarily for demonstration purposes, so we provide an `ackTimeoutMillis` value of 1000.
The idea is that the code throws the exception and, if Pulsar does not receive an ack within 1 second, it does a retry.
If that cycle continues ten times (as that is our max redelivery count in the `DeadLetterPolicy`), the Pulsar consumer publishes the messages to the DLQ topic.
We have another `ReactivePulsarListener` that listens on the DLQ topic to receive data as it is published to the DLQ topic.
Expand Down
Loading