Skip to content

Commit

Permalink
[fix][broker] Fix write duplicate entries into the compacted ledger a…
Browse files Browse the repository at this point in the history
…fter RawReader reconnects (#21081)
  • Loading branch information
coderzc authored Sep 4, 2023
1 parent e59c850 commit 2921a41
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId
reader.seekAsync(from).thenCompose((v) -> {
Semaphore outstanding = new Semaphore(MAX_OUTSTANDING);
CompletableFuture<Void> loopPromise = new CompletableFuture<>();
phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise);
phaseTwoLoop(reader, to, latestForKey, ledger, outstanding, loopPromise, MessageId.earliest);
return loopPromise;
}).thenCompose((v) -> closeLedger(ledger))
.thenCompose((v) -> reader.acknowledgeCumulativeAsync(lastReadId,
Expand All @@ -227,7 +227,8 @@ private CompletableFuture<Long> phaseTwoSeekThenLoop(RawReader reader, MessageId
}

private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> latestForKey,
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise) {
LedgerHandle lh, Semaphore outstanding, CompletableFuture<Void> promise,
MessageId lastCompactedMessageId) {
if (promise.isDone()) {
return;
}
Expand All @@ -236,6 +237,12 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
m.close();
return;
}

if (m.getMessageId().compareTo(lastCompactedMessageId) <= 0) {
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, lastCompactedMessageId);
return;
}

try {
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
Expand Down Expand Up @@ -306,7 +313,7 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
}
return;
}
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise);
phaseTwoLoop(reader, to, latestForKey, lh, outstanding, promise, m.getMessageId());
} finally {
m.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1919,4 +1919,70 @@ public void testDispatcherMaxReadSizeBytes() throws Exception {
consumer.close();
producer.close();
}

@Test
public void testCompactionDuplicate() throws Exception {
String topic = "persistent://my-property/use/my-ns/testCompactionDuplicate";
final int numMessages = 1000;
final int maxKeys = 800;

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

// trigger compaction (create __compaction cursor)
admin.topics().triggerCompaction(topic);

Map<String, byte[]> expected = new HashMap<>();
Random r = new Random(0);

pulsarClient.newConsumer().topic(topic).subscriptionName("sub1").readCompacted(true).subscribe().close();

for (int j = 0; j < numMessages; j++) {
int keyIndex = r.nextInt(maxKeys);
String key = "key" + keyIndex;
byte[] data = ("my-message-" + key + "-" + j).getBytes();
producer.newMessage().key(key).value(data).send();
expected.put(key, data);
}

producer.flush();

// trigger compaction
admin.topics().triggerCompaction(topic);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.RUNNING);
});

// Wait for phase one to complete
Thread.sleep(500);

// Unload topic make reader of compaction reconnect
admin.topics().unload(topic);

Awaitility.await().untilAsserted(() -> {
PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic, false);
// Compacted topic ledger should have same number of entry equals to number of unique key.
Assert.assertEquals(internalStats.compactedLedger.entries, expected.size());
Assert.assertTrue(internalStats.compactedLedger.ledgerId > -1);
Assert.assertFalse(internalStats.compactedLedger.offloaded);
});

// consumer with readCompacted enabled only get compacted entries
try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.readCompacted(true).subscribe()) {
while (true) {
Message<byte[]> m = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertEquals(expected.remove(m.getKey()), m.getData());
if (expected.isEmpty()) {
break;
}
}
}
}
}

0 comments on commit 2921a41

Please sign in to comment.