Skip to content

Commit

Permalink
[Doc] Describe default ack timeout and update grammar in ConsumerBuil…
Browse files Browse the repository at this point in the history
…der javadoc (apache#14084)

* [Doc] Describe default ack timeout in javadoc

* [Doc] Update javadoc grammar

Write in the simple present tense as much as possible if you are covering facts that were, are, and forever shall be true.
  • Loading branch information
liry authored Jul 8, 2022
1 parent 6614a0d commit 86ab67f
Showing 1 changed file with 55 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,11 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Finalize the {@link Consumer} creation by subscribing to the topic.
*
* <p>If the subscription does not exist, a new subscription will be created. By default the subscription
* will be created at the end of the topic. See {@link #subscriptionInitialPosition(SubscriptionInitialPosition)}
* <p>If the subscription does not exist, a new subscription is created. By default, the subscription
* is created at the end of the topic. See {@link #subscriptionInitialPosition(SubscriptionInitialPosition)}
* to configure the initial position behavior.
*
* <p>Once a subscription is created, it will retain the data and the subscription cursor even if the consumer
* <p>Once a subscription is created, it retains the data and the subscription cursor even if the consumer
* is not connected.
*
* @return the consumer builder instance
Expand All @@ -96,42 +96,42 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Finalize the {@link Consumer} creation by subscribing to the topic in asynchronous mode.
*
* <p>If the subscription does not exist, a new subscription will be created. By default the subscription
* will be created at the end of the topic. See {@link #subscriptionInitialPosition(SubscriptionInitialPosition)}
* <p>If the subscription does not exist, a new subscription is created. By default, the subscription
* is created at the end of the topic. See {@link #subscriptionInitialPosition(SubscriptionInitialPosition)}
* to configure the initial position behavior.
*
* <p>Once a subscription is created, it will retain the data and the subscription cursor even
* <p>Once a subscription is created, it retains the data and the subscription cursor even
* if the consumer is not connected.
*
* @return a future that will yield a {@link Consumer} instance
* @return a future that yields a {@link Consumer} instance
* @throws PulsarClientException
* if the the subscribe operation fails
*/
CompletableFuture<Consumer<T>> subscribeAsync();

/**
* Specify the topics this consumer will subscribe on.
* Specify the topics this consumer subscribes on.
*
* @param topicNames a set of topic that the consumer will subscribe on
* @param topicNames a set of topic that the consumer subscribes on
* @return the consumer builder instance
*/
ConsumerBuilder<T> topic(String... topicNames);

/**
* Specify a list of topics that this consumer will subscribe on.
* Specify a list of topics that this consumer subscribes on.
*
* @param topicNames a list of topic that the consumer will subscribe on
* @param topicNames a list of topic that the consumer subscribes on
* @return the consumer builder instance
*/
ConsumerBuilder<T> topics(List<String> topicNames);

/**
* Specify a pattern for topics that this consumer will subscribe on.
* Specify a pattern for topics that this consumer subscribes on.
*
* <p>The pattern will be applied to subscribe to all the topics, within a single namespace, that will match the
* <p>The pattern is applied to subscribe to all the topics, within a single namespace, that matches the
* pattern.
*
* <p>The consumer will automatically subscribe to topics created after itself.
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* a regular expression to select a list of topics to subscribe to
Expand All @@ -140,15 +140,15 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> topicsPattern(Pattern topicsPattern);

/**
* Specify a pattern for topics that this consumer will subscribe on.
* Specify a pattern for topics that this consumer subscribes on.
*
* <p>It accepts regular expression and will be compiled into a pattern internally. Eg.
* <p>It accepts regular expression that is compiled into a pattern internally. Eg.
* "persistent://public/default/pattern-topic-.*"
*
* <p>The pattern will be applied to subscribe to all the topics, within a single namespace, that will match the
* <p>The pattern is applied to subscribe to all the topics, within a single namespace, that matches the
* pattern.
*
* <p>The consumer will automatically subscribe to topics created after itself.
* <p>The consumer automatically subscribes to topics created after itself.
*
* @param topicsPattern
* given regular expression for topics pattern
Expand All @@ -169,7 +169,7 @@ public interface ConsumerBuilder<T> extends Cloneable {

/**
* Specify the subscription properties for this subscription.
* Properties are immutable, and consumers under the same subscription will fail to create a subscription
* Properties are immutable, and consumers under the same subscription fails to create a subscription
* if they use different properties.
* @param subscriptionProperties
* @return
Expand All @@ -182,10 +182,11 @@ public interface ConsumerBuilder<T> extends Cloneable {
* 1 second.
*
* <p>By default, the acknowledge timeout is disabled and that means that messages delivered to a
* consumer will not be re-delivered unless the consumer crashes.
* consumer is not re-delivered unless the consumer crashes. Since 2.3.0, when a dead letter policy
* is specified, and no ackTimeoutMillis is specified, the ack timeout is set to 30 seconds.
*
* <p>When enabling ack timeout, if a message is not acknowledged within the specified timeout
* it will be re-delivered to the consumer (possibly to a different consumer in case of
* it is re-delivered to the consumer (possibly to a different consumer in case of
* a shared subscription).
*
* @param ackTimeout
Expand All @@ -197,7 +198,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> ackTimeout(long ackTimeout, TimeUnit timeUnit);

/**
* Ack will return receipt but does not mean that the message will not be resent after get receipt.
* Ack returns receipt but does not mean that the message is not resent after get receipt.
*
* @param isAckReceiptEnabled {@link Boolean} is enable ack for receipt
* @return the consumer builder instance
Expand All @@ -207,8 +208,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Define the granularity of the ack-timeout redelivery.
*
* <p>By default, the tick time is set to 1 second. Using an higher tick time will
* reduce the memory overhead to track messages when the ack-timeout is set to
* <p>By default, the tick time is set to 1 second. Using an higher tick time
* reduces the memory overhead to track messages when the ack-timeout is set to
* bigger values (eg: 1hour).
*
* @param tickTime
Expand All @@ -223,7 +224,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Set the delay to wait before re-delivering messages that have failed to be process.
*
* <p>When application uses {@link Consumer#negativeAcknowledge(Message)}, the failed message
* will be redelivered after a fixed timeout. The default is 1 min.
* is redelivered after a fixed timeout. The default is 1 min.
*
* @param redeliveryDelay
* redelivery delay for failed messages
Expand Down Expand Up @@ -268,8 +269,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Sets a {@link MessageListener} for the consumer
*
* <p>When a {@link MessageListener} is set, application will receive messages through it. Calls to
* {@link Consumer#receive()} will not be allowed.
* <p>When a {@link MessageListener} is set, application receives messages through it. Calls to
* {@link Consumer#receive()} is not allowed.
*
* @param messageListener
* the listener object
Expand Down Expand Up @@ -327,7 +328,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Sets the ConsumerCryptoFailureAction to the value specified.
*
* @param action
* the action the consumer will take in case of decryption failures
* the action the consumer takes in case of decryption failures
* @return the consumer builder instance
*/
ConsumerBuilder<T> cryptoFailureAction(ConsumerCryptoFailureAction action);
Expand All @@ -347,8 +348,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
* size is zero. {@link Consumer#receive()} function call should not be interrupted when the consumer queue size is
* zero.</li>
* <li>Doesn't support Batch-Message: if consumer receives any batch-message then it closes consumer connection with
* broker and {@link Consumer#receive()} call will remain blocked while {@link Consumer#receiveAsync()} receives
* exception in callback. <b> consumer will not be able receive any further message unless batch-message in pipeline
* broker and {@link Consumer#receive()} call remains blocked while {@link Consumer#receiveAsync()} receives
* exception in callback. <b> consumer is not able to receive any further message unless batch-message in pipeline
* is removed</b></li>
* </ul>
* Default value is {@code 1000} messages and should be good for most use cases.
Expand All @@ -362,10 +363,10 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Group the consumer acknowledgments for the specified time.
*
* <p>By default, the consumer will use a 100 ms grouping time to send out the acknowledgments to the broker.
* <p>By default, the consumer uses a 100 ms grouping time to send out the acknowledgments to the broker.
*
* <p>Setting a group time of 0, will send out the acknowledgments immediately. A longer ack group time
* will be more efficient at the expense of a slight increase in message re-deliveries after a failure.
* <p>Setting a group time of 0 sends out the acknowledgments immediately. A longer ack group time
* is more efficient at the expense of a slight increase in message re-deliveries after a failure.
*
* @param delay
* the max amount of time an acknowledgemnt can be delayed
Expand All @@ -384,7 +385,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Set the max total receiver queue size across partitons.
*
* <p>This setting will be used to reduce the receiver queue size for individual partitions
* <p>This setting is used to reduce the receiver queue size for individual partitions
* {@link #receiverQueueSize(int)} if the total exceeds this value (default: 50000).
* The purpose of this setting is to have an upper-limit on the number
* of messages that a consumer can be pushed at once from a broker, across all
Expand Down Expand Up @@ -420,14 +421,14 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> consumerEventListener(ConsumerEventListener consumerEventListener);

/**
* If enabled, the consumer will read messages from the compacted topic rather than reading the full message backlog
* If enabled, the consumer reads messages from the compacted topic rather than reading the full message backlog
* of the topic. This means that, if the topic has been compacted, the consumer will only see the latest value for
* each key in the topic, up until the point in the topic message backlog that has been compacted. Beyond that
* point, the messages will be sent as normal.
* point, the messages are sent as normal.
*
* <p>readCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer
* (i.e. failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics
* or on a shared subscription, will lead to the subscription call throwing a PulsarClientException.
* or on a shared subscription leads to the subscription call throwing a PulsarClientException.
*
* @param readCompacted
* whether to read from the compacted topic
Expand Down Expand Up @@ -466,11 +467,11 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
* messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..)
*
* <p>In Shared subscription mode, broker will first dispatch messages to max priority-level
* consumers if they have permits, else broker will consider next priority level consumers.
* <p>In Shared subscription mode, broker first dispatches messages to max priority-level
* consumers if they have permits, else broker considers next priority level consumers.
*
* <p>If subscription has consumer-A with priorityLevel 0 and Consumer-B with priorityLevel 1
* then broker will dispatch messages to only consumer-A until it runs out permit and then broker
* then broker dispatches messages to only consumer-A until it runs out permit and then broker
* starts dispatching messages to Consumer-B.
*
* <p><pre>
Expand Down Expand Up @@ -511,7 +512,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Set a name/value property with this consumer.
*
* <p>Properties are application defined metadata that can be attached to the consumer.
* When getting the topic stats, this metadata will be associated to the consumer stats for easier identification.
* When getting the topic stats, this metadata are associated to the consumer stats for easier identification.
*
* @param key
* the property key
Expand All @@ -525,7 +526,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Add all the properties in the provided map to the consumer.
*
* <p>Properties are application defined metadata that can be attached to the consumer.
* When getting the topic stats, this metadata will be associated to the consumer stats for easier identification.
* When getting the topic stats, this metadata are associated to the consumer stats for easier identification.
*
* @param properties the map with properties
* @return the consumer builder instance
Expand Down Expand Up @@ -560,9 +561,9 @@ public interface ConsumerBuilder<T> extends Cloneable {
/**
* Set dead letter policy for consumer.
*
* <p>By default some message will redelivery so many times possible, even to the extent that it can be never stop.
* By using dead letter mechanism messages will has the max redelivery count, when message exceeding the maximum
* number of redeliveries, message will send to the Dead Letter Topic and acknowledged automatic.
* <p>By default, some message are redelivered so many times possible, even to the extent that it can be never stop.
* By using dead letter mechanism, messages have the max redelivery count. When message exceeds the maximum
* number of redeliveries, message is send to the Dead Letter Topic and acknowledged automatically.
*
* <p>You can enable the dead letter mechanism by setting dead letter policy.
* example:
Expand All @@ -583,12 +584,12 @@ public interface ConsumerBuilder<T> extends Cloneable {
* .subscribe();
* </pre>
* When a dead letter policy is specified, and no ackTimeoutMillis is specified,
* then the ack timeout will be set to 30000 millisecond.
* then the ack timeout is set to 30000 millisecond.
*/
ConsumerBuilder<T> deadLetterPolicy(DeadLetterPolicy deadLetterPolicy);

/**
* If enabled, the consumer will auto subscribe for partitions increasement.
* If enabled, the consumer auto-subscribes for partitions increasement.
* This is only for partitioned consumer.
*
* @param autoUpdate
Expand Down Expand Up @@ -643,7 +644,7 @@ public interface ConsumerBuilder<T> extends Cloneable {

/**
* Set batch receive policy {@link BatchReceivePolicy} for consumer.
* By default, consumer will use {@link BatchReceivePolicy#DEFAULT_POLICY} as batch receive policy.
* By default, consumer uses {@link BatchReceivePolicy#DEFAULT_POLICY} as batch receive policy.
*
* <p>Example:
* <pre>
Expand All @@ -657,8 +658,8 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> batchReceivePolicy(BatchReceivePolicy batchReceivePolicy);

/**
* If enabled, the consumer will auto retry message.
* default disabled.
* If enabled, the consumer auto retries message.
* Default: disabled.
*
* @param retryEnable
* whether to auto retry message
Expand Down Expand Up @@ -746,22 +747,22 @@ public interface ConsumerBuilder<T> extends Cloneable {
* Enable pooling of messages and the underlying data buffers.
* <p/>
* When pooling is enabled, the application is responsible for calling Message.release() after the handling of every
* received message. If “release()” is not called on a received message, there will be a memory leak. If an
* received message. If “release()” is not called on a received message, it causes a memory leak. If an
* application attempts to use and already “released” message, it might experience undefined behavior (eg: memory
* corruption, deserialization error, etc.).
*/
ConsumerBuilder<T> poolMessages(boolean poolMessages);

/**
* If it's configured with a non-null value, the consumer will use the processor to process the payload, including
* If it's configured with a non-null value, the consumer uses the processor to process the payload, including
* decoding it to messages and triggering the listener.
*
* Default: null
*/
ConsumerBuilder<T> messagePayloadProcessor(MessagePayloadProcessor payloadProcessor);

/**
* Notice: the negativeAckRedeliveryBackoff will not work with `consumer.negativeAcknowledge(MessageId messageId)`
* Notice: the negativeAckRedeliveryBackoff doesn't work with `consumer.negativeAcknowledge(MessageId messageId)`
* because we are not able to get the redelivery count from the message ID.
*
* <p>Example:
Expand All @@ -775,7 +776,7 @@ public interface ConsumerBuilder<T> extends Cloneable {
ConsumerBuilder<T> negativeAckRedeliveryBackoff(RedeliveryBackoff negativeAckRedeliveryBackoff);

/**
* Notice: the redeliveryBackoff will not work with `consumer.negativeAcknowledge(MessageId messageId)`
* Notice: the redeliveryBackoff doesn't work with `consumer.negativeAcknowledge(MessageId messageId)`
* because we are not able to get the redelivery count from the message ID.
*
* <p>Example:
Expand Down

0 comments on commit 86ab67f

Please sign in to comment.