|
31 | 31 | import java.util.concurrent.CompletableFuture;
|
32 | 32 | import java.util.concurrent.CountDownLatch;
|
33 | 33 | import java.util.concurrent.ExecutionException;
|
| 34 | +import java.util.concurrent.TimeUnit; |
34 | 35 | import java.util.concurrent.atomic.AtomicReference;
|
35 | 36 | import lombok.Cleanup;
|
36 | 37 | import org.apache.bookkeeper.client.BKException;
|
37 | 38 | import org.apache.bookkeeper.client.BookKeeper;
|
38 | 39 | import org.apache.bookkeeper.client.LedgerHandle;
|
39 | 40 | import org.apache.bookkeeper.client.api.DigestType;
|
| 41 | +import org.apache.bookkeeper.mledger.AsyncCallbacks; |
40 | 42 | import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
|
41 | 43 | import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
|
42 | 44 | import org.apache.bookkeeper.mledger.Entry;
|
@@ -509,6 +511,35 @@ public void recoverAfterWriteError() throws Exception {
|
509 | 511 | entries.forEach(Entry::release);
|
510 | 512 | }
|
511 | 513 |
|
| 514 | + @Test |
| 515 | + public void recoverAfterOpenManagedLedgerFail() throws Exception { |
| 516 | + ManagedLedger ledger = factory.open("recoverAfterOpenManagedLedgerFail"); |
| 517 | + Position position = ledger.addEntry("entry".getBytes()); |
| 518 | + ledger.close(); |
| 519 | + bkc.failAfter(0, BKException.Code.BookieHandleNotAvailableException); |
| 520 | + try { |
| 521 | + factory.open("recoverAfterOpenManagedLedgerFail"); |
| 522 | + } catch (Exception e) { |
| 523 | + // ok |
| 524 | + } |
| 525 | + |
| 526 | + ledger = factory.open("recoverAfterOpenManagedLedgerFail"); |
| 527 | + CompletableFuture<byte[]> future = new CompletableFuture<>(); |
| 528 | + ((ManagedLedgerImpl)ledger).asyncReadEntry((PositionImpl)position, new AsyncCallbacks.ReadEntryCallback() { |
| 529 | + @Override |
| 530 | + public void readEntryComplete(Entry entry, Object ctx) { |
| 531 | + future.complete(entry.getData()); |
| 532 | + } |
| 533 | + |
| 534 | + @Override |
| 535 | + public void readEntryFailed(ManagedLedgerException exception, Object ctx) { |
| 536 | + future.completeExceptionally(exception); |
| 537 | + } |
| 538 | + }, null); |
| 539 | + byte[] bytes = future.get(30, TimeUnit.SECONDS); |
| 540 | + assertEquals(new String(bytes), "entry"); |
| 541 | + } |
| 542 | + |
512 | 543 | @Test
|
513 | 544 | public void recoverLongTimeAfterMultipleWriteErrors() throws Exception {
|
514 | 545 | ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("recoverLongTimeAfterMultipleWriteErrors");
|
|
0 commit comments