Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.TimedGenericCallback;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.State;
import org.apache.bookkeeper.proto.PerChannelBookieClientPool;
Expand All @@ -88,6 +89,7 @@
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.util.OrderedGenericCallback;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;
import org.apache.commons.collections4.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -96,7 +98,7 @@
* Ledger handle contains ledger metadata and is used to access the read and
* write operations to a ledger.
*/
public class LedgerHandle implements WriteHandle {
public class LedgerHandle implements WriteHandle, LedgerMetadataListener{
static final Logger LOG = LoggerFactory.getLogger(LedgerHandle.class);

static final long PENDINGREQ_NOTWRITABLE_MASK = 0x01L << 62;
Expand Down Expand Up @@ -199,6 +201,7 @@ public Map<Integer, BookieSocketAddress> getDelayedWriteFailedBookies() {
} else {
this.throttler = null;
}
bk.getLedgerManager().registerLedgerMetadataListener(ledgerId, this);

macManager = DigestManager.instantiate(ledgerId, password, BookKeeper.DigestType.toProtoDigestType(digestType),
bk.getConf().getUseV2WireProtocol());
Expand Down Expand Up @@ -461,6 +464,7 @@ public void close()
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> result = new CompletableFuture<>();
SyncCloseCallback callback = new SyncCloseCallback(result);
bk.getLedgerManager().unregisterLedgerMetadataListener(ledgerId, this);
asyncClose(callback, null);
explicitLacFlushPolicy.stopExplicitLacFlush();
if (timeoutFuture != null) {
Expand Down Expand Up @@ -2382,4 +2386,60 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
}
}

class MetadataMerger extends SafeRunnable {

final LedgerMetadata m;

MetadataMerger(LedgerMetadata metadata) {
this.m = metadata;
}

@Override
public void safeRun() {
Version.Occurred occurred =
LedgerHandle.this.metadata.getVersion().compare(this.m.getVersion());
if (Version.Occurred.BEFORE == occurred) {
LOG.info("Updated ledger metadata for ledger {} to {}.", ledgerId, this.m.toSafeString());
synchronized (LedgerHandle.this) {
LedgerHandle.this.metadata.setVersion(m.getVersion());
LedgerHandle.this.metadata.mergeEnsembles(m.getEnsembles());
}
}
}

@Override
public String toString() {
return String.format("MetadataMerger(%d)", ledgerId);
}
}

@Override
public void onChanged(long ledgerId, LedgerMetadata newMetadata) {
if (LOG.isDebugEnabled()) {
LOG.debug("Received ledger metadata update on writeHandle {} : {}",
ledgerId, newMetadata);
}
if (this.ledgerId != ledgerId) {
// Ignore
return;
}
if (null == newMetadata) {
// Ignore
return;
}
Version.Occurred occurred =
this.metadata.getVersion().compare(newMetadata.getVersion());
if (LOG.isDebugEnabled()) {
LOG.debug("Try to merge metadata from {} to {} : {}",
this.metadata, newMetadata, occurred);
}
if (Version.Occurred.BEFORE == occurred) { // the metadata is updated
try {
bk.getMainWorkerPool().executeOrdered(ledgerId, new MetadataMerger(newMetadata));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's no need to merge. The metadata read from zookeeper must have the same last ensemble as the metadata currently being used, or else we're violating a whole load of properties. So in theory, you should be able to assign newMetadata to metadata. In practice it can be tricky with all the mutation that occurs while handling bookie failure. So leave the merge for now, but I will remove it once the ledger immutable metadata changes are in (I should have remaining patches up today or monday/tuesday next week)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ivankelly where are we now with the whole set of immutable changes?
There is another problem with this patch, the metadata is being accessed with and without lock in the code and that needs to be corrected too; may be covered as part of immutable changes. Also, we need to stop the writer as we discussed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jvrao the stack is blocked, waiting on #1589, but otherwise the change is pretty much ready.

} catch (RejectedExecutionException ree) {
LOG.error("Failed on submitting updater to merge ledger metadata on ledger {} : {}",
ledgerId, newMetadata);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.LedgerMetadataListener;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.bookkeeper.versioning.Version;

Expand All @@ -41,7 +40,7 @@
* the public write operations from LedgerHandle.
* It should be returned for BookKeeper#openLedger operations.
*/
class ReadOnlyLedgerHandle extends LedgerHandle implements LedgerMetadataListener {
class ReadOnlyLedgerHandle extends LedgerHandle {

class MetadataUpdater extends SafeRunnable {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@
import static org.apache.bookkeeper.client.BookKeeperClientStats.READ_OP_DM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Enumeration;
Expand All @@ -54,9 +56,18 @@
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.bookkeeper.meta.LedgerUnderreplicationManager;
import org.apache.bookkeeper.meta.LongHierarchicalLedgerManagerFactory;
import org.apache.bookkeeper.meta.MetadataBookieDriver;
import org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.bookkeeper.meta.zk.ZKMetadataDriverBase;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.replication.ReplicationTestUtil;
import org.apache.bookkeeper.replication.ReplicationWorker;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.BookKeeperConstants;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.Assert;
Expand Down Expand Up @@ -87,6 +98,10 @@ public class BookieWriteLedgerTest extends
ArrayList<byte[]> entries2; // generated entries

private final DigestType digestType;
private MetadataBookieDriver driver;
private LedgerManagerFactory mFactory;
private LedgerUnderreplicationManager underReplicationManager;
private String basePath = "";

private static class SyncObj {
volatile int counter;
Expand All @@ -105,6 +120,31 @@ public void setUp() throws Exception {
// Number Generator
entries1 = new ArrayList<byte[]>(); // initialize the entries list
entries2 = new ArrayList<byte[]>(); // initialize the entries list
setUpReplication();
}

private void setUpReplication() throws Exception {
this.driver = MetadataDrivers.getBookieDriver(
URI.create(baseConf.getMetadataServiceUri()));
this.driver.initialize(
baseConf,
() -> {},
NullStatsLogger.INSTANCE);
// initialize urReplicationManager
this.mFactory = driver.getLedgerManagerFactory();
this.underReplicationManager = this.mFactory.newLedgerUnderreplicationManager();
}

@Override
public void tearDown() throws Exception {
super.tearDown();
if (null != underReplicationManager){
underReplicationManager.close();
underReplicationManager = null;
}
if (null != driver) {
driver.close();
}
}

public BookieWriteLedgerTest() {
Expand Down Expand Up @@ -164,6 +204,93 @@ public void testWithMultipleBookieFailuresInLastEnsemble() throws Exception {
lh.close();
}

/**
* Verify the write ledger handle can receive and process replication worker
* ensemble changes to facilitate reads on the write ledger handle.
*/
@Test
public void testWriteAndReadOnWriteLedgerHandleWithReplication() throws Exception {
// Create a ledger
lh = bkc.createLedger(3, 3, 2, digestType, ledgerPassword);
// write 10 entries
int numEntriesToWrite = 10;
int i;

for (i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
lh.addEntry(entry.array());
}

List<BookieSocketAddress> ensemble1;
// Shutdown a bookie in the last ensemble and continue writing
List<BookieSocketAddress> ensemble2, ensemble1n;
ensemble1 = lh.getLedgerMetadata()
.getEnsembles().entrySet().iterator().next().getValue();

// kill a bookie to force ensemble change
killBookie(ensemble1.get(0));

// write second batch
for (i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
lh.addEntry(entry.array());
}
// Get the latest ensemble
ensemble2 = lh.getLedgerMetadata().currentEnsemble;

assertEquals("Make sure there are two Ensembles", lh.getLedgerMetadata().getEnsembles().entrySet().size(), 2);

// Grab the first ensemble after ensemble change.
ensemble1n = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next().getValue();

assertEquals("first bookie of first ensembe must remain same as Replication did not kickin.",
ensemble1.get(0), ensemble1n.get(0));
assertNotEquals("first bookie of first ensembe and second ensemble must be different.",
ensemble1.get(0), ensemble2.get(0));

// Start the ReplicationWorker
ReplicationWorker rw = new ReplicationWorker(zkc, baseConf);
rw.start();
String zkLedgersRootPath = ZKMetadataDriverBase.resolveZkLedgersRootPath(baseClientConf);

basePath = zkLedgersRootPath + '/'
+ BookKeeperConstants.UNDER_REPLICATION_NODE
+ BookKeeperConstants.DEFAULT_ZK_LEDGERS_ROOT_PATH;

try {
underReplicationManager.markLedgerUnderreplicated(lh.getId(),
ensemble1.get(0).toString());
while (ReplicationTestUtil.isLedgerInUnderReplication(zkc, lh
.getId(), basePath)) {
Thread.sleep(100);
}
} finally {
rw.shutdown();
}

// After replication get the first ensemble set as it must have changed.
ensemble1n = lh.getLedgerMetadata()
.getEnsembles().entrySet().iterator().next().getValue();

assertNotEquals("first bookie of first ensembe must remain same as Replication did not kickin.",
ensemble1.get(0), ensemble1n.get(0));

// write third batch
for (i = 0; i < numEntriesToWrite; i++) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
lh.addEntry(entry.array());
}
lh.close();
assertEquals("At the end, make sure there are two Ensembles",
lh.getLedgerMetadata().getEnsembles().entrySet().size(), 2);
}

/**
* Verify write and Read durability stats.
*/
Expand Down Expand Up @@ -263,6 +390,7 @@ public void testWriteAndReadStats() throws Exception {
.get() > 0);
lh.close();
}

/**
* Verty delayedWriteError causes ensemble changes.
*/
Expand Down Expand Up @@ -332,6 +460,7 @@ public void testDelayedWriteEnsembleChange() throws Exception {
bookie1.equals(bookie2));
lh.close();
}

/**
* Verify the functionality Ledgers with different digests.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ReplicationTestUtil {
/**
* Checks whether ledger is in under-replication.
*/
static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
public static boolean isLedgerInUnderReplication(ZooKeeper zkc, long id,
String basePath) throws KeeperException, InterruptedException {
List<String> children;
try {
Expand Down