Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1902,20 +1902,25 @@ void ensembleChangeLoop(List<BookieSocketAddress> origEnsemble, Map<Integer, Boo
LOG.debug("{}[attempt:{}] Success updating metadata.", logContext, attempts.get());
}

List<BookieSocketAddress> newEnsemble = null;
Set<Integer> replaced = null;
synchronized (metadataLock) {
if (!delayedWriteFailedBookies.isEmpty()) {
Map<Integer, BookieSocketAddress> toReplace = new HashMap<>(delayedWriteFailedBookies);
delayedWriteFailedBookies.clear();

ensembleChangeLoop(origEnsemble, toReplace);
} else {
List<BookieSocketAddress> newEnsemble = getCurrentEnsemble();
Set<Integer> replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
newEnsemble = getCurrentEnsemble();
replaced = EnsembleUtils.diffEnsemble(origEnsemble, newEnsemble);
LOG.info("New Ensemble: {} for ledger: {}", newEnsemble, ledgerId);
unsetSuccessAndSendWriteRequest(newEnsemble, replaced);

changingEnsemble = false;
}
}
if (newEnsemble != null) { // unsetSuccess outside of lock
unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
}
}
}, clientCtx.getMainWorkerPool().chooseThread(ledgerId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -446,4 +446,50 @@ public void testNoAddsAreCompletedWhileFailureHandlingInProgress() throws Except
Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(0L), Lists.newArrayList(b1, b2, b3));
Assert.assertEquals(lh.getLedgerMetadata().getAllEnsembles().get(1L), Lists.newArrayList(b1, b2, b4));
}

/**
 * Exercises failure handling when the write to b1 fails after the first entry
 * has already been appended, while a second entry is in flight.
 *
 * <p>The ledger is created with ensemble (b1, b2, b3), write quorum 2 and ack
 * quorum 1; b4 is registered as an additional bookie. The test checks that the
 * second add is marked completed internally but is not completed to the caller
 * until the blocked ledger-metadata write is released.
 *
 * <p>NOTE(review): per the test name, b1 is presumably outside the write set of
 * the second entry — confirm against the ledger's distribution schedule.
 */
@Test
public void testHandleFailureBookieNotInWriteSet() throws Exception {
MockClientContext clientCtx = MockClientContext.create();
Versioned<LedgerMetadata> md = ClientUtil.setupLedger(clientCtx, 10L,
LedgerMetadataBuilder.create()
.withEnsembleSize(3).withWriteQuorumSize(2).withAckQuorumSize(1)
.newEnsembleEntry(0L, Lists.newArrayList(b1, b2, b3)));
clientCtx.getMockRegistrationClient().addBookies(b4).get();

CompletableFuture<Void> b1Delay = new CompletableFuture<>();
// Delay the first write to b1, then error it
// (writes to every other bookie get an already-completed future from the hook)
clientCtx.getMockBookieClient().setPreWriteHook((bookie, ledgerId, entryId) -> {
if (bookie.equals(b1)) {
return b1Delay;
} else {
return FutureUtils.value(null);
}
});

// changeInProgress is completed when the ledger manager's pre-write hook is invoked;
// the hook returns blockEnsembleChange, which the test only completes further below.
CompletableFuture<Void> changeInProgress = new CompletableFuture<>();
CompletableFuture<Void> blockEnsembleChange = new CompletableFuture<>();
clientCtx.getMockLedgerManager().setPreWriteHook((ledgerId, metadata) -> {
changeInProgress.complete(null);
return blockEnsembleChange;
});

LedgerHandle lh = new LedgerHandle(clientCtx, 10L, md, BookKeeper.DigestType.CRC32C,
ClientUtil.PASSWD, WriteFlag.NONE);
log.info("b2 should be enough to complete first add");
lh.append("entry1".getBytes());

log.info("when b1 completes with failure, handleFailures should kick off");
b1Delay.completeExceptionally(new BKException.BKWriteException());

log.info("write second entry, should have enough bookies, but blocks completion on failure handling");
CompletableFuture<?> e2 = lh.appendAsync("entry2".getBytes());
// Wait until the ledger manager's pre-write hook has fired before inspecting the pending add.
changeInProgress.get();
// The head of the pending-add queue must reach the completed state, yet the
// future handed to the caller must stay incomplete while blockEnsembleChange is pending.
assertEventuallyTrue("e2 should eventually complete", () -> lh.pendingAddOps.peek().completed);
Assert.assertFalse("e2 shouldn't be completed to client", e2.isDone());
blockEnsembleChange.complete(null); // allow ensemble change to continue

log.info("e2 should complete");
// Bounded wait: get() throws after 10 seconds instead of blocking indefinitely.
e2.get(10, TimeUnit.SECONDS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: we can use a plain 'get()' here. If the call hangs, the test will still fail with a timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli I'm trying it with a raw get() now, and it has not timed out even after 6 minutes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like the default timeout is 30 minutes, which is too long :/

}

}