Skip to content

Commit

Permalink
[fix][broker] Fix multiple race conditions in topic unloading and loa…
Browse files Browse the repository at this point in the history
…ding

- possible related issues are #5284, #14941 and #20526
  • Loading branch information
lhotari committed Dec 26, 2023
1 parent 0dd1672 commit 744581f
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicP
protected volatile boolean transferring = false;
private volatile List<PublishRateLimiter> activeRateLimiters;

private volatile TopicCacheCleanupFunction cleanupFunction;

public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
Expand Down Expand Up @@ -1250,6 +1252,23 @@ public HierarchyTopicPolicies getHierarchyTopicPolicies() {
return topicPolicies;
}

@Override
public void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction) {
if (this.cleanupFunction != null) {
log.warn("Topic {} has already been cached. It should have been removed before re-adding.", topic);
}
this.cleanupFunction = cleanupFunction;
}

@Override
public void cleanupTopicCache(CompletableFuture<Optional<Topic>> topicFuture) {
TopicCacheCleanupFunction c = this.cleanupFunction;
this.cleanupFunction = null;
if (c != null) {
c.cleanup(topicFuture);
}
}

public void updateBrokerSubscriptionDispatchRate() {
topicPolicies.getSubscriptionDispatchRate().updateBrokerValue(
subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration()));
Expand Down
Loading

0 comments on commit 744581f

Please sign in to comment.