Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.bookkeeper.net.BookieSocketAddress;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EnsembleUtils {
private static final Logger LOG = LoggerFactory.getLogger(EnsembleUtils.class);

static List<BookieSocketAddress> replaceBookiesInEnsemble(BookieWatcher bookieWatcher,
LedgerMetadata metadata,
List<BookieSocketAddress> oldEnsemble,
Map<Integer, BookieSocketAddress> failedBookies,
String logContext)
throws BKException.BKNotEnoughBookiesException {
List<BookieSocketAddress> newEnsemble = new ArrayList<>(oldEnsemble);

int ensembleSize = metadata.getEnsembleSize();
int writeQ = metadata.getWriteQuorumSize();
int ackQ = metadata.getAckQuorumSize();
Map<String, byte[]> customMetadata = metadata.getCustomMetadata();

Set<BookieSocketAddress> exclude = new HashSet<>(failedBookies.values());

int replaced = 0;
for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
int idx = entry.getKey();
BookieSocketAddress addr = entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("{} replacing bookie: {} index: {}", logContext, addr, idx);
}

if (!newEnsemble.get(idx).equals(addr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Not changing failed bookie {} at index {}, already changed to {}",
logContext, addr, idx, newEnsemble.get(idx));
}
continue;
}
try {
BookieSocketAddress newBookie = bookieWatcher.replaceBookie(
ensembleSize, writeQ, ackQ, customMetadata, newEnsemble, idx, exclude);
newEnsemble.set(idx, newBookie);

replaced++;
} catch (BKException.BKNotEnoughBookiesException e) {
// if there is no bookie replaced, we throw not enough bookie exception
if (replaced <= 0) {
throw e;
} else {
break;
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a logging message dropped. it would be great not to drop log messages related to ensemble changes. they exist for a reason.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, the final summary message is pretty handy.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will readd. It's not a final summary however, it's the change that we wish to make. It's not final until the zookeeper write completes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, but let us not drop any debug/log messages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Readded, but in the calling method, as this method doesn't have all the context.

return newEnsemble;
}

static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
List<BookieSocketAddress> e2) {
checkArgument(e1.size() == e2.size(), "Ensembles must be of same size");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is this better than Assert?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

asserts get turned off by default at runtime.

Set<Integer> diff = new HashSet<>();
for (int i = 0; i < e1.size(); i++) {
if (!e1.get(i).equals(e2.get(i))) {
diff.add(i);
}
}
return diff;
}
}

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -222,12 +222,12 @@ protected void doAsyncAddEntry(final PendingAddOp op) {
// synchronized on this to ensure that
// the ledger isn't closed between checking and
// updating lastAddPushed
if (getLedgerMetadata().isClosed()) {
wasClosed = true;
} else {
if (isHandleWritable()) {
long currentLength = addToLength(op.payload.readableBytes());
op.setLedgerLength(currentLength);
pendingAddOps.add(op);
} else {
wasClosed = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -148,9 +149,14 @@ public LedgerMetadata(int ensembleSize,
this.writeQuorumSize = writeQuorumSize;
this.ackQuorumSize = ackQuorumSize;
this.state = state;
lastEntryId.ifPresent((eid) -> this.lastEntryId = eid);
if (lastEntryId.isPresent()) {
this.lastEntryId = lastEntryId.get();
} else {
this.lastEntryId = LedgerHandle.INVALID_ENTRY_ID;
}
length.ifPresent((l) -> this.length = l);
setEnsembles(ensembles);

if (state != LedgerMetadataFormat.State.CLOSED) {
currentEnsemble = this.ensembles.lastEntry().getValue();
}
Expand Down Expand Up @@ -788,11 +794,13 @@ Set<BookieSocketAddress> getBookiesInThisLedger() {
return bookies;
}

java.util.Optional<Long> getLastEnsembleKey() {
if (ensembles.size() > 0) {
return java.util.Optional.of(ensembles.lastKey());
} else {
return java.util.Optional.empty();
}
List<BookieSocketAddress> getLastEnsembleValue() {
checkState(!ensembles.isEmpty(), "Metadata should never be created with no ensembles");
return ensembles.lastEntry().getValue();
}

Long getLastEnsembleKey() {
checkState(!ensembles.isEmpty(), "Metadata should never be created with no ensembles");
return ensembles.lastKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,19 @@ LedgerMetadataBuilder withEnsembleSize(int ensembleSize) {
return this;
}

LedgerMetadataBuilder withWriteQuorumSize(int writeQuorumSize) {
checkArgument(ensembleSize >= writeQuorumSize, "Write quorum must be less or equal to ensemble size");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check writeQuorumSize >= ackQuorumSize as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

checkArgument(writeQuorumSize >= ackQuorumSize, "Write quorum must be greater or equal to ack quorum");
this.writeQuorumSize = writeQuorumSize;
return this;
}

LedgerMetadataBuilder withAckQuorumSize(int ackQuorumSize) {
checkArgument(writeQuorumSize >= ackQuorumSize, "Ack quorum must be less or equal to write quorum");
this.ackQuorumSize = ackQuorumSize;
return this;
}

LedgerMetadataBuilder newEnsembleEntry(long firstEntry, List<BookieSocketAddress> ensemble) {
checkArgument(ensemble.size() == ensembleSize,
"Size of passed in ensemble must match the ensembleSize of the builder");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,10 +251,8 @@ public void safeRun() {

// We are about to send. Check if we need to make an ensemble change
// becasue of delayed write errors
Map <Integer, BookieSocketAddress> delayedWriteFailedBookies = lh.getDelayedWriteFailedBookies();
if (!delayedWriteFailedBookies.isEmpty()) {
lh.handleDelayedWriteBookieFailure();
}
lh.maybeHandleDelayedWriteBookieFailure();

// Iterate over set and trigger the sendWriteRequests
DistributionSchedule.WriteSet writeSet = lh.distributionSchedule.getWriteSet(entryId);

Expand Down Expand Up @@ -293,7 +291,7 @@ public void writeComplete(int rc, long ledgerId, long entryId, BookieSocketAddre
clientCtx.getClientStats().getAddOpUrCounter().inc();
if (!clientCtx.getConf().disableEnsembleChangeFeature.isAvailable()
&& !clientCtx.getConf().delayEnsembleChange) {
lh.getDelayedWriteFailedBookies().putIfAbsent(bookieIndex, addr);
lh.notifyWriteFailed(bookieIndex, addr);
}
}
// even the add operation is completed, but because we don't reset completed flag back to false when
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@
*/
package org.apache.bookkeeper.client;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;

import com.google.common.annotations.VisibleForTesting;

import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -209,64 +205,6 @@ public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
}, ctx);
}

List<BookieSocketAddress> replaceBookiesInEnsemble(LedgerMetadata metadata,
List<BookieSocketAddress> oldEnsemble,
Map<Integer, BookieSocketAddress> failedBookies)
throws BKException.BKNotEnoughBookiesException {
List<BookieSocketAddress> newEnsemble = new ArrayList<>(oldEnsemble);

int ensembleSize = metadata.getEnsembleSize();
int writeQ = metadata.getWriteQuorumSize();
int ackQ = metadata.getAckQuorumSize();
Map<String, byte[]> customMetadata = metadata.getCustomMetadata();

Set<BookieSocketAddress> exclude = new HashSet<>(failedBookies.values());

int replaced = 0;
for (Map.Entry<Integer, BookieSocketAddress> entry : failedBookies.entrySet()) {
int idx = entry.getKey();
BookieSocketAddress addr = entry.getValue();
if (LOG.isDebugEnabled()) {
LOG.debug("[EnsembleChange-L{}] replacing bookie: {} index: {}", getId(), addr, idx);
}

if (!newEnsemble.get(idx).equals(addr)) {
if (LOG.isDebugEnabled()) {
LOG.debug("[EnsembleChange-L{}] Not changing failed bookie {} at index {}, already changed to {}",
getId(), addr, idx, newEnsemble.get(idx));
}
continue;
}
try {
BookieSocketAddress newBookie = clientCtx.getBookieWatcher().replaceBookie(
ensembleSize, writeQ, ackQ, customMetadata, newEnsemble, idx, exclude);
newEnsemble.set(idx, newBookie);

replaced++;
} catch (BKException.BKNotEnoughBookiesException e) {
// if there is no bookie replaced, we throw not enough bookie exception
if (replaced <= 0) {
throw e;
} else {
break;
}
}
}
return newEnsemble;
}

private static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
List<BookieSocketAddress> e2) {
checkArgument(e1.size() == e2.size(), "Ensembles must be of same size");
Set<Integer> diff = new HashSet<>();
for (int i = 0; i < e1.size(); i++) {
if (!e1.get(i).equals(e2.get(i))) {
diff.add(i);
}
}
return diff;
}

/**
* For a read only ledger handle, this method will only ever be called during recovery,
* when we are reading forward from LAC and writing back those entries. As such,
Expand All @@ -276,21 +214,19 @@ private static Set<Integer> diffEnsemble(List<BookieSocketAddress> e1,
*/
@Override
void handleBookieFailure(final Map<Integer, BookieSocketAddress> failedBookies) {
blockAddCompletions.incrementAndGet();

// handleBookieFailure should always run in the ordered executor thread for this
// ledger, so this synchronized should be unnecessary, but putting it here now
// just in case (can be removed when we validate threads)
synchronized (metadataLock) {
String logContext = String.format("[RecoveryEnsembleChange(ledger:%d)]", ledgerId);

long lac = getLastAddConfirmed();
LedgerMetadata metadata = getLedgerMetadata();
List<BookieSocketAddress> currentEnsemble = getCurrentEnsemble();
try {
List<BookieSocketAddress> newEnsemble = replaceBookiesInEnsemble(metadata, currentEnsemble,
failedBookies);

Set<Integer> replaced = diffEnsemble(currentEnsemble, newEnsemble);
blockAddCompletions.decrementAndGet();
List<BookieSocketAddress> newEnsemble = EnsembleUtils.replaceBookiesInEnsemble(
clientCtx.getBookieWatcher(), metadata, currentEnsemble, failedBookies, logContext);
Set<Integer> replaced = EnsembleUtils.diffEnsemble(currentEnsemble, newEnsemble);
if (!replaced.isEmpty()) {
newEnsemblesFromRecovery.put(lac + 1, newEnsemble);
unsetSuccessAndSendWriteRequest(newEnsemble, replaced);
Expand Down Expand Up @@ -378,16 +314,14 @@ CompletableFuture<LedgerMetadata> closeRecovered() {
(metadata) -> metadata.isInRecovery(),
(metadata) -> {
LedgerMetadataBuilder builder = LedgerMetadataBuilder.from(metadata);
Optional<Long> lastEnsembleKey = metadata.getLastEnsembleKey();
checkState(lastEnsembleKey.isPresent(),
"Metadata shouldn't have been created without at least one ensemble");
Long lastEnsembleKey = metadata.getLastEnsembleKey();
synchronized (metadataLock) {
newEnsemblesFromRecovery.entrySet().forEach(
(e) -> {
checkState(e.getKey() >= lastEnsembleKey.get(),
checkState(e.getKey() >= lastEnsembleKey,
"Once a ledger is in recovery, noone can add ensembles without closing");
// Occurs when a bookie need to be replaced at very start of recovery
if (lastEnsembleKey.get().equals(e.getKey())) {
if (lastEnsembleKey.equals(e.getKey())) {
builder.replaceEnsembleEntry(e.getKey(), e.getValue());
} else {
builder.newEnsembleEntry(e.getKey(), e.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,14 @@ default long append(byte[] data, int offset, int length) throws BKException, Int
* entry of the ledger is. Once the ledger has been closed, all reads from the
* ledger will return the same set of entries.
*
* <p>The close operation can error if it finds conflicting metadata when it
* tries to write to the metadata store. On close, the metadata state is set to
* closed and lastEntry and length of the ledger are fixed in the metadata. A
* conflict occurs if the metadata in the metadata store has a different value for
* the lastEntry or length. If another process has updated the metadata, setting it
* to closed, but have fixed the lastEntry and length to the same values as this
* process is trying to write, the operation completes successfully.
*
* @return an handle to access the result of the operation
*/
@Override
Expand All @@ -152,6 +160,14 @@ default long append(byte[] data, int offset, int length) throws BKException, Int
* <p>Closing a ledger will ensure that all clients agree on what the last
* entry of the ledger is. Once the ledger has been closed, all reads from the
* ledger will return the same set of entries.
*
* <p>The close operation can error if it finds conflicting metadata when it
* tries to write to the metadata store. On close, the metadata state is set to
* closed and lastEntry and length of the ledger are fixed in the metadata. A
* conflict occurs if the metadata in the metadata store has a different value for
* the lastEntry or length. If another process has updated the metadata, setting it
* to closed, but have fixed the lastEntry and length to the same values as this
* process is trying to write, the operation completes successfully.
*/
@Override
default void close() throws BKException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -536,12 +536,9 @@ public void testReadAfterLastAddConfirmed() throws Exception {
}
}

try {
writeLh.close();
fail("should not be able to close the first LedgerHandler as a recovery has been performed");
} catch (BKException.BKMetadataVersionException expected) {
}

// should still be able to close as long as recovery closed the ledger
// with the same last entryId and length as in the write handle.
writeLh.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm this changes the closing behavior. I am not sure how it would impact the applications. so I would suggest keep the original behavior if closing a ledger hit metadata version exception don't attempt. If you want to change the behavior, do a separate PR for it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a desirable change to me. Moreover, it protects against the case where the racing update came from the same client due to ZooKeeperClient resending the update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sijie the previous behaviour is not documented, nor is it well defined. In some cases a metadata version exception allowed a close to succeed, and in others it did not. I would not expect any application is relying on this behaviour, and if they are, they are probably broken in many other ways.
I can revert this my putting throwing an exception in the Predicate part of the loop if the metadata is closed. There'd be no guarantee that behaviour is still exactly matching though, because it isn't well defined currently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understood it is not documented. but if we are changing any existing behavior, I would suggest either doing it in a separate PR, or if it is difficult to do it in a separate PR, then update the javadoc of this method to make things clear "this method will not throw any exceptions anymore if hitting metadata version exceptions"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may still throw an exception on metadata version exception. However, it will only throw the exception if the length or last entry id in the conflicting write is different to what the caller of #close believed it to be. I strongly prefer making this change as part of this patch, as to do otherwise would be to insert arbitrary strange behaviour into the new implementation. I'll add a javadoc for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added javadoc

}
}

Expand Down
Loading