Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce config to skip non-recoverable data-ledger #1046

Merged
merged 1 commit into from
Jan 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's list. This helps when data-ledgers get
# corrupted at bookkeeper and the managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

### --- Load balancer --- ###

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,9 @@ managedLedgerMaxUnackedRangesToPersist=10000
# zookeeper.
managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000

# Skip reading non-recoverable/unreadable data-ledgers in the managed-ledger's list. This helps when data-ledgers get
# corrupted at bookkeeper and the managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

### --- Load balancer --- ###

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public class ManagedLedgerConfig {
private double throttleMarkDelete = 0;
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;

private DigestType digestType = DigestType.MAC;
private byte[] password = "".getBytes(Charsets.UTF_8);
Expand Down Expand Up @@ -353,6 +354,20 @@ public long getRetentionSizeInMB() {
return retentionSizeInMB;
}

/**
* Tells whether reading of non-recoverable/unreadable data-ledgers in the managed-ledger's list should be skipped.
* Skipping helps when data-ledgers get corrupted at bookkeeper and the managed-cursor is stuck at that ledger.
*
* @return the current value of the {@code autoSkipNonRecoverableData} setting
*/
public boolean isAutoSkipNonRecoverableData() {
return autoSkipNonRecoverableData;
}

/**
* Enables or disables skipping of non-recoverable/unreadable data-ledgers.
*
* @param skipNonRecoverableData
*            value stored into the {@code autoSkipNonRecoverableData} field
*/
public void setAutoSkipNonRecoverableData(boolean skipNonRecoverableData) {
this.autoSkipNonRecoverableData = skipNonRecoverableData;
}

/**
* @return max unacked message ranges that will be persisted and recovered.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ public ManagedLedgerException(Throwable e) {
super(e);
}

/**
 * Converts an arbitrary throwable into a {@link ManagedLedgerException}.
 *
 * A throwable that already is a {@code ManagedLedgerException} is handed back as the very same instance, so its
 * concrete subtype is kept; anything else is wrapped as the cause of a new {@code ManagedLedgerException}.
 *
 * @param e
 *            the throwable to convert
 * @return {@code e} itself when it is a {@code ManagedLedgerException}, otherwise a new exception wrapping it
 */
public static ManagedLedgerException getManagedLedgerException(Throwable e) {
    return (e instanceof ManagedLedgerException)
            ? (ManagedLedgerException) e
            : new ManagedLedgerException(e);
}

public static class MetaStoreException extends ManagedLedgerException {
public MetaStoreException(Exception e) {
super(e);
Expand Down Expand Up @@ -89,6 +96,12 @@ public TooManyRequestsException(String msg) {
}
}

/**
* Managed-ledger exception type used for ledger errors that are considered not likely to be recoverable.
*/
public static class NonRecoverableLedgerException extends ManagedLedgerException {
/**
* @param msg
*            description of the failure, passed on to the {@link ManagedLedgerException} constructor
*/
public NonRecoverableLedgerException(String msg) {
super(msg);
}
}

public static class InvalidReplayPositionException extends ManagedLedgerException {
public InvalidReplayPositionException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,13 @@
import java.util.Collection;
import java.util.List;

import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import org.apache.bookkeeper.mledger.util.Pair;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.bookkeeper.mledger.util.RangeCache.Weighter;
Expand Down Expand Up @@ -184,7 +183,7 @@ public void asyncReadEntry(LedgerHandle lh, PositionImpl position, final ReadEnt
lh.asyncReadEntries(position.getEntryId(), position.getEntryId(), (rc, ledgerHandle, sequence, obj) -> {
if (rc != BKException.Code.OK) {
ml.invalidateLedgerHandle(ledgerHandle, rc);
callback.readEntryFailed(new ManagedLedgerException(BKException.create(rc)), obj);
callback.readEntryFailed(createManagedLedgerException(rc), obj);
return;
}

Expand Down Expand Up @@ -253,11 +252,11 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo

if (rc != BKException.Code.OK) {
if (rc == BKException.Code.TooManyRequestsException) {
callback.readEntriesFailed(new TooManyRequestsException("Too many request error from bookies"),
ctx);
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
} else {
ml.invalidateLedgerHandle(lh1, rc);
callback.readEntriesFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
ManagedLedgerException mlException = createManagedLedgerException(rc);
callback.readEntriesFailed(mlException, ctx);
}
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import java.util.Enumeration;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void asyncReadEntry(LedgerHandle lh, long firstEntry, long lastEntry, boo
lh.asyncReadEntries(firstEntry, lastEntry, new ReadCallback() {
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object bkctx) {
if (rc != BKException.Code.OK) {
callback.readEntriesFailed(new ManagedLedgerException(BKException.create(rc)), ctx);
callback.readEntriesFailed(createManagedLedgerException(rc), ctx);
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
import com.google.protobuf.InvalidProtocolBufferException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedCursorImpl implements ManagedCursor {

Expand Down Expand Up @@ -281,7 +283,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", ledger.getName(),
ledgerId, name, BKException.getMessage(rc1));

callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc1)));
callback.operationFailed(createManagedLedgerException(rc1));
return;
}

Expand Down Expand Up @@ -330,8 +332,12 @@ private void recoveredCursor(PositionImpl position, Map<String, Long> properties
// if the position was at a ledger that didn't exist (since it will be deleted if it was previously empty),
// we need to move to the next existing ledger
if (!ledger.ledgerExists(position.getLedgerId())) {
long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
position = PositionImpl.get(nextExistingLedger, -1);
Long nextExistingLedger = ledger.getNextValidLedger(position.getLedgerId());
if (nextExistingLedger == null) {
log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", ledger.getName(), name,
position);
}
position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1) : position;
}
log.info("[{}] Cursor {} recovered to position {}", ledger.getName(), name, position);

Expand Down Expand Up @@ -1311,7 +1317,7 @@ public void asyncMarkDelete(final Position position, Map<String, Long> propertie
try {
newPosition = setAcknowledgedPosition(newPosition);
} catch (IllegalArgumentException e) {
callback.markDeleteFailed(new ManagedLedgerException(e), ctx);
callback.markDeleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -1555,7 +1561,7 @@ public void asyncDelete(Position pos, final AsyncCallbacks.DeleteCallback callba
} catch (Exception e) {
log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", ledger.getName(), name,
e.getMessage(), e);
callback.deleteFailed(new ManagedLedgerException(e), ctx);
callback.deleteFailed(getManagedLedgerException(e), ctx);
return;
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -2040,7 +2046,7 @@ void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, fin
// If we've had a write error, the ledger will be automatically closed, we need to create a new one,
// in the meantime the mark-delete will be queued.
STATE_UPDATER.compareAndSet(ManagedCursorImpl.this, State.Open, State.NoLedger);
callback.operationFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.operationFailed(createManagedLedgerException(rc));
}
}, null);
}
Expand Down Expand Up @@ -2127,7 +2133,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
if (rc == BKException.Code.OK) {
callback.closeComplete(ctx);
} else {
callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.closeFailed(createManagedLedgerException(rc), ctx);
}
}
}, ctx);
Expand Down Expand Up @@ -2301,6 +2307,11 @@ public PositionImpl getNextAvailablePosition(PositionImpl position) {
return position.getNext();
}

/**
 * Looks up the ledger that follows {@code currentLedgerId} and returns the position of its entry 0.
 *
 * @param currentLedgerId
 *            id of the ledger to move past
 * @return the position at entry 0 of the next valid ledger, or {@code null} when the managed ledger reports no
 *         next valid ledger
 */
public Position getNextLedgerPosition(long currentLedgerId) {
    final Long followingLedgerId = ledger.getNextValidLedger(currentLedgerId);
    if (followingLedgerId == null) {
        // Nothing after the given ledger: signal that to the caller with null.
        return null;
    }
    return PositionImpl.get(followingLedgerId, 0);
}

public boolean isIndividuallyDeletedEntriesEmpty() {
lock.readLock().lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import com.google.common.collect.Maps;

import io.netty.util.concurrent.DefaultThreadFactory;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final MetaStore store;
Expand Down Expand Up @@ -432,7 +433,7 @@ public void operationFailed(MetaStoreException e) {
// Completed all the cursors info
callback.getInfoComplete(info, ctx);
}).exceptionally((ex) -> {
callback.getInfoFailed(new ManagedLedgerException(ex), ctx);
callback.getInfoFailed(getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;

import java.util.Iterator;
Expand Down Expand Up @@ -61,6 +62,8 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
Expand Down Expand Up @@ -89,6 +92,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;

public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final static long MegaByte = 1024 * 1024;
Expand Down Expand Up @@ -263,7 +267,7 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
initializeBookKeeper(callback);
} else {
log.error("[{}] Failed to open ledger {}: {}", name, id, BKException.getMessage(rc));
callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}
}));
Expand Down Expand Up @@ -337,7 +341,7 @@ public void operationFailed(MetaStoreException e) {
executor.submitOrdered(name, safeRun(() -> {
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
callback.initializeFailed(new ManagedLedgerException(BKException.getMessage(rc)));
callback.initializeFailed(createManagedLedgerException(rc));
return;
}

Expand Down Expand Up @@ -908,7 +912,7 @@ public synchronized void asyncTerminate(TerminateCallback callback, Object ctx)
}
mbean.endDataLedgerCloseOp();
if (rc != BKException.Code.OK) {
callback.terminateFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.terminateFailed(createManagedLedgerException(rc), ctx);
} else {
lastConfirmedEntry = new PositionImpl(lh.getId(), lh.getLastAddConfirmed());
// Store the new state in metadata
Expand Down Expand Up @@ -1042,7 +1046,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c
}
mbean.endDataLedgerCloseOp();
if (rc != BKException.Code.OK) {
callback.closeFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.closeFailed(createManagedLedgerException(rc), ctx);
return;
}

Expand All @@ -1062,7 +1066,7 @@ private void closeAllCursors(CloseCallback callback, final Object ctx) {
Futures.waitForAll(futures).thenRun(() -> {
callback.closeComplete(ctx);
}).exceptionally(exception -> {
callback.closeFailed(new ManagedLedgerException(exception), ctx);
callback.closeFailed(getManagedLedgerException(exception.getCause()), ctx);
return null;
});
}
Expand All @@ -1078,7 +1082,7 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
mbean.endDataLedgerCreateOp();
if (rc != BKException.Code.OK) {
log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
ManagedLedgerException status = new ManagedLedgerException(BKException.getMessage(rc));
ManagedLedgerException status = createManagedLedgerException(rc);

// Empty the list of pending requests and make all of them fail
clearPendingAddEntries(status);
Expand Down Expand Up @@ -1278,7 +1282,7 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
ex.getMessage());
opReadEntry.readEntriesFailed(new ManagedLedgerException(ex), opReadEntry.ctx);
opReadEntry.readEntriesFailed(getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
return null;
});
}
Expand Down Expand Up @@ -1306,7 +1310,7 @@ CompletableFuture<LedgerHandle> getLedgerHandle(long ledgerId) {
if (rc != BKException.Code.OK) {
// Remove the ledger future from cache to give chance to reopen it later
ledgerCache.remove(ledgerId, future);
future.completeExceptionally(new ManagedLedgerException(BKException.getMessage(rc)));
future.completeExceptionally(createManagedLedgerException(rc));
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Successfully opened ledger {} for reading", name, lh.getId());
Expand Down Expand Up @@ -1347,7 +1351,7 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct
entryCache.asyncReadEntry(ledger, position, callback, ctx);
}).exceptionally(ex -> {
log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
callback.readEntryFailed(new ManagedLedgerException(ex), ctx);
callback.readEntryFailed(getManagedLedgerException(ex.getCause()), ctx);
return null;
});
}
Expand Down Expand Up @@ -1744,7 +1748,7 @@ private void deleteAllLedgers(DeleteLedgerCallback callback, Object ctx) {
int toDelete = ledgersToDelete.get();
if (toDelete != -1 && ledgersToDelete.compareAndSet(toDelete, -1)) {
// Trigger callback only once
callback.deleteLedgerFailed(new ManagedLedgerException(BKException.getMessage(rc)), ctx);
callback.deleteLedgerFailed(createManagedLedgerException(rc), ctx);
}
}
}, null);
Expand Down Expand Up @@ -1956,7 +1960,7 @@ boolean ledgerExists(long ledgerId) {
return ledgers.get(ledgerId) != null;
}

long getNextValidLedger(long ledgerId) {
Long getNextValidLedger(long ledgerId) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why changing type? Do we need null?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because returning a primitive long caused an NPE when we tried to get the next ledger after the very last ledger in the list: in that case ledgers.ceilingKey(ledgerId + 1) returns null, and unboxing it to long throws an NPE. So I changed the return type to Long and handle the null at the caller.

return ledgers.ceilingKey(ledgerId + 1);
}

Expand Down Expand Up @@ -2172,6 +2176,30 @@ public long getCacheSize() {
return entryCache.getSize();
}

/**
 * Classifies a BookKeeper return code as "not likely to be recoverable".
 *
 * @param rc
 *            BookKeeper return code to classify
 * @return {@code true} for {@code NoSuchLedgerExistsException} and {@code NoSuchEntryException}, {@code false}
 *         for every other code
 */
private static boolean isBkErrorNotRecoverable(int rc) {
    return rc == BKException.Code.NoSuchLedgerExistsException
            || rc == BKException.Code.NoSuchEntryException;
}

/**
 * Maps a BookKeeper return code onto the matching {@link ManagedLedgerException} type.
 *
 * <ul>
 * <li>{@code TooManyRequestsException} code: a {@link TooManyRequestsException} with a fixed message</li>
 * <li>codes accepted by {@code isBkErrorNotRecoverable}: a {@link NonRecoverableLedgerException}</li>
 * <li>anything else: a plain {@link ManagedLedgerException}</li>
 * </ul>
 *
 * For the last two cases the exception message is the text returned by {@code BKException.getMessage}.
 *
 * @param bkErrorCode
 *            BookKeeper return code to translate
 * @return a newly created exception describing the error code
 */
public static ManagedLedgerException createManagedLedgerException(int bkErrorCode) {
    // Throttling by the bookies gets its own exception type so callers can tell it apart.
    if (bkErrorCode == BKException.Code.TooManyRequestsException) {
        return new TooManyRequestsException("Too many request error from bookies");
    }

    if (isBkErrorNotRecoverable(bkErrorCode)) {
        return new NonRecoverableLedgerException(BKException.getMessage(bkErrorCode));
    }

    return new ManagedLedgerException(BKException.getMessage(bkErrorCode));
}

private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);

}
Loading