diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java index e861fad9474e7..8a21385cef457 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/AsyncCallbacks.java @@ -20,6 +20,7 @@ import com.google.common.annotations.Beta; import java.util.List; +import java.util.Optional; /** * Definition of all the callbacks used for the ManagedLedger asynchronous API. @@ -116,7 +117,7 @@ interface TerminateCallback { interface FindEntryCallback { void findEntryComplete(Position position, Object ctx); - void findEntryFailed(ManagedLedgerException exception, Object ctx); + void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx); } interface ResetCursorCallback { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index fc3f6a4797028..03a68ba98aeab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -587,4 +587,11 @@ Set asyncReplayEntries( */ void setThrottleMarkDelete(double throttleMarkDelete); + /** + * Get {@link ManagedLedger} attached with cursor + * + * @return ManagedLedger + */ + ManagedLedger getManagedLedger(); + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d18527462985a..52926888b46f1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -45,6 +45,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -71,6 +72,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; @@ -766,7 +768,8 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, + Object ctx) { result.exception = exception; counter.countDown(); } @@ -796,11 +799,12 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate max = getNumberOfEntriesInStorage(); break; default: - callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), ctx); + callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx); return; } if (startPosition == null) { - callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), ctx); + callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), + Optional.empty(), ctx); return; } op = new OpFindNewest(this, startPosition, condition, max, callback, ctx); @@ -2581,5 +2585,10 @@ public void setThrottleMarkDelete(double throttleMarkDelete) { } } + @Override + public ManagedLedger getManagedLedger() { + return this.ledger; + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 749e560918d4d..a1a5a1527c6d7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -49,6 +49,7 @@ import java.util.UUID; import java.util.Objects; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentSkipListMap; @@ -3023,6 +3024,9 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod public static ManagedLedgerException createManagedLedgerException(Throwable t) { if (t instanceof org.apache.bookkeeper.client.api.BKException) { return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode()); + } else if (t instanceof CompletionException + && !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) { + return createManagedLedgerException(t.getCause()); } else { return new ManagedLedgerException("Unknown exception"); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java index 57e8044ad87ed..4bce5690ad35f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java @@ -21,6 +21,9 @@ import com.google.common.base.Predicate; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; + +import java.util.Optional; + import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; @@ -107,7 +110,7 @@ public void readEntryComplete(Entry entry, Object ctx) { @Override public void readEntryFailed(ManagedLedgerException exception, Object ctx) { - callback.findEntryFailed(exception, OpFindNewest.this.ctx); + callback.findEntryFailed(exception, Optional.ofNullable(searchPosition), OpFindNewest.this.ctx); } public void find() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 60d3dc9e7b898..c415320ddf5a4 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.testng.annotations.Test; @@ -308,6 +309,12 @@ public void setThrottleMarkDelete(double throttleMarkDelete) { public double getThrottleMarkDelete() { return -1; } + + @Override + public ManagedLedger getManagedLedger() { + return null; + } + } @Test diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 4f2cce5950eb6..70db43c9ff4e0 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -39,6 +39,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; @@ -2077,7 +2078,8 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, + Object ctx) { result.exception = exception; counter.countDown(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 85d3785d7bb96..deb274457bf92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -18,10 +18,12 @@ */ package org.apache.pulsar.broker.service.persistent; +import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback; +import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; @@ -37,6 +39,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback { private final String subName; private final String topicName; private final Rate msgExpired; + private final boolean autoSkipNonRecoverableData; private static final int FALSE = 0; private static final int TRUE = 1; @@ -50,6 +53,9 @@ public PersistentMessageExpiryMonitor(String topicName, String subscriptionName, this.cursor = cursor; this.subName = subscriptionName; this.msgExpired = new Rate(); + this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null // check to avoid test failures + ? cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData() + : false; } public void expireMessages(int messageTTLInSeconds) { @@ -124,10 +130,16 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception); } + if (autoSkipNonRecoverableData && failedReadPosition.isPresent() + && (exception instanceof NonRecoverableLedgerException)) { + log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition, + exception.getMessage()); + findEntryComplete(failedReadPosition.get(), ctx); + } expirationCheckInProgress = FALSE; updateRates(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java index a0882008dba9b..9e7514987bf56 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageFinder.java @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Optional; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import static com.google.common.base.Preconditions.checkArgument; @@ -83,7 +84,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback } callback.findEntryFailed( new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"), - null); + Optional.empty(), null); } } @@ -106,7 +107,7 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback); AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback) ctx; if (log.isDebugEnabled()) { @@ -114,6 +115,6 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) { timestamp, exception); } messageFindInProgress = FALSE; - callback.findEntryFailed(exception, null); + callback.findEntryFailed(exception, failedReadPosition, null); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index d4adf453370a9..34864a88e985e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -26,6 +26,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -593,7 +594,7 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, Object ctx) { // todo - what can go wrong here that needs to be retried? if (exception instanceof ConcurrentFindCursorPositionException) { future.completeExceptionally(new SubscriptionBusyException(exception.getMessage())); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java index ae30b29e83166..086c087cc9864 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentMessageFinderTest.java @@ -29,6 +29,7 @@ import java.lang.reflect.Field; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -41,7 +42,11 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor; import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder; import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; @@ -98,7 +103,8 @@ public void findEntryComplete(Position position, Object ctx) { } @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, + Object ctx) { result.exception = exception; future.completeExceptionally(exception); } @@ -167,20 +173,23 @@ void testPersistentMessageFinder() throws Exception { PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1); final AtomicBoolean ex = new AtomicBoolean(false); - messageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() { - @Override - public void findEntryComplete(Position position, Object ctx) { - } + messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(), + new AsyncCallbacks.FindEntryCallback() { + @Override + public void findEntryComplete(Position position, Object ctx) { + } - @Override - public void findEntryFailed(ManagedLedgerException exception, Object ctx) { - ex.set(true); - } - }); + @Override + public void findEntryFailed(ManagedLedgerException exception, Optional failedReadPosition, + Object ctx) { + ex.set(true); + } + }); assertTrue(ex.get()); PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1); - monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null); + monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), + Optional.empty(), null); Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress"); field.setAccessible(true); assertEquals(0, field.get(monitor)); @@ -190,4 +199,62 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) { ledger.close(); factory.shutdown(); } + + /** + * It tests that message expiry doesn't get stuck if it can't read deleted ledger's entry. + * + * @throws Exception + */ + @Test + void testMessageExpiryWithNonRecoverableException() throws Exception { + + final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers"; + final int entriesPerLedger = 2; + final int totalEntries = 10; + final int ttlSeconds = 1; + + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setRetentionSizeInMB(10); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setRetentionTime(1, TimeUnit.HOURS); + config.setAutoSkipNonRecoverableData(true); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config); + ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName); + + for (int i = 0; i < totalEntries; i++) { + ledger.addEntry(createMessageWrittenToLedger("msg" + i)); + } + + List ledgers = ledger.getLedgersInfoAsList(); + LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1); + + assertEquals(ledgers.size(), totalEntries / entriesPerLedger); + + // this will make sure that all entries should be deleted + Thread.sleep(ttlSeconds); + + bkc.deleteLedger(ledgers.get(0).getLedgerId()); + bkc.deleteLedger(ledgers.get(1).getLedgerId()); + bkc.deleteLedger(ledgers.get(2).getLedgerId()); + + PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1); + Position previousMarkDelete = null; + for (int i = 0; i < totalEntries; i++) { + monitor.expireMessages(1); + Position previousPos = previousMarkDelete; + retryStrategically( + (test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos), + 5, 100); + previousMarkDelete = c1.getMarkDeletedPosition(); + } + + PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition(); + assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId()); + assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId()); + + c1.close(); + ledger.close(); + factory.shutdown(); + + } }