
[Enhancement] Optimize delayed message delivery by reducing unnecessary reads to storage in InMemoryDelayedDeliveryTracker #23912

Open
@lhotari

Description

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

The current delayed message delivery implementation has an opportunity to reduce unnecessary reads from storage.

Pulsar has two implementations of the delayed delivery tracker: InMemoryDelayedDeliveryTracker and BucketDelayedDeliveryTracker.
The implementation is selected with the delayedDeliveryTrackerFactoryClassName configuration key, and the default setting chooses the in-memory implementation:

From pulsar/conf/broker.conf, lines 614 to 617 at commit b02d52c:

```properties
# Class name of the factory that implements the delayed deliver tracker.
# If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
# will create bucket based delayed message index tracker.
delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory
```
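As a rough illustration of how this key resolves, the sketch below loads broker.conf-style text with `java.util.Properties` and falls back to the in-memory factory when the key is absent. It is a simplified model, not Pulsar's own configuration loader; the class and helper names are hypothetical, and only the key and the two factory class names come from the snippet above.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

final class TrackerFactoryConfigSketch {
    static final String KEY = "delayedDeliveryTrackerFactoryClassName";
    static final String IN_MEMORY_FACTORY =
            "org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory";
    static final String BUCKET_FACTORY =
            "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory";

    // Hypothetical helper: reads the key from broker.conf-style text and falls back
    // to the in-memory factory (the default shown in broker.conf) when the key is absent.
    static String resolveFactoryClassName(String brokerConf) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(brokerConf)); // lines starting with '#' are comments
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props.getProperty(KEY, IN_MEMORY_FACTORY).trim();
    }

    // True only when the bucket-based tracker factory is configured explicitly.
    static boolean usesBucketTracker(String brokerConf) {
        return BUCKET_FACTORY.equals(resolveFactoryClassName(brokerConf));
    }

    public static void main(String[] args) {
        String noOverride = "# delayedDeliveryTrackerFactoryClassName not set\n";
        String bucketOverride = KEY + "=" + BUCKET_FACTORY + "\n";
        System.out.println(usesBucketTracker(noOverride));     // false
        System.out.println(usesBucketTracker(bucketOverride)); // true
    }
}
```

With no override, every dispatcher in this model would get the in-memory tracker, which is why the optimization described next does not apply by default.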

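The BucketDelayedDeliveryTracker has an optimization that skips reading messages it has already "indexed". When that tracker is in use, the skip condition for normal reads is built from its containsMessage method: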

```java
protected Predicate<Position> createReadEntriesSkipConditionForNormalRead() {
    Predicate<Position> skipCondition = null;
    // Filter out and skip read delayed messages exist in DelayedDeliveryTracker
    if (delayedDeliveryTracker.isPresent()) {
        final DelayedDeliveryTracker deliveryTracker = delayedDeliveryTracker.get();
        if (deliveryTracker instanceof BucketDelayedDeliveryTracker) {
            skipCondition = position -> ((BucketDelayedDeliveryTracker) deliveryTracker)
                    .containsMessage(position.getLedgerId(), position.getEntryId());
        }
    }
    return skipCondition;
}
```

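This already reduces reads when the BucketDelayedDeliveryTracker is in use. For any other tracker, including the default InMemoryDelayedDeliveryTracker, the method returns null and no entries are skipped.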
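To make the effect of a skip condition concrete, the sketch below models a read request over a range of entries in one ledger and drops the entries a tracker already knows about. It is a simplified, hypothetical model: `Pos` stands in for Pulsar's Position, `IndexedTracker` stands in for a tracker exposing a containsMessage(ledgerId, entryId) lookup, and the managed cursor's real read path is not reproduced here.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class SkipConditionSketch {
    // Hypothetical stand-in for Pulsar's Position: a (ledgerId, entryId) pair.
    record Pos(long ledgerId, long entryId) {}

    // Hypothetical stand-in for a tracker that has "indexed" some delayed messages.
    static final class IndexedTracker {
        private final Set<Pos> indexed = new HashSet<>();

        void add(long ledgerId, long entryId) {
            indexed.add(new Pos(ledgerId, entryId));
        }

        boolean containsMessage(long ledgerId, long entryId) {
            return indexed.contains(new Pos(ledgerId, entryId));
        }
    }

    // Returns the entry ids in [fromEntry, toEntry] that still have to be fetched
    // from storage; a null skip condition (the in-memory tracker case) reads everything.
    static List<Long> entriesToRead(long ledgerId, long fromEntry, long toEntry,
                                    Predicate<Pos> skipCondition) {
        List<Long> toRead = new ArrayList<>();
        for (long e = fromEntry; e <= toEntry; e++) {
            Pos p = new Pos(ledgerId, e);
            if (skipCondition == null || !skipCondition.test(p)) {
                toRead.add(e);
            }
        }
        return toRead;
    }

    public static void main(String[] args) {
        IndexedTracker tracker = new IndexedTracker();
        tracker.add(7, 2);
        tracker.add(7, 3);
        Predicate<Pos> skip = p -> tracker.containsMessage(p.ledgerId(), p.entryId());
        System.out.println(entriesToRead(7, 0, 4, skip)); // [0, 1, 4]
        System.out.println(entriesToRead(7, 0, 4, null)); // [0, 1, 2, 3, 4]
    }
}
```

In this model, the two indexed entries are never fetched; with a null predicate, all five are.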

The state of the InMemoryDelayedDeliveryTracker is also cleared once all consumers have disconnected:

```java
if (consumerList.isEmpty()) {
    if (havePendingRead || havePendingReplayRead) {
        // There is a pending read from previous run. We must wait for it to complete and then rewind
        shouldRewindBeforeReadingOrReplaying = true;
    } else {
        cursor.rewind();
        shouldRewindBeforeReadingOrReplaying = false;
    }
    redeliveryMessages.clear();
    delayedDeliveryTracker.ifPresent(tracker -> {
        // Don't clean up BucketDelayedDeliveryTracker, otherwise we will lose the bucket snapshot
        if (tracker instanceof InMemoryDelayedDeliveryTracker) {
            tracker.clear();
        }
    });
}
```

Solution

It would be useful to keep this state in the InMemoryDelayedDeliveryTracker as well, and to skip reading a delayed message from storage when the tracker already knows the delivery time of that entry.
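One possible shape of this idea is sketched below, under stated assumptions: all class, record, and method names are hypothetical, positions are modeled as a plain (ledgerId, entryId) record rather than Pulsar's Position, and this is not the actual InMemoryDelayedDeliveryTracker. The sketch keeps a delivery time per entry so that a containsMessage-style lookup can back a skip condition like the one used for BucketDelayedDeliveryTracker, while a hypothetical `pollDue` hands entries back once their delivery time has passed, so skipping them during normal reads does not lose them.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Predicate;

final class RetainingInMemoryTrackerSketch {
    // Hypothetical stand-in for Pulsar's Position.
    record Pos(long ledgerId, long entryId) {}

    // Delivery time per entry: answers "is this entry already tracked?" in O(1).
    private final Map<Pos, Long> deliverAtByPosition = new HashMap<>();
    // Entries grouped by delivery time: lets due entries be found in time order.
    private final TreeMap<Long, List<Pos>> positionsByDeliverAt = new TreeMap<>();

    void addMessage(long ledgerId, long entryId, long deliverAt) {
        Pos p = new Pos(ledgerId, entryId);
        // Ignore duplicates so an entry is never scheduled twice.
        if (deliverAtByPosition.putIfAbsent(p, deliverAt) == null) {
            positionsByDeliverAt.computeIfAbsent(deliverAt, k -> new ArrayList<>()).add(p);
        }
    }

    boolean containsMessage(long ledgerId, long entryId) {
        return deliverAtByPosition.containsKey(new Pos(ledgerId, entryId));
    }

    // Removes and returns every entry whose delivery time is <= now.
    List<Pos> pollDue(long now) {
        List<Pos> due = new ArrayList<>();
        while (!positionsByDeliverAt.isEmpty() && positionsByDeliverAt.firstKey() <= now) {
            for (Pos p : positionsByDeliverAt.pollFirstEntry().getValue()) {
                deliverAtByPosition.remove(p);
                due.add(p);
            }
        }
        return due;
    }

    // Skip condition for normal reads: skip entries whose delivery time is already known.
    Predicate<Pos> skipCondition() {
        return p -> containsMessage(p.ledgerId(), p.entryId());
    }

    public static void main(String[] args) {
        RetainingInMemoryTrackerSketch tracker = new RetainingInMemoryTrackerSketch();
        tracker.addMessage(3, 10, 1_000L);
        tracker.addMessage(3, 11, 2_000L);
        Predicate<Pos> skip = tracker.skipCondition();
        System.out.println(skip.test(new Pos(3, 10)));       // true: no need to read it again
        System.out.println(tracker.pollDue(1_500L).size());  // 1
        System.out.println(skip.test(new Pos(3, 10)));       // false: entry was handed out
    }
}
```

The HashMap keeps the skip predicate cheap on the read path, while the TreeMap preserves delivery-time ordering; the memory cost grows with the number of tracked entries, which any real implementation would need to bound.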

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!


Metadata


Assignees: No one assigned

Labels: type/enhancement

Type: No type

Projects: No projects

Milestone: No milestone

Relationships: None yet

Development: No branches or pull requests
