Skip to content

Commit 6ecccdd

Browse files
liangyepianzhouTechnoboy-
authored andcommitted
[improve][broker] Do not retain the data in the system topic (#22022)
### Motivation For some use case, the users need to store all the messages even though these message are acked by all subscription. So they set the retention policy of the namespace to infinite retention (setting both time and size limits to `-1`). But the data in the system topic does not need for infinite retention. ### Modifications For system topics, do not retain messages that have already been acknowledged.
1 parent 1790438 commit 6ecccdd

File tree

2 files changed

+59
-4
lines changed

2 files changed

+59
-4
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1850,10 +1850,17 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
18501850
}
18511851

18521852
if (retentionPolicies == null) {
1853-
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
1854-
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
1855-
serviceConfig.getDefaultRetentionSizeInMB())
1856-
);
1853+
if (SystemTopicNames.isSystemTopic(topicName)) {
1854+
if (log.isDebugEnabled()) {
1855+
log.debug("{} Disable data retention policy for system topic.", topicName);
1856+
}
1857+
retentionPolicies = new RetentionPolicies(0, 0);
1858+
} else {
1859+
retentionPolicies = policies.map(p -> p.retention_policies).orElseGet(
1860+
() -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(),
1861+
serviceConfig.getDefaultRetentionSizeInMB())
1862+
);
1863+
}
18571864
}
18581865

18591866
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();

pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionRetentionTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,21 @@
3838
import lombok.Cleanup;
3939
import lombok.extern.slf4j.Slf4j;
4040
import org.apache.bookkeeper.client.BookKeeper;
41+
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
4142
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4243
import org.apache.pulsar.client.api.Message;
4344
import org.apache.pulsar.client.api.MessageId;
4445
import org.apache.pulsar.client.api.Producer;
4546
import org.apache.pulsar.client.api.PulsarClient;
4647
import org.apache.pulsar.client.api.Reader;
4748
import org.apache.pulsar.client.api.Schema;
49+
import org.apache.pulsar.common.naming.SystemTopicNames;
50+
import org.apache.pulsar.common.naming.TopicName;
4851
import org.apache.pulsar.common.policies.data.ClusterData;
52+
import org.apache.pulsar.common.policies.data.RetentionPolicies;
4953
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
5054
import org.awaitility.Awaitility;
55+
import org.testng.Assert;
5156
import org.testng.annotations.AfterMethod;
5257
import org.testng.annotations.BeforeMethod;
5358
import org.testng.annotations.Test;
@@ -212,6 +217,49 @@ public void testCompactionRetentionOnTopicCreationWithTopicPolicies() throws Exc
212217
);
213218
}
214219

220+
@Test
221+
public void testRetentionPolicesForSystemTopic() throws Exception {
222+
String namespace = "my-tenant/my-ns";
223+
String topicPrefix = "persistent://" + namespace + "/";
224+
admin.namespaces().setRetention(namespace, new RetentionPolicies(-1, -1));
225+
// Check event topics and transaction internal topics.
226+
for (String eventTopic : SystemTopicNames.EVENTS_TOPIC_NAMES) {
227+
checkSystemTopicRetentionPolicy(topicPrefix + eventTopic);
228+
}
229+
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN);
230+
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_COORDINATOR_LOG);
231+
checkSystemTopicRetentionPolicy(topicPrefix + SystemTopicNames.PENDING_ACK_STORE_SUFFIX);
232+
233+
// Check common topics.
234+
checkCommonTopicRetentionPolicy(topicPrefix + "my-topic" + System.nanoTime());
235+
// Specify retention policies for system topic.
236+
pulsar.getConfiguration().setTopicLevelPoliciesEnabled(true);
237+
pulsar.getConfiguration().setSystemTopicEnabled(true);
238+
admin.topics().createNonPartitionedTopic(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
239+
admin.topicPolicies().setRetention(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
240+
new RetentionPolicies(10, 10));
241+
Awaitility.await().untilAsserted(() -> {
242+
checkTopicRetentionPolicy(topicPrefix + SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT,
243+
new RetentionPolicies(10, 10));
244+
});
245+
}
246+
247+
private void checkSystemTopicRetentionPolicy(String topicName) throws Exception {
248+
checkTopicRetentionPolicy(topicName, new RetentionPolicies(0, 0));
249+
250+
}
251+
252+
private void checkCommonTopicRetentionPolicy(String topicName) throws Exception {
253+
checkTopicRetentionPolicy(topicName, new RetentionPolicies(-1, -1));
254+
}
255+
256+
private void checkTopicRetentionPolicy(String topicName, RetentionPolicies retentionPolicies) throws Exception {
257+
ManagedLedgerConfig config = pulsar.getBrokerService()
258+
.getManagedLedgerConfig(TopicName.get(topicName)).get();
259+
Assert.assertEquals(config.getRetentionSizeInMB(), retentionPolicies.getRetentionSizeInMB());
260+
Assert.assertEquals(config.getRetentionTimeMillis(),retentionPolicies.getRetentionTimeInMinutes() * 60000L);
261+
}
262+
215263
private void testCompactionCursorRetention(String topic) throws Exception {
216264
Set<String> keys = Sets.newHashSet("a", "b", "c");
217265
Set<String> keysToExpire = Sets.newHashSet("x1", "x2");

0 commit comments

Comments
 (0)