Skip to content

Commit

Permalink
[pulsar-broker] Fix expiry monitor to continue on non-recoverable err…
Browse files Browse the repository at this point in the history
…or (#4818)

### Motivation

In #1046, we have added a flag (`autoSkipNonRecoverableData`) and mechanism to recover cursor if ledger data is deleted. However, expiery-monitor doesn't use that flag and it gets stuck when it finds non-recoverable ml-error while cleaning up expired message.

### Modification
Expiry-monitor can skip non-recoverable managed-ledger exception (eg: data/ledger doesn't exist anymore) when `autoSkipNonRecoverableData` flag is enabled.
  • Loading branch information
rdhabalia authored and sijie committed Aug 14, 2019
1 parent 4a9f2f7 commit c5ba529
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.common.annotations.Beta;
import java.util.List;
import java.util.Optional;

/**
* Definition of all the callbacks used for the ManagedLedger asynchronous API.
Expand Down Expand Up @@ -116,7 +117,7 @@ interface TerminateCallback {
interface FindEntryCallback {
void findEntryComplete(Position position, Object ctx);

void findEntryFailed(ManagedLedgerException exception, Object ctx);
void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx);
}

interface ResetCursorCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,4 +587,11 @@ Set<? extends Position> asyncReplayEntries(
*/
void setThrottleMarkDelete(double throttleMarkDelete);

/**
* Get {@link ManagedLedger} attached with cursor
*
* @return ManagedLedger
*/
ManagedLedger getManagedLedger();

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -71,6 +72,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException;
Expand Down Expand Up @@ -766,7 +768,8 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
result.exception = exception;
counter.countDown();
}
Expand Down Expand Up @@ -796,11 +799,12 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate
max = getNumberOfEntriesInStorage();
break;
default:
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), ctx);
callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), Optional.empty(), ctx);
return;
}
if (startPosition == null) {
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), ctx);
callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"),
Optional.empty(), ctx);
return;
}
op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
Expand Down Expand Up @@ -2581,5 +2585,10 @@ public void setThrottleMarkDelete(double throttleMarkDelete) {
}
}

@Override
public ManagedLedger getManagedLedger() {
return this.ledger;
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.UUID;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
Expand Down Expand Up @@ -3023,6 +3024,9 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
public static ManagedLedgerException createManagedLedgerException(Throwable t) {
if (t instanceof org.apache.bookkeeper.client.api.BKException) {
return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
} else if (t instanceof CompletionException
&& !(t.getCause() instanceof CompletionException) /* check to avoid stackoverlflow */) {
return createManagedLedgerException(t.getCause());
} else {
return new ManagedLedgerException("Unknown exception");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.base.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;

import java.util.Optional;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -107,7 +110,7 @@ public void readEntryComplete(Entry entry, Object ctx) {

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
callback.findEntryFailed(exception, OpFindNewest.this.ctx);
callback.findEntryFailed(exception, Optional.ofNullable(searchPosition), OpFindNewest.this.ctx);
}

public void find() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.SkipEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -308,6 +309,12 @@ public void setThrottleMarkDelete(double throttleMarkDelete) {
public double getThrottleMarkDelete() {
return -1;
}

@Override
public ManagedLedger getManagedLedger() {
return null;
}

}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -2077,7 +2078,8 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
result.exception = exception;
counter.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.FindEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
Expand All @@ -37,6 +39,7 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback {
private final String subName;
private final String topicName;
private final Rate msgExpired;
private final boolean autoSkipNonRecoverableData;

private static final int FALSE = 0;
private static final int TRUE = 1;
Expand All @@ -50,6 +53,9 @@ public PersistentMessageExpiryMonitor(String topicName, String subscriptionName,
this.cursor = cursor;
this.subName = subscriptionName;
this.msgExpired = new Rate();
this.autoSkipNonRecoverableData = cursor.getManagedLedger() != null // check to avoid test failures
? cursor.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
: false;
}

public void expireMessages(int messageTTLInSeconds) {
Expand Down Expand Up @@ -124,10 +130,16 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Finding expired entry operation failed", topicName, subName, exception);
}
if (autoSkipNonRecoverableData && failedReadPosition.isPresent()
&& (exception instanceof NonRecoverableLedgerException)) {
log.warn("[{}][{}] read failed from ledger at position:{} : {}", topicName, subName, failedReadPosition,
exception.getMessage());
findEntryComplete(failedReadPosition.get(), ctx);
}
expirationCheckInProgress = FALSE;
updateRates();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -83,7 +84,7 @@ public void findMessages(final long timestamp, AsyncCallbacks.FindEntryCallback
}
callback.findEntryFailed(
new ManagedLedgerException.ConcurrentFindCursorPositionException("last find is still running"),
null);
Optional.empty(), null);
}
}

Expand All @@ -106,14 +107,14 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
checkArgument(ctx instanceof AsyncCallbacks.FindEntryCallback);
AsyncCallbacks.FindEntryCallback callback = (AsyncCallbacks.FindEntryCallback) ctx;
if (log.isDebugEnabled()) {
log.debug("[{}][{}] message position find operation failed for provided timestamp {}", topicName, subName,
timestamp, exception);
}
messageFindInProgress = FALSE;
callback.findEntryFailed(exception, null);
callback.findEntryFailed(exception, failedReadPosition, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand Down Expand Up @@ -593,7 +594,7 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition, Object ctx) {
// todo - what can go wrong here that needs to be retried?
if (exception instanceof ConcurrentFindCursorPositionException) {
future.completeExceptionally(new SubscriptionBusyException(exception.getMessage()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -41,7 +42,11 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import static org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest.retryStrategically;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
Expand Down Expand Up @@ -98,7 +103,8 @@ public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
result.exception = exception;
future.completeExceptionally(exception);
}
Expand Down Expand Up @@ -167,20 +173,23 @@ void testPersistentMessageFinder() throws Exception {

PersistentMessageFinder messageFinder = new PersistentMessageFinder("topicname", c1);
final AtomicBoolean ex = new AtomicBoolean(false);
messageFinder.findEntryFailed(new ManagedLedgerException("failed"), new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
}
messageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(),
new AsyncCallbacks.FindEntryCallback() {
@Override
public void findEntryComplete(Position position, Object ctx) {
}

@Override
public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
ex.set(true);
}
});
@Override
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
Object ctx) {
ex.set(true);
}
});
assertTrue(ex.get());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), null);
monitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"),
Optional.empty(), null);
Field field = monitor.getClass().getDeclaredField("expirationCheckInProgress");
field.setAccessible(true);
assertEquals(0, field.get(monitor));
Expand All @@ -190,4 +199,62 @@ public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
ledger.close();
factory.shutdown();
}

/**
* It tests that message expiry doesn't get stuck if it can't read deleted ledger's entry.
*
* @throws Exception
*/
@Test
void testMessageExpiryWithNonRecoverableException() throws Exception {

final String ledgerAndCursorName = "testPersistentMessageExpiryWithNonRecoverableLedgers";
final int entriesPerLedger = 2;
final int totalEntries = 10;
final int ttlSeconds = 1;

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(10);
config.setMaxEntriesPerLedger(entriesPerLedger);
config.setRetentionTime(1, TimeUnit.HOURS);
config.setAutoSkipNonRecoverableData(true);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerAndCursorName, config);
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(ledgerAndCursorName);

for (int i = 0; i < totalEntries; i++) {
ledger.addEntry(createMessageWrittenToLedger("msg" + i));
}

List<LedgerInfo> ledgers = ledger.getLedgersInfoAsList();
LedgerInfo lastLedgerInfo = ledgers.get(ledgers.size() - 1);

assertEquals(ledgers.size(), totalEntries / entriesPerLedger);

// this will make sure that all entries should be deleted
Thread.sleep(ttlSeconds);

bkc.deleteLedger(ledgers.get(0).getLedgerId());
bkc.deleteLedger(ledgers.get(1).getLedgerId());
bkc.deleteLedger(ledgers.get(2).getLedgerId());

PersistentMessageExpiryMonitor monitor = new PersistentMessageExpiryMonitor("topicname", c1.getName(), c1);
Position previousMarkDelete = null;
for (int i = 0; i < totalEntries; i++) {
monitor.expireMessages(1);
Position previousPos = previousMarkDelete;
retryStrategically(
(test) -> c1.getMarkDeletedPosition() != null && !c1.getMarkDeletedPosition().equals(previousPos),
5, 100);
previousMarkDelete = c1.getMarkDeletedPosition();
}

PositionImpl markDeletePosition = (PositionImpl) c1.getMarkDeletedPosition();
assertEquals(lastLedgerInfo.getLedgerId(), markDeletePosition.getLedgerId());
assertEquals(lastLedgerInfo.getEntries() - 1, markDeletePosition.getEntryId());

c1.close();
ledger.close();
factory.shutdown();

}
}

0 comments on commit c5ba529

Please sign in to comment.