Skip to content

Commit

Permalink
When we try to get information for a topic, we should try to load it …
Browse files Browse the repository at this point in the history
…if exists instead of return "not found" (apache#1500)

* When we try to get information for a topic, we should try to load it if exists instead of return "not found"

* Fixed tests

* fixed broker unit tests
  • Loading branch information
merlimat authored Apr 5, 2018
1 parent 683ee95 commit fb3511d
Show file tree
Hide file tree
Showing 39 changed files with 354 additions and 257 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
@Beta
public class ManagedLedgerConfig {

private boolean createIfMissing = true;
private int maxUnackedRangesToPersist = 10000;
private int maxUnackedRangesToPersistInZk = 1000;
private int maxEntriesPerLedger = 50000;
Expand All @@ -54,6 +55,15 @@ public class ManagedLedgerConfig {
private DigestType digestType = DigestType.CRC32C;
private byte[] password = "".getBytes(Charsets.UTF_8);

public boolean isCreateIfMissing() {
return createIfMissing;
}

public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
this.createIfMissing = createIfMissing;
return this;
}

/**
* @return the maxEntriesPerLedger
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@ public BadVersionException(Exception e) {
}
}

public static class MetadataNotFoundException extends MetaStoreException {
public MetadataNotFoundException(Exception e) {
super(e);
}
}


public static class ManagedLedgerFencedException extends ManagedLedgerException {
public ManagedLedgerFencedException() {
super(new Exception("Attempted to use a fenced managed ledger"));
Expand All @@ -60,6 +67,12 @@ public ManagedLedgerFencedException(Exception e) {
}
}

public static class ManagedLedgerNotFoundException extends ManagedLedgerException {
public ManagedLedgerNotFoundException(Exception e) {
super(e);
}
}

public static class ManagedLedgerTerminatedException extends ManagedLedgerException {
public ManagedLedgerTerminatedException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback callback, Object ctx) {
store.getManagedLedgerInfo(name, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
store.getManagedLedgerInfo(name, false /* createIfMissing */,
new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, Stat stat) {
ManagedLedgerInfo info = new ManagedLedgerInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,10 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.ManagedLedgerMXBean;
Expand Down Expand Up @@ -231,7 +233,7 @@ synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callbac
log.info("Opening managed ledger {}", name);

// Fetch the list of existing ledgers in the managed ledger
store.getManagedLedgerInfo(name, new MetaStoreCallback<ManagedLedgerInfo>() {
store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback<ManagedLedgerInfo>() {
@Override
public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {
ledgersStat = stat;
Expand Down Expand Up @@ -284,7 +286,11 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) {

@Override
public void operationFailed(MetaStoreException e) {
callback.initializeFailed(new ManagedLedgerException(e));
if (e instanceof MetadataNotFoundException) {
callback.initializeFailed(new ManagedLedgerNotFoundException(e));
} else {
callback.initializeFailed(new ManagedLedgerException(e));
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN
BookKeeper bk = factory.getBookKeeper();
final CountDownLatch mlMetaCounter = new CountDownLatch(1);

store.getManagedLedgerInfo(managedLedgerName,
store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */,
new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
@Override
public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Stat version) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,11 @@ interface MetaStoreCallback<T> {
*
* @param ledgerName
* the name of the ManagedLedger
* @param createIfMissing
* whether the managed ledger metadata should be created if it doesn't exist already
* @throws MetaStoreException
*/
void getManagedLedgerInfo(String ledgerName, MetaStoreCallback<ManagedLedgerInfo> callback);
void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback<ManagedLedgerInfo> callback);

/**
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;

import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
Expand Down Expand Up @@ -128,36 +129,48 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) {
}

@Override
public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback<ManagedLedgerInfo> callback) {
public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissing,
final MetaStoreCallback<ManagedLedgerInfo> callback) {
// Try to get the content or create an empty node
zk.getData(prefix + ledgerName, false,
(rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> {
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, new ZKStat(stat));
} catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e));
}
} else if (rc == Code.NONODE.intValue()) {
log.info("Creating '{}{}'", prefix, ledgerName);
if (rc == Code.OK.intValue()) {
try {
ManagedLedgerInfo info = parseManagedLedgerInfo(readData);
info = updateMLInfoTimestamp(info);
callback.operationComplete(info, new ZKStat(stat));
} catch (ParseException | InvalidProtocolBufferException e) {
callback.operationFailed(new MetaStoreException(e));
}
} else if (rc == Code.NONODE.intValue()) {
// Z-node doesn't exist
if (createIfMissing) {
log.info("Creating '{}{}'", prefix, ledgerName);

StringCallback createcb = (rc1, path1, ctx1, name) -> {
if (rc1 == Code.OK.intValue()) {
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, new ZKStat());
} else {
callback.operationFailed(
new MetaStoreException(KeeperException.create(Code.get(rc1))));
}
};

ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl,
CreateMode.PERSISTENT, createcb, null);
} else {
// Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this
// point

StringCallback createcb = (rc1, path1, ctx1, name) -> {
if (rc1 == Code.OK.intValue()) {
ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance();
callback.operationComplete(info, new ZKStat());
callback.operationFailed(new ManagedLedgerException.MetadataNotFoundException(
KeeperException.create(Code.get(rc))));
}
} else {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1))));
// Other ZK error
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
}
};

ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl, CreateMode.PERSISTENT,
createcb, null);
} else {
callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc))));
}
})), null);
})), null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,20 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.testng.Assert.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import com.google.common.base.Charsets;
import com.google.common.collect.Sets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
Expand All @@ -41,6 +48,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
Expand All @@ -56,6 +64,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.Position;
Expand Down Expand Up @@ -1460,7 +1469,7 @@ public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception {
// from the list of ledgers
final CountDownLatch counter = new CountDownLatch(1);
final MetaStore store = factory.getMetaStore();
store.getManagedLedgerInfo("my_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() {
store.getManagedLedgerInfo("my_test_ledger", false, new MetaStoreCallback<ManagedLedgerInfo>() {
@Override
public void operationComplete(ManagedLedgerInfo result, Stat version) {
// Update the list
Expand Down Expand Up @@ -1756,7 +1765,7 @@ public void testBackwardCompatiblityForMeta() throws Exception {
CountDownLatch l1 = new CountDownLatch(1);

// obtain the ledger info
store.getManagedLedgerInfo("backward_test_ledger", new MetaStoreCallback<ManagedLedgerInfo>() {
store.getManagedLedgerInfo("backward_test_ledger", false, new MetaStoreCallback<ManagedLedgerInfo>() {
@Override
public void operationComplete(ManagedLedgerInfo result, Stat version) {
storedMLInfo[0] = result;
Expand Down Expand Up @@ -2184,4 +2193,25 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{
ledger.close();

}

@Test
public void testManagedLedgerAutoCreate() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(true);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test", config);
assertNotNull(ledger);
}

@Test
public void testManagedLedgerWithoutAutoCreate() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(false);

try {
factory.open("testManagedLedgerWithoutAutoCreate", config);
fail("should have thrown ManagedLedgerNotFoundException");
} catch (ManagedLedgerNotFoundException e) {
// Expected
}

assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ void readMalformedML() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);

store.getManagedLedgerInfo("my_test", new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
public void operationFailed(MetaStoreException e) {
// Ok
latch.countDown();
Expand Down Expand Up @@ -131,7 +131,7 @@ void failInCreatingMLnode() throws Exception {

zkc.failAfter(1, Code.CONNECTIONLOSS);

store.getManagedLedgerInfo("my_test", new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>() {
public void operationFailed(MetaStoreException e) {
// Ok
latch.countDown();
Expand Down Expand Up @@ -189,7 +189,7 @@ void updatingMLNode() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);

store.getManagedLedgerInfo("my_test", new MetaStoreCallback<ManagedLedgerInfo>() {
store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback<ManagedLedgerInfo>() {
public void operationFailed(MetaStoreException e) {
fail("should have succeeded");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName)) {
CompletableFuture<Topic> future = brokerService.getTopic(topic);
CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic);
if (future != null) {
persistentTopics.add(future);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1136,21 +1136,12 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
* Get the Topic object reference from the Pulsar broker
*/
private Topic getTopicReference(TopicName topicName) {
try {
Topic topic = pulsar().getBrokerService().getTopicReference(topicName.toString());
checkNotNull(topic);
return topic;
} catch (Exception e) {
throw new RestException(Status.NOT_FOUND, "Topic not found");
}
return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
}

private Topic getOrCreateTopic(TopicName topicName) {
try {
return pulsar().getBrokerService().getTopic(topicName.toString()).get();
} catch (InterruptedException | ExecutionException e) {
throw new RestException(e);
}
return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join();
}

/**
Expand Down
Loading

0 comments on commit fb3511d

Please sign in to comment.