[improve][broker] Support consumer side delayed messages#24372
[improve][broker] Support consumer side delayed messages#24372Denovo1998 wants to merge 1 commit intoapache:masterfrom
Conversation
|
Currently, there seem to be some issues with this implementation. (Note, this PIP only discusses the implementation of BucketDelayedDeliveryTracker) If a delayed message in the LastMutableBucket has not been flushed to the Bookie, and a failure occurs, the data in the LastMutableBucket will be lost. However, this will not have any impact, as after restarting, the messages will still be read from the MackDelete position onwards, and the Bucket will be rebuilt. This is why the data in the Bucket can be deleted as long as it is read (without requiring client Ack). If we send a command to add a delayed message from the consumer side, and it fails to be persisted (sealBucketAndAsyncPersistent) in the LastMutableBucket, and the Broker crashes, the command will be lost. We cannot wait until the Seal Bucket condition is triggered to return the delayed message command cancellation success, because we do not know how long it will take. This part is very important and needs to be resolved. I will think about how to solve this part, and everyone is welcome to discuss it with me. |
Fixes #xyz
Main Issue: #xyz
PIP: #xyz
Motivation
When consumers encounter transient failures while processing messages, a common requirement is to retry processing after a certain delay. Before this feature, developers typically resorted to:
These existing solutions introduced varying degrees of overhead, complexity, or inflexibility. This change aims to provide a native, efficient mechanism for consumer-side delayed message redelivery.
Modifications
This change introduces the capability for Pulsar consumers to negatively acknowledge (Nack) messages with a specified custom delay. The key modifications include:
Client API Extension:
negativeAcknowledge(MessageId messageId, long delay, TimeUnit unit)andnegativeAcknowledge(Message<?> message, long delay, TimeUnit unit)have been added to theorg.apache.pulsar.client.api.Consumerinterface.Protocol Enhancement:
CommandRedeliverUnacknowledgedMessagesprotobuf command has been augmented with an optionaldelay_at_time(uint64) field. This field carries the absolute timestamp at which the message is expected to be redelivered.Broker-Side Core Logic Adjustments:
ServerCnx: Updated to recognize and process thedelay_at_timefield inCommandRedeliverUnacknowledgedMessages.Consumer/Subscription/Dispatcher:delayAtTime.PersistentDispatcherMultipleConsumers(and its classic variant) now leverage the topic-levelDelayedDeliveryTrackerto manage Nacked messages with a specified delay. The message's position and the targetdelayAtTimeare added to the tracker, which triggers redelivery when the specified time is reached.SharedandKey_Sharedsubscription types to provide precise delay semantics.With these modifications, consumers can directly request the broker to redeliver a message after a specific delay, leading to:
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: