Skip to content

Commit

Permalink
Avoid consumers receiving acknowledged messages from a compacted topi…
Browse files Browse the repository at this point in the history
…c upon reconnection
  • Loading branch information
coderzc committed Sep 21, 2023
1 parent b8ebfe3 commit 9335ac7
Show file tree
Hide file tree
Showing 4 changed files with 64 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3572,7 +3572,7 @@ public boolean checkAndUpdateReadPositionChanged() {
return isReadPositionOnTail || isReadPositionChanged;
}

private boolean isCompactionCursor() {
public boolean isCompactionCursor() {
return COMPACTION_CURSOR_NAME.equals(name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ConcurrentWaitCallbackException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
Expand Down Expand Up @@ -349,7 +350,11 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
boolean readFromEarliest = false;
if (!cursor.isDurable() || ((ManagedCursorImpl) cursor).isCompactionCursor()
|| cursor.getPersistentMarkDeletedPosition() == null) {
readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
}
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor,
messagesToRead, bytesToRead, readFromEarliest, this, true, consumer);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,12 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
if (isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId())){
boolean readFromEarliest = false;
if (!cursor.isDurable() || ((ManagedCursorImpl) cursor).isCompactionCursor()
|| cursor.getPersistentMarkDeletedPosition() == null) {
readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
}
if (readFromEarliest){
cursorPosition = PositionImpl.EARLIEST;
} else {
cursorPosition = (PositionImpl) cursor.getReadPosition();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1985,4 +1985,55 @@ public void testCompactionDuplicate() throws Exception {
}
}
}

@Test(timeOut = 100000)
public void testAcknowledge() throws Exception {
final String topicName = "persistent://my-property/use/my-ns/testAcknowledge" + UUID.randomUUID();
final String subName = "my-sub";
@Cleanup
PulsarClient client = newPulsarClient(lookupUrl.toString(), 100);
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(false).topic(topicName).create();

pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).readCompacted(true).subscribe().close();

Map<String, String> expected = new HashMap<>();
for (int i = 0; i < 10; i++) {
producer.newMessage().key(String.valueOf(i)).value(String.valueOf(i)).send();
expected.put(String.valueOf(i), String.valueOf(i));
}
producer.flush();

admin.topics().triggerCompaction(topicName);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});

ConsumerImpl<String> consumer = (ConsumerImpl<String>) client.newConsumer(Schema.STRING)
.topic(topicName).readCompacted(true).receiverQueueSize(1).subscriptionName(subName)
.isAckReceiptEnabled(true)
.subscribe();

for (int i = 0; i < 10; i++) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
if (message == null) {
break;
}

consumer.acknowledge(message);
String remove = expected.remove(message.getKey());
Assert.assertEquals(remove, message.getValue());
}

// Make consumer reconnect to broker
admin.topics().unload(topicName);

Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
Assert.assertNull(message);

consumer.close();
producer.close();
}
}

0 comments on commit 9335ac7

Please sign in to comment.