Skip to content

Commit

Permalink
[fix][broker] Fix uncompleted future when get the topic policies of a…
Browse files Browse the repository at this point in the history
… deleted topic (apache#18824)

Fix the uncompleted future when getting the topic policies of a deleted topic.
https://github.com/apache/pulsar/blob/30b52a1ac11b4be485258140a167b5e635586a36/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java#L513-L535
`future.complete(null);` when `msg.getValue() == null`.

(cherry picked from commit b311961)
(cherry picked from commit 1d98196)
  • Loading branch information
liangyepianzhou authored and nicoloboschi committed Jan 10, 2023
1 parent 0d31b28 commit d8cc4ca
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ private void fetchTopicPoliciesAsyncAndCloseReader(SystemTopicClient.Reader<Puls
} else {
fetchTopicPoliciesAsyncAndCloseReader(reader, topicName, policies, future);
}
} else {
future.complete(null);
}
});
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3077,4 +3077,22 @@ public void testProduceChangesWithEncryptionRequired() throws Exception {
assertNotEquals(newLac, beforeLac);
});
}

@Test
public void testGetTopicPoliciesWhenDeleteTopicPolicy() throws Exception {
admin.topics().createNonPartitionedTopic(persistenceTopic);
admin.topicPolicies().setMaxConsumers(persistenceTopic, 5);

int maxConsumerPerTopic = pulsar
.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(persistenceTopic)).get()
.getMaxConsumerPerTopic();

assertEquals(maxConsumerPerTopic, 5);
admin.topics().delete(persistenceTopic, true);
TopicPolicies topicPolicies =pulsar.getTopicPoliciesService()
.getTopicPoliciesBypassCacheAsync(TopicName.get(persistenceTopic)).get(5, TimeUnit.SECONDS);
assertNull(topicPolicies);
}

}

0 comments on commit d8cc4ca

Please sign in to comment.