Skip to content

Commit

Permalink
[fix] [test] Wrong mock-fail of the test ManagedLedgerErrorsTest.reco…
Browse files Browse the repository at this point in the history
…verLongTimeAfterMultipleWriteErrors (apache#19545)
  • Loading branch information
poorbarcode authored Feb 21, 2023
1 parent e0b50c9 commit 954f406
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import java.util.concurrent.atomic.AtomicReference;
import lombok.Cleanup;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand All @@ -48,6 +50,7 @@
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -511,9 +514,10 @@ public void recoverAfterWriteError() throws Exception {
public void recoverLongTimeAfterMultipleWriteErrors() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors");
ManagedCursor cursor = ledger.openCursor("c1");
LedgerHandle firstLedger = ledger.currentLedger;

bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException);
bkc.failAfter(1, BKException.Code.BookieHandleNotAvailableException);
bkc.addEntryFailAfter(0, BKException.Code.BookieHandleNotAvailableException);
bkc.addEntryFailAfter(1, BKException.Code.BookieHandleNotAvailableException);

CountDownLatch counter = new CountDownLatch(2);
AtomicReference<ManagedLedgerException> ex = new AtomicReference<>();
Expand All @@ -540,6 +544,18 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
assertNull(ex.get());

Awaitility.await().untilAsserted(() -> {
try {
bkc.openLedger(firstLedger.getId(),
BookKeeper.DigestType.fromApiDigestType(ledger.getConfig().getDigestType()),
ledger.getConfig().getPassword());
fail("The expected behavior is that the first ledger will be deleted, but it still exists.");
} catch (Exception ledgerDeletedEx){
// Expected LedgerNotExistsEx: the first ledger will be deleted after add entry fail.
assertTrue(ledgerDeletedEx instanceof BKException.BKNoSuchLedgerExistsException);
}
});

assertEquals(cursor.getNumberOfEntriesInBacklog(false), 2);

// Ensure that we are only creating one new ledger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public static Collection<BookieId> getMockEnsemble() {

final Queue<Long> addEntryDelaysMillis = new ConcurrentLinkedQueue<>();
final List<CompletableFuture<Void>> failures = new ArrayList<>();
final List<CompletableFuture<Void>> addEntryFailures = new ArrayList<>();

public PulsarMockBookKeeper(OrderedExecutor orderedExecutor) throws Exception {
this.orderedExecutor = orderedExecutor;
Expand Down Expand Up @@ -317,6 +318,13 @@ synchronized boolean checkReturnEmptyLedger() {
return shouldFailNow;
}

synchronized CompletableFuture<Void> getAddEntryFailure() {
if (!addEntryFailures.isEmpty()){
return addEntryFailures.remove(0);
}
return failures.isEmpty() ? defaultResponse : failures.remove(0);
}

synchronized CompletableFuture<Void> getProgrammedFailure() {
return failures.isEmpty() ? defaultResponse : failures.remove(0);
}
Expand All @@ -326,7 +334,11 @@ public void failNow(int rc) {
}

public void failAfter(int steps, int rc) {
promiseAfter(steps).completeExceptionally(BKException.create(rc));
promiseAfter(steps, failures).completeExceptionally(BKException.create(rc));
}

public void addEntryFailAfter(int steps, int rc) {
promiseAfter(steps, addEntryFailures).completeExceptionally(BKException.create(rc));
}

private int emptyLedgerAfter = -1;
Expand All @@ -339,6 +351,10 @@ public synchronized void returnEmptyLedgerAfter(int steps) {
}

public synchronized CompletableFuture<Void> promiseAfter(int steps) {
return promiseAfter(steps, failures);
}

public synchronized CompletableFuture<Void> promiseAfter(int steps, List<CompletableFuture<Void>> failures) {
while (failures.size() <= steps) {
failures.add(defaultResponse);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length,

@Override
public void asyncAddEntry(final ByteBuf data, final AddCallback cb, final Object ctx) {
bk.getProgrammedFailure().thenComposeAsync((res) -> {
bk.getAddEntryFailure().thenComposeAsync((res) -> {
Long delayMillis = bk.addEntryDelaysMillis.poll();
if (delayMillis == null) {
delayMillis = 1L;
Expand Down

0 comments on commit 954f406

Please sign in to comment.