From 917f5bd28c5c44c92a06a4a99823be8902807064 Mon Sep 17 00:00:00 2001 From: rdhabalia Date: Mon, 8 Jan 2018 17:22:21 -0800 Subject: [PATCH] Introduce config to skip non-recoverable data-ledger --- conf/broker.conf | 3 + conf/standalone.conf | 3 + .../mledger/ManagedLedgerConfig.java | 15 +++ .../mledger/ManagedLedgerException.java | 13 ++ .../mledger/impl/EntryCacheImpl.java | 11 +- .../mledger/impl/EntryCacheManager.java | 3 +- .../mledger/impl/ManagedCursorImpl.java | 25 +++- .../impl/ManagedLedgerFactoryImpl.java | 3 +- .../mledger/impl/ManagedLedgerImpl.java | 50 +++++-- .../bookkeeper/mledger/impl/OpReadEntry.java | 25 +++- .../pulsar/broker/ServiceConfiguration.java | 12 ++ .../pulsar/broker/service/BrokerService.java | 26 ++++ .../service/BrokerBkEnsemblesTests.java | 125 +++++++++++++++++- site/_data/config/broker.yaml | 3 + site/_data/config/standalone.yaml | 2 + 15 files changed, 287 insertions(+), 32 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index 20da0a9104b0d..d983b3f2df656 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -300,6 +300,9 @@ managedLedgerMaxUnackedRangesToPersist=10000 # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 +# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets +# corrupted at bookkeeper and managed-cursor is stuck at that ledger. +autoSkipNonRecoverableData=false ### --- Load balancer --- ### diff --git a/conf/standalone.conf b/conf/standalone.conf index ce35f2ebdb71c..ec400fbc61d4f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -266,6 +266,9 @@ managedLedgerMaxUnackedRangesToPersist=10000 # zookeeper. managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000 +# Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets +# corrupted at bookkeeper and managed-cursor is stuck at that ledger. +autoSkipNonRecoverableData=false ### --- Load balancer --- ### diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 391a484f90fe4..6f9847bb19be4 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -51,6 +51,7 @@ public class ManagedLedgerConfig { private double throttleMarkDelete = 0; private long retentionTimeMs = 0; private long retentionSizeInMB = 0; + private boolean autoSkipNonRecoverableData; private DigestType digestType = DigestType.MAC; private byte[] password = "".getBytes(Charsets.UTF_8); @@ -353,6 +354,20 @@ public long getRetentionSizeInMB() { return retentionSizeInMB; } + /** + * Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets + * corrupted at bookkeeper and managed-cursor is stuck at that ledger. + * + * @param autoSkipNonRecoverableData + */ + public boolean isAutoSkipNonRecoverableData() { + return autoSkipNonRecoverableData; + } + + public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) { + this.autoSkipNonRecoverableData = skipNonRecoverableData; + } + /** * @return max unacked message ranges that will be persisted and recovered. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index 1817aaf59ba79..f5c4243e58c9d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -31,6 +31,13 @@ public ManagedLedgerException(Throwable e) { super(e); } + public static ManagedLedgerException getManagedLedgerException(Throwable e) { + if (e instanceof ManagedLedgerException) { + return (ManagedLedgerException) e; + } + return new ManagedLedgerException(e); + } + public static class MetaStoreException extends ManagedLedgerException { public MetaStoreException(Exception e) { super(e); @@ -89,6 +96,12 @@ public TooManyRequestsException(String msg) { } } + public static class NonRecoverableLedgerException extends ManagedLedgerException { + public NonRecoverableLedgerException(String msg) { + super(msg); + } + } + public static class InvalidReplayPositionException extends ManagedLedgerException { public InvalidReplayPositionException(String msg) { super(msg); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java index b89dfb5691ac3..469a0e9492d08 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheImpl.java @@ -25,14 +25,13 @@ import java.util.Collection; import java.util.List; -import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.LedgerEntry; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback; import org.apache.bookkeeper.mledger.ManagedLedgerException; -import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import org.apache.bookkeeper.mledger.util.Pair; import org.apache.bookkeeper.mledger.util.RangeCache; import org.apache.bookkeeper.mledger.util.RangeCache.Weighter; @@ -184,7 +183,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEnt lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), (rc, ledgerHandle, sequence, obj) -> { if (rc != BKException.Code.OK) { ml.invalidateLedgerHandle(ledgerHandle, rc); - callback.readEntryFailed(new ManagedLedgerException(BKException.create(rc)), obj); + callback.readEntryFailed(createManagedLedgerException(rc), obj); return; } @@ -253,11 +252,11 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo if (rc != BKException.Code.OK) { if (rc == BKException.Code.TooManyRequestsException) { - callback.readEntriesFailed(new TooManyRequestsException("Too many request error from bookies"), - ctx); + callback.readEntriesFailed(createManagedLedgerException(rc), ctx); } else { ml.invalidateLedgerHandle(lh1, rc); - callback.readEntriesFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx); + ManagedLedgerException mlException = createManagedLedgerException(rc); + callback.readEntriesFailed(mlException, ctx); } return; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index a1e42193d46ac..7c536b76f9e27 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -18,6 +18,7 @@ */ package org.apache.bookkeeper.mledger.impl; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.util.Enumeration; @@ -197,7 +198,7 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() { public void readComplete(int rc, LedgerHandle lh, Enumeration seq, Object bkctx) { if (rc != BKException.Code.OK) { - callback.readEntriesFailed(new ManagedLedgerException(BKException.create(rc)), ctx); + callback.readEntriesFailed(createManagedLedgerException(rc), ctx); return; } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 07f6f7dff0473..22cdf3d0f4f0e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -82,6 +82,8 @@ import com.google.protobuf.InvalidProtocolBufferException; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; public class ManagedCursorImpl implements ManagedCursor { @@ -281,7 +283,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, BKException.getMessage(rc1)); - callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc1))); + callback.operationFailed(createManagedLedgerException(rc1)); return; } @@ -330,8 +332,12 @@ private void recoveredCursor(PositionImpl position, Map properties // if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty), // we need to move to the next existing ledger if (!ledger.ledgerExists(position.getLedgerId())) { - long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId()); - position = PositionImpl.get(nextExistingLedger, -1); + Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId()); + if (nextExistingLedger == null) { + log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name, + position); + } + position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position; } log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position); @@ -1311,7 +1317,7 @@ public void asyncMarkDelete(final Position position, Map propertie try { newPosition = setAcknowledgedPosition(newPosition); } catch (IllegalArgumentException e) { - callback.markDeleteFailed(new ManagedLedgerException(e), ctx); + callback.markDeleteFailed(getManagedLedgerException(e), ctx); return; } finally { lock.writeLock().unlock(); @@ -1555,7 +1561,7 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba } catch (Exception e) { log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name, e.getMessage(), e); - callback.deleteFailed(new ManagedLedgerException(e), ctx); + callback.deleteFailed(getManagedLedgerException(e), ctx); return; } finally { lock.writeLock().unlock(); @@ -2040,7 +2046,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin // If we've had a write error, the ledger will be automatically closed, we need to create a new one, // in the meantime the mark-delete will be queued. STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger); - callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc))); + callback.operationFailed(createManagedLedgerException(rc)); } }, null); } @@ -2127,7 +2133,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) { if (rc == BKException.Code.OK) { callback.closeComplete(ctx); } else { - callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx); + callback.closeFailed(createManagedLedgerException(rc), ctx); } } }, ctx); @@ -2301,6 +2307,11 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) { return position.getNext(); } + public Position getNextLedgerPosition(long currentLedgerId) { + Long nextExistingLedger = ledger.getNextValidLedger(currentLedgerId); + return nextExistingLedger!=null ? PositionImpl.get(nextExistingLedger, 0) : null; + } + public boolean isIndividuallyDeletedEntriesEmpty() { lock.readLock().lock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 07c24114fcd76..d461f9af38553 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -71,6 +71,7 @@ import com.google.common.collect.Maps; import io.netty.util.concurrent.DefaultThreadFactory; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final MetaStore store; @@ -432,7 +433,7 @@ public void operationFailed(MetaStoreException e) { // Completed all the cursors info callback.getInfoComplete(info, ctx); }).exceptionally((ex) -> { - callback.getInfoFailed(new ManagedLedgerException(ex), ctx); + callback.getInfoFailed(getManagedLedgerException(ex.getCause()), ctx); return null; }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 84a4d8e2d55df..767d3dc7e727e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.lang.Math.min; import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException; import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; import java.util.Iterator; @@ -61,6 +62,8 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback; @@ -89,6 +92,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final static long MegaByte = 1024 * 1024; @@ -263,7 +267,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { initializeBookKeeper(callback); } else { log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc)); - callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc))); + callback.initializeFailed(createManagedLedgerException(rc)); return; } })); @@ -337,7 +341,7 @@ public void operationFailed(MetaStoreException e) { executor.submitOrdered(name, safeRun(() -> { mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { - callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc))); + callback.initializeFailed(createManagedLedgerException(rc)); return; } @@ -908,7 +912,7 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx) } mbean.endDataLedgerCloseOp(); if (rc != BKException.Code.OK) { - callback.terminateFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx); + callback.terminateFailed(createManagedLedgerException(rc), ctx); } else { lastConfirmedEntry = new PositionImpl(lh.getId(), lh.getLastAddConfirmed()); // Store the new state in metadata @@ -1042,7 +1046,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c } mbean.endDataLedgerCloseOp(); if (rc != BKException.Code.OK) { - callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx); + callback.closeFailed(createManagedLedgerException(rc), ctx); return; } @@ -1062,7 +1066,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) { Futures.waitForAll(futures).thenRun(() -> { callback.closeComplete(ctx); }).exceptionally(exception -> { - callback.closeFailed(new ManagedLedgerException(exception), ctx); + callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx); return null; }); } @@ -1078,7 +1082,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct mbean.endDataLedgerCreateOp(); if (rc != BKException.Code.OK) { log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc)); - ManagedLedgerException status = new ManagedLedgerException(BKException.getMessage(rc)); + ManagedLedgerException status = createManagedLedgerException(rc); // Empty the list of pending requests and make all of them fail clearPendingAddEntries(status); @@ -1278,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) { }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition, ex.getMessage()); - opReadEntry.readEntriesFailed(new ManagedLedgerException(ex), opReadEntry.ctx); + opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx); return null; }); } @@ -1306,7 +1310,7 @@ CompletableFuture getLedgerHandle(long ledgerId) { if (rc != BKException.Code.OK) { // Remove the ledger future from cache to give chance to reopen it later ledgerCache.remove(ledgerId, future); - future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc))); + future.completeExceptionally(createManagedLedgerException(rc)); } else { if (log.isDebugEnabled()) { log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId()); @@ -1347,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct entryCache.asyncReadEntry(ledger, position, callback, ctx); }).exceptionally(ex -> { log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage()); - callback.readEntryFailed(new ManagedLedgerException(ex), ctx); + callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx); return null; }); } @@ -1744,7 +1748,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) { int toDelete = ledgersToDelete.get(); if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) { // Trigger callback only once - callback.deleteLedgerFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx); + callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx); } } }, null); @@ -1956,7 +1960,7 @@ boolean ledgerExists(long ledgerId) { return ledgers.get(ledgerId) != null; } - long getNextValidLedger(long ledgerId) { + Long getNextValidLedger(long ledgerId) { return ledgers.ceilingKey(ledgerId + 1); } @@ -2172,6 +2176,30 @@ public long getCacheSize() { return entryCache.getSize(); } + /** + * return BK error codes that are considered not likely to be recoverable + */ + private static boolean isBkErrorNotRecoverable(int rc) { + switch (rc) { + case BKException.Code.NoSuchLedgerExistsException: + case BKException.Code.NoSuchEntryException: + return true; + + default: + return false; + } + } + + public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) { + if (bkErrorCode == BKException.Code.TooManyRequestsException) { + return new TooManyRequestsException("Too many request error from bookies"); + } else if (isBkErrorNotRecoverable(bkErrorCode)) { + return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode)); + } else { + return new ManagedLedgerException(BKException.getMessage(bkErrorCode)); + } + } + private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java index e7083b9cd3fb4..aa6cda20508ab 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java @@ -25,6 +25,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback; import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.Position; import org.slf4j.Logger; @@ -80,7 +81,7 @@ public void readEntriesComplete(List returnedEntries, Object ctx) { } @Override - public void readEntriesFailed(ManagedLedgerException status, Object ctx) { + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { cursor.readOperationCompleted(); if (!entries.isEmpty()) { @@ -89,10 +90,24 @@ public void readEntriesFailed(ManagedLedgerException status, Object ctx) { callback.readEntriesComplete(entries, ctx); recycle(); })); + } else if (cursor.config.isAutoSkipNonRecoverableData() && exception instanceof NonRecoverableLedgerException) { + log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), + readPosition, exception.getMessage()); + // try to find and move to next valid ledger + final Position nexReadPosition = cursor.getNextLedgerPosition(readPosition.getLedgerId()); + // fail callback if it couldn't find next valid ledger + if (nexReadPosition == null) { + callback.readEntriesFailed(exception, ctx); + cursor.ledger.mbean.recordReadEntriesError(); + recycle(); + return; + } + updateReadPosition(nexReadPosition); + checkReadCompletion(); } else { - if (!(status instanceof TooManyRequestsException)) { - log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), cursor.getName(), - readPosition, status.getMessage()); + if (!(exception instanceof TooManyRequestsException)) { + log.warn("[{}][{}] read failed from ledger at position:{} : {}", cursor.ledger.getName(), + cursor.getName(), readPosition, exception.getMessage()); } else { if (log.isDebugEnabled()) { log.debug("[{}][{}] read throttled failed from ledger at position:{}", cursor.ledger.getName(), @@ -100,7 +115,7 @@ public void readEntriesFailed(ManagedLedgerException status, Object ctx) { } } - callback.readEntriesFailed(status, ctx); + callback.readEntriesFailed(exception, ctx); cursor.ledger.mbean.recordReadEntriesError(); recycle(); } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 069933cf1df91..2585f33e6aa8c 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -285,6 +285,10 @@ public class ServiceConfiguration implements PulsarConfiguration { // than this limit then broker will persist unacked ranges into bookkeeper to avoid additional data overhead into // zookeeper. private int managedLedgerMaxUnackedRangesToPersistInZooKeeper = 1000; + // Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list. It helps when data-ledgers gets + // corrupted at bookkeeper and managed-cursor is stuck at that ledger. + @FieldContext(dynamic = true) + private boolean autoSkipNonRecoverableData = false; /*** --- Load balancer --- ****/ // Enable load balancer @@ -1033,6 +1037,14 @@ public void setManagedLedgerMaxUnackedRangesToPersistInZooKeeper( this.managedLedgerMaxUnackedRangesToPersistInZooKeeper = managedLedgerMaxUnackedRangesToPersistInZookeeper; } + public boolean isAutoSkipNonRecoverableData() { + return autoSkipNonRecoverableData; + } + + public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableLedger) { + this.autoSkipNonRecoverableData = skipNonRecoverableLedger; + } + public boolean isLoadBalancerEnabled() { return loadBalancerEnabled; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index bf7bdbd3a6e94..74ca915e3b3ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1063,6 +1063,10 @@ private void updateConfigurationAndRegisterListeners() { registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> { updateTopicMessageDispatchRate(); }); + // add listener to update managed-ledger config to skipNonRecoverableLedgers + registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> { + updateManagedLedgerConfig(); + }); // add more listeners here } @@ -1087,6 +1091,28 @@ private void updateTopicMessageDispatchRate() { }); } + private void updateManagedLedgerConfig() { + this.pulsar().getExecutor().submit(() -> { + // update managed-ledger config of each topic + topics.forEach((name, topicFuture) -> { + if (topicFuture.isDone()) { + String topicName = null; + try { + if (topicFuture.getNow(null) instanceof PersistentTopic) { + PersistentTopic topic = (PersistentTopic) topicFuture.get(); + topicName = topicFuture.get().getName(); + // update skipNonRecoverableLedger configuration + topic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( + pulsar.getConfiguration().isAutoSkipNonRecoverableData()); + } + } catch (Exception e) { + log.warn("[{}] failed to update managed-ledger config", topicName, e); + } + } + }); + }); + } + /** * Allows a listener to listen on update of {@link ServiceConfiguration} change, so listener can take appropriate * action if any specific config-field value has been changed. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 347ffaf5640c9..8e8f31da5d03b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -18,15 +18,23 @@ */ package org.apache.pulsar.broker.service; +import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; + import java.lang.reflect.Field; import java.net.URL; +import java.util.Map.Entry; +import java.util.NavigableMap; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import org.apache.bookkeeper.client.BookKeeper; +import org.apache.bookkeeper.mledger.ManagedLedgerConfig; +import org.apache.bookkeeper.mledger.impl.EntryCache; import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.util.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; @@ -35,9 +43,11 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.ClientConfiguration; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerConfiguration; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.impl.MessageImpl; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble; @@ -51,7 +61,6 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; -import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically; /** */ @@ -85,6 +94,7 @@ void setup() throws Exception { config.setAuthenticationEnabled(false); config.setManagedLedgerMaxEntriesPerLedger(5); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setAdvertisedAddress("127.0.0.1"); pulsar = new PulsarService(config); pulsar.start(); @@ -206,5 +216,118 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { } + /** + * It verifies broker-configuration using which broker can skip non-recoverable data-ledgers. + * + *
+     * 1. publish messages in 5 data-ledgers each with 20 entries under managed-ledger
+     * 2. delete first 4 data-ledgers
+     * 3. consumer will fail to consume any message as first data-ledger is non-recoverable
+     * 4. enable dynamic config to skip non-recoverable data-ledgers
+     * 5. consumer will be able to consume 20 messages from last non-deleted ledger
+     * 
+     * 
+ * + * @throws Exception + */ + @Test(timeOut = 6000) + public void testSkipCorruptDataLedger() throws Exception { + ClientConfiguration clientConf = new ClientConfiguration(); + clientConf.setStatsInterval(0, TimeUnit.SECONDS); + PulsarClient client = PulsarClient.create(adminUrl.toString(), clientConf); + + final String ns1 = "prop/usc/crash-broker"; + final int totalMessages = 100; + final int totalDataLedgers = 5; + final int entriesPerLedger = totalMessages / totalDataLedgers; + + admin.namespaces().createNamespace(ns1); + + final String dn1 = "persistent://" + ns1 + "/my-topic"; + + // Create subscription + ConsumerConfiguration consumerConfig = new ConsumerConfiguration(); + consumerConfig.setReceiverQueueSize(5); + Consumer consumer = client.subscribe(dn1, "my-subscriber-name", consumerConfig); + + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(dn1).get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); + ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); + Field configField = ManagedCursorImpl.class.getDeclaredField("config"); + configField.setAccessible(true); + // Create multiple data-ledger + ManagedLedgerConfig config = (ManagedLedgerConfig) configField.get(cursor); + config.setMaxEntriesPerLedger(entriesPerLedger); + config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS); + // bookkeeper client + Field bookKeeperField = ManagedLedgerImpl.class.getDeclaredField("bookKeeper"); + bookKeeperField.setAccessible(true); + // Create multiple data-ledger + BookKeeper bookKeeper = (BookKeeper) bookKeeperField.get(ml); + + // (1) publish messages in 5 data-ledgers each with 20 entries under managed-ledger + Producer producer = client.createProducer(dn1); + for (int i = 0; i < totalMessages; i++) { + String message = "my-message-" + i; + producer.send(message.getBytes()); + } + + // validate: consumer is able to consume msg and close consumer after reading 1 entry + Assert.assertNotNull(consumer.receive(1, TimeUnit.SECONDS)); + consumer.close(); + + NavigableMap ledgerInfo = ml.getLedgersInfo(); + Assert.assertEquals(ledgerInfo.size(), totalDataLedgers); + Entry lastLedger = ledgerInfo.lastEntry(); + + // (2) delete first 4 data-ledgers + ledgerInfo.entrySet().forEach(entry -> { + if (!entry.equals(lastLedger)) { + try { + bookKeeper.deleteLedger(entry.getKey()); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + // clean managed-ledger and recreate topic to clean any data from the cache + producer.close(); + pulsar.getBrokerService().removeTopicFromCache(dn1); + ManagedLedgerFactoryImpl factory = (ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory(); + Field field = ManagedLedgerFactoryImpl.class.getDeclaredField("ledgers"); + field.setAccessible(true); + @SuppressWarnings("unchecked") + ConcurrentHashMap> ledgers = (ConcurrentHashMap>) field + .get(factory); + ledgers.clear(); + + // (3) consumer will fail to consume any message as first data-ledger is non-recoverable + Message msg = null; + // start consuming message + consumer = client.subscribe(dn1, "my-subscriber-name"); + msg = consumer.receive(1, TimeUnit.SECONDS); + Assert.assertNull(msg); + consumer.close(); + + // (4) enable dynamic config to skip non-recoverable data-ledgers + admin.brokers().updateDynamicConfiguration("autoSkipNonRecoverableData", "true"); + + retryStrategically((test) -> config.isAutoSkipNonRecoverableData(), 5, 100); + + // (5) consumer will be able to consume 20 messages from last non-deleted ledger + consumer = client.subscribe(dn1, "my-subscriber-name"); + for (int i = 0; i < entriesPerLedger; i++) { + msg = consumer.receive(5, TimeUnit.SECONDS); + System.out.println(i); + consumer.acknowledge(msg); + } + + producer.close(); + consumer.close(); + client.close(); + + } + private static final Logger LOG = LoggerFactory.getLogger(BrokerBkEnsemblesTests.class); } diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml index d7a74ff7d4ea0..77df630ca669a 100644 --- a/site/_data/config/broker.yaml +++ b/site/_data/config/broker.yaml @@ -206,6 +206,9 @@ configs: default: '1000' description: | Max number of "acknowledgment holes" that are going to be persistently stored. When acknowledging out of order, a consumer will leave holes that are supposed to be quickly filled by acking all the messages. The information of which messages are acknowledged is persisted by compressing in "ranges" of messages that were acknowledged. After the max number of ranges is reached, the information will only be tracked in memory and messages will be redelivered in case of crashes. +- name: autoSkipNonRecoverableData + default: 'false' + description: Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger. - name: loadBalancerEnabled default: 'true' description: Enable load balancer diff --git a/site/_data/config/standalone.yaml b/site/_data/config/standalone.yaml index c63811bdb0dc1..22bcceb18502b 100644 --- a/site/_data/config/standalone.yaml +++ b/site/_data/config/standalone.yaml @@ -155,6 +155,8 @@ configs: default: '50000' - name: managedLedgerCursorRolloverTimeInSeconds default: '14400' +- name: autoSkipNonRecoverableData + default: 'false' - name: loadBalancerEnabled default: 'false' - name: loadBalancerPlacementStrategy