diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index fc3ac89443995f..65f53562495716 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1008,7 +1008,8 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs } private CompletableFuture getDurableSubscription(String subscriptionName, - InitialPosition initialPosition, long startMessageRollbackDurationSec, + InitialPosition initialPosition, + long startMessageRollbackDurationSec, boolean readCompacted, boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index eba7f1e8c73c3e..dfb19363b84fdf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1893,6 +1893,7 @@ public void testReadCommittedWithCompaction() throws Exception{ .topic(topic) .subscriptionName("sub") .subscriptionType(SubscriptionType.Exclusive) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .readCompacted(true) .subscribe(); List result = new ArrayList<>(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 7d7cbc1759ac1b..7cd3be4f6efaf3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -2190,9 +2190,11 @@ public void testCompactionWithTTL() throws Exception { }); @Cleanup - Consumer consumer = - pulsarClient.newConsumer(Schema.STRING).topic(topicName).subscriptionName(subName).readCompacted(true) - .subscribe(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName) + .subscriptionName("sub-2") + .readCompacted(true) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); List result = new ArrayList<>(); while (true) {