diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index ed4ccb3544cc1..b38999b09139e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -32,6 +32,7 @@ @Beta public class ManagedLedgerConfig { + private boolean createIfMissing = true; private int maxUnackedRangesToPersist = 10000; private int maxUnackedRangesToPersistInZk = 1000; private int maxEntriesPerLedger = 50000; @@ -54,6 +55,15 @@ public class ManagedLedgerConfig { private DigestType digestType = DigestType.CRC32C; private byte[] password = "".getBytes(Charsets.UTF_8); + public boolean isCreateIfMissing() { + return createIfMissing; + } + + public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) { + this.createIfMissing = createIfMissing; + return this; + } + /** * @return the maxEntriesPerLedger */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java index c6ed6aff1ece1..c4b401adeb431 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java @@ -50,6 +50,13 @@ public BadVersionException(Exception e) { } } + public static class MetadataNotFoundException extends MetaStoreException { + public MetadataNotFoundException(Exception e) { + super(e); + } + } + + public static class ManagedLedgerFencedException extends ManagedLedgerException { public ManagedLedgerFencedException() { super(new Exception("Attempted to use a fenced managed ledger")); @@ -60,6 +67,12 @@ public ManagedLedgerFencedException(Exception e) { } } + public static class ManagedLedgerNotFoundException extends ManagedLedgerException { + public ManagedLedgerNotFoundException(Exception e) { + super(e); + } + } + public static class ManagedLedgerTerminatedException extends ManagedLedgerException { public ManagedLedgerTerminatedException(String msg) { super(msg); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index d148d1dda0282..cb4d2f99e4f0e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -342,7 +342,8 @@ public void getInfoFailed(ManagedLedgerException exception, Object ctx) { @Override public void asyncGetManagedLedgerInfo(String name, ManagedLedgerInfoCallback callback, Object ctx) { - store.getManagedLedgerInfo(name, new MetaStoreCallback() { + store.getManagedLedgerInfo(name, false /* createIfMissing */, + new MetaStoreCallback() { @Override public void operationComplete(MLDataFormats.ManagedLedgerInfo pbInfo, Stat stat) { ManagedLedgerInfo info = new ManagedLedgerInfo(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4aceb6ca3c8a9..de77a44d85d5b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -70,8 +70,10 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.MetadataNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException; import org.apache.bookkeeper.mledger.ManagedLedgerMXBean; @@ -231,7 +233,7 @@ synchronized void initialize(final ManagedLedgerInitializeLedgerCallback callbac log.info("Opening managed ledger {}", name); // Fetch the list of existing ledgers in the managed ledger - store.getManagedLedgerInfo(name, new MetaStoreCallback() { + store.getManagedLedgerInfo(name, config.isCreateIfMissing(), new MetaStoreCallback() { @Override public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { ledgersStat = stat; @@ -284,7 +286,11 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { @Override public void operationFailed(MetaStoreException e) { - callback.initializeFailed(new ManagedLedgerException(e)); + if (e instanceof MetadataNotFoundException) { + callback.initializeFailed(new ManagedLedgerNotFoundException(e)); + } else { + callback.initializeFailed(new ManagedLedgerException(e)); + } } }); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java index c07ed5552a85c..39405af512813 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerOfflineBacklog.java @@ -141,7 +141,7 @@ private void readLedgerMeta(final ManagedLedgerFactoryImpl factory, final TopicN BookKeeper bk = factory.getBookKeeper(); final CountDownLatch mlMetaCounter = new CountDownLatch(1); - store.getManagedLedgerInfo(managedLedgerName, + store.getManagedLedgerInfo(managedLedgerName, false /* createIfMissing */, new MetaStore.MetaStoreCallback() { @Override public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, MetaStore.Stat version) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java index 27e4acf45cf90..72ef566a9cc7d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStore.java @@ -52,9 +52,11 @@ interface MetaStoreCallback { * * @param ledgerName * the name of the ManagedLedger + * @param createIfMissing + * whether the managed ledger metadata should be created if it doesn't exist already * @throws MetaStoreException */ - void getManagedLedgerInfo(String ledgerName, MetaStoreCallback callback); + void getManagedLedgerInfo(String ledgerName, boolean createIfMissing, MetaStoreCallback callback); /** * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java index 31705c1b6d8b7..354c04f4fc97f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeper.java @@ -30,6 +30,7 @@ import java.util.List; import org.apache.bookkeeper.common.util.OrderedExecutor; +import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.BadVersionException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo; @@ -128,36 +129,48 @@ private ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { } @Override - public void getManagedLedgerInfo(final String ledgerName, final MetaStoreCallback callback) { + public void getManagedLedgerInfo(final String ledgerName, boolean createIfMissing, + final MetaStoreCallback callback) { // Try to get the content or create an empty node zk.getData(prefix + ledgerName, false, (rc, path, ctx, readData, stat) -> executor.executeOrdered(ledgerName, safeRun(() -> { - if (rc == Code.OK.intValue()) { - try { - ManagedLedgerInfo info = parseManagedLedgerInfo(readData); - info = updateMLInfoTimestamp(info); - callback.operationComplete(info, new ZKStat(stat)); - } catch (ParseException | InvalidProtocolBufferException e) { - callback.operationFailed(new MetaStoreException(e)); - } - } else if (rc == Code.NONODE.intValue()) { - log.info("Creating '{}{}'", prefix, ledgerName); + if (rc == Code.OK.intValue()) { + try { + ManagedLedgerInfo info = parseManagedLedgerInfo(readData); + info = updateMLInfoTimestamp(info); + callback.operationComplete(info, new ZKStat(stat)); + } catch (ParseException | InvalidProtocolBufferException e) { + callback.operationFailed(new MetaStoreException(e)); + } + } else if (rc == Code.NONODE.intValue()) { + // Z-node doesn't exist + if (createIfMissing) { + log.info("Creating '{}{}'", prefix, ledgerName); + + StringCallback createcb = (rc1, path1, ctx1, name) -> { + if (rc1 == Code.OK.intValue()) { + ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance(); + callback.operationComplete(info, new ZKStat()); + } else { + callback.operationFailed( + new MetaStoreException(KeeperException.create(Code.get(rc1)))); + } + }; + + ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl, + CreateMode.PERSISTENT, createcb, null); + } else { + // Tried to open a managed ledger but it doesn't exist and we shouldn't creating it at this + // point - StringCallback createcb = (rc1, path1, ctx1, name) -> { - if (rc1 == Code.OK.intValue()) { - ManagedLedgerInfo info = ManagedLedgerInfo.getDefaultInstance(); - callback.operationComplete(info, new ZKStat()); + callback.operationFailed(new ManagedLedgerException.MetadataNotFoundException( + KeeperException.create(Code.get(rc)))); + } } else { - callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc1)))); + // Other ZK error + callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); } - }; - - ZkUtils.asyncCreateFullPathOptimistic(zk, prefix + ledgerName, new byte[0], Acl, CreateMode.PERSISTENT, - createcb, null); - } else { - callback.operationFailed(new MetaStoreException(KeeperException.create(Code.get(rc)))); - } - })), null); + })), null); } @Override diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index f6cbb12f6e07c..a91c159fb5311 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -18,13 +18,20 @@ */ package org.apache.bookkeeper.mledger.impl; -import static org.testng.Assert.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; import com.google.common.base.Charsets; import com.google.common.collect.Sets; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.Unpooled; + import java.io.IOException; import java.lang.reflect.Field; import java.lang.reflect.Modifier; @@ -41,6 +48,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; + import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; @@ -56,6 +64,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerException.MetaStoreException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.Position; @@ -1460,7 +1469,7 @@ public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception { // from the list of ledgers final CountDownLatch counter = new CountDownLatch(1); final MetaStore store = factory.getMetaStore(); - store.getManagedLedgerInfo("my_test_ledger", new MetaStoreCallback() { + store.getManagedLedgerInfo("my_test_ledger", false, new MetaStoreCallback() { @Override public void operationComplete(ManagedLedgerInfo result, Stat version) { // Update the list @@ -1756,7 +1765,7 @@ public void testBackwardCompatiblityForMeta() throws Exception { CountDownLatch l1 = new CountDownLatch(1); // obtain the ledger info - store.getManagedLedgerInfo("backward_test_ledger", new MetaStoreCallback() { + store.getManagedLedgerInfo("backward_test_ledger", false, new MetaStoreCallback() { @Override public void operationComplete(ManagedLedgerInfo result, Stat version) { storedMLInfo[0] = result; @@ -2184,4 +2193,25 @@ public void testConsumerSubscriptionInitializePosition() throws Exception{ ledger.close(); } + + @Test + public void testManagedLedgerAutoCreate() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(true); + ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test", config); + assertNotNull(ledger); + } + + @Test + public void testManagedLedgerWithoutAutoCreate() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(false); + + try { + factory.open("testManagedLedgerWithoutAutoCreate", config); + fail("should have thrown ManagedLedgerNotFoundException"); + } catch (ManagedLedgerNotFoundException e) { + // Expected + } + + assertFalse(factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate")); + } } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java index a5f6a3a60f42a..17b996215dc15 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/MetaStoreImplZookeeperTest.java @@ -85,7 +85,7 @@ void readMalformedML() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - store.getManagedLedgerInfo("my_test", new MetaStoreCallback() { + store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback() { public void operationFailed(MetaStoreException e) { // Ok latch.countDown(); @@ -131,7 +131,7 @@ void failInCreatingMLnode() throws Exception { zkc.failAfter(1, Code.CONNECTIONLOSS); - store.getManagedLedgerInfo("my_test", new MetaStoreCallback() { + store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback() { public void operationFailed(MetaStoreException e) { // Ok latch.countDown(); @@ -189,7 +189,7 @@ void updatingMLNode() throws Exception { final CountDownLatch latch = new CountDownLatch(1); - store.getManagedLedgerInfo("my_test", new MetaStoreCallback() { + store.getManagedLedgerInfo("my_test", false, new MetaStoreCallback() { public void operationFailed(MetaStoreException e) { fail("should have succeeded"); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 0fd81a7fa13ed..e5623d4c5174d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -523,7 +523,7 @@ public void loadNamespaceTopics(NamespaceBundle bundle) { try { TopicName topicName = TopicName.get(topic); if (bundle.includes(topicName)) { - CompletableFuture future = brokerService.getTopic(topic); + CompletableFuture future = brokerService.getOrCreateTopic(topic); if (future != null) { persistentTopics.add(future); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 86e68583d3643..aa52c64de7eed 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1136,21 +1136,12 @@ public static CompletableFuture getPartitionedTopicMet * Get the Topic object reference from the Pulsar broker */ private Topic getTopicReference(TopicName topicName) { - try { - Topic topic = pulsar().getBrokerService().getTopicReference(topicName.toString()); - checkNotNull(topic); - return topic; - } catch (Exception e) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); - } + return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join() + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); } private Topic getOrCreateTopic(TopicName topicName) { - try { - return pulsar().getBrokerService().getTopic(topicName.toString()).get(); - } catch (InterruptedException | ExecutionException e) { - throw new RestException(e); - } + return pulsar().getBrokerService().getOrCreateTopic(topicName.toString()).join(); } /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index 0f9ffe1449d4c..644af2583810d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -18,7 +18,16 @@ */ package org.apache.pulsar.broker.admin.v1; -import static com.google.common.base.Preconditions.checkNotNull; +import com.google.common.collect.Lists; + +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; @@ -31,15 +40,14 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response.Status; -import com.google.common.collect.Lists; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.common.naming.Constants; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.NonPersistentTopicStats; import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; @@ -49,15 +57,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; - -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; - /** */ @Path("/non-persistent") @@ -255,12 +254,7 @@ protected void validateAdminOperationOnTopic(TopicName topicName, boolean author } private Topic getTopicReference(TopicName topicName) { - try { - Topic topic = pulsar().getBrokerService().getTopicReference(topicName.toString()); - checkNotNull(topic); - return topic; - } catch (Exception e) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); - } + return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join() + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 493030b7f3d26..bc584adabefd2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.broker.admin.v2; -import static com.google.common.base.Preconditions.checkNotNull; - +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiResponse; +import io.swagger.annotations.ApiResponses; import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; @@ -43,12 +45,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import io.swagger.annotations.Api; -import io.swagger.annotations.ApiOperation; -import io.swagger.annotations.ApiResponse; -import io.swagger.annotations.ApiResponses; - /** */ @Path("/non-persistent") @@ -148,12 +144,7 @@ protected void validateAdminOperationOnTopic(TopicName topicName, boolean author } private Topic getTopicReference(TopicName topicName) { - try { - Topic topic = pulsar().getBrokerService().getTopicReference(topicName.toString()); - checkNotNull(topic); - return topic; - } catch (Exception e) { - throw new RestException(Status.NOT_FOUND, "Topic not found"); - } + return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join() + .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found")); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6c2e011ead519..18afed56c32c6 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -25,6 +25,19 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Queues; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.channel.AdaptiveRecvByteBufAllocator; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.handler.ssl.SslContext; +import io.netty.util.concurrent.DefaultThreadFactory; + import java.io.Closeable; import java.io.IOException; import java.lang.reflect.Field; @@ -50,12 +63,12 @@ import java.util.function.Consumer; import java.util.function.Predicate; -import org.apache.bookkeeper.client.BookKeeper.DigestType; import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerConfig; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.util.ZkUtils; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -72,7 +85,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; -import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -116,19 +128,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Queues; - -import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.PooledByteBufAllocator; -import io.netty.channel.AdaptiveRecvByteBufAllocator; -import io.netty.channel.ChannelOption; -import io.netty.channel.EventLoopGroup; -import io.netty.handler.ssl.SslContext; -import io.netty.util.concurrent.DefaultThreadFactory; - public class BrokerService implements Closeable, ZooKeeperCacheListener { private static final Logger log = LoggerFactory.getLogger(BrokerService.class); @@ -421,11 +420,20 @@ public void unloadNamespaceBundlesGracefully() { } } - public CompletableFuture getTopic(final String topic) { + public CompletableFuture> getTopicIfExists(final String topic) { + return getTopic(topic, false /* createIfMissing */ ).thenApply(t -> Optional.ofNullable(t)); + } + + public CompletableFuture getOrCreateTopic(final String topic) { + return getTopic(topic, true /* createIfMissing */ ); + } + + private CompletableFuture getTopic(final String topic, boolean createIfMissing) { try { CompletableFuture topicFuture = topics.get(topic); if (topicFuture != null) { - if (topicFuture.isCompletedExceptionally()) { + if (topicFuture.isCompletedExceptionally() + || (topicFuture.isDone() && topicFuture.getNow(null) == null)) { // Exceptional topics should be recreated. topics.remove(topic, topicFuture); } else { @@ -434,7 +442,7 @@ public CompletableFuture getTopic(final String topic) { } final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); return topics.computeIfAbsent(topic, (topicName) -> { - return isPersistentTopic ? this.createPersistentTopic(topicName) + return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing) : createNonPersistentTopic(topicName); }); } catch (IllegalArgumentException e) { @@ -540,7 +548,7 @@ public PulsarClient getReplicationClient(String cluster) { * @return CompletableFuture * @throws RuntimeException */ - protected CompletableFuture createPersistentTopic(final String topic) throws RuntimeException { + protected CompletableFuture loadOrCreatePersistentTopic(final String topic, boolean createIfMissing) throws RuntimeException { checkTopicNsOwnership(topic); final CompletableFuture topicFuture = new CompletableFuture<>(); @@ -555,7 +563,7 @@ protected CompletableFuture createPersistentTopic(final String topic) thr final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { - createPersistentTopic(topic, topicFuture); + createPersistentTopic(topic, createIfMissing, topicFuture); topicFuture.handle((persistentTopic, ex) -> { // release permit and process pending topic topicLoadSemaphore.release(); @@ -571,7 +579,7 @@ protected CompletableFuture createPersistentTopic(final String topic) thr return topicFuture; } - private void createPersistentTopic(final String topic, CompletableFuture topicFuture) { + private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture topicFuture) { final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); TopicName topicName = TopicName.get(topic); @@ -585,6 +593,8 @@ private void createPersistentTopic(final String topic, CompletableFuture } getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { + managedLedgerConfig.setCreateIfMissing(createIfMissing); + // Once we have the configuration, we can proceed with the async open operation managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, new OpenLedgerCallback() { @@ -625,9 +635,14 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new PersistenceException(exception)); + if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { + // We were just trying to load a topic and the topic doesn't exist + topicFuture.complete(null); + } else { + log.warn("Failed to create topic {}", topic, exception); + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + topicFuture.completeExceptionally(new PersistenceException(exception)); + } } }, null); @@ -760,12 +775,17 @@ public void invalidateOfflineTopicStatCache(TopicName topicName) { } } - public Topic getTopicReference(String topic) throws Exception { - CompletableFuture future = topics.get(topic); + /** + * Get a reference to a topic that is currently loaded in the broker. + * + * This method will not make the broker attempt to load the topic if it's not already. + */ + public Optional getTopicReference(String topic) { + CompletableFuture future = topics.get(topic); if (future != null && future.isDone() && !future.isCompletedExceptionally()) { - return future.get(); + return Optional.ofNullable(future.join()); } else { - return null; + return Optional.empty(); } } @@ -1330,7 +1350,7 @@ private void createPendingLoadTopic() { CompletableFuture pendingFuture = pendingTopic.getRight(); final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); - createPersistentTopic(topic, pendingFuture); + createPersistentTopic(topic, true, pendingFuture); pendingFuture.handle((persistentTopic, ex) -> { // release permit and process next pending topic if (acquiredPermit) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 0bc311fa7beca..239756eff63c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -621,7 +621,7 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { } } - service.getTopic(topicName.toString()) + service.getOrCreateTopic(topicName.toString()) .thenCompose(topic -> topic.subscribe(ServerCnx.this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition)) @@ -809,7 +809,7 @@ protected void handleProducer(final CommandProducer cmdProducer) { log.info("[{}][{}] Creating producer. producerId={}", remoteAddress, topicName, producerId); - service.getTopic(topicName.toString()).thenAccept((Topic topic) -> { + service.getOrCreateTopic(topicName.toString()).thenAccept((Topic topic) -> { // Before creating producer, check if backlog quota exceeded // on topic if (topic.isBacklogQuotaExceeded(producerName)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java index 018657aa34053..2bc1652a78e38 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java @@ -20,6 +20,7 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; @@ -327,7 +328,7 @@ public void testUpdatePersistencePolicyUpdateManagedCursor() throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).create(); Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) managedLedger.getCursors().iterator().next(); @@ -364,31 +365,29 @@ public void testUnloadTopic(final String topicType) throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).create(); producer.close(); - Topic topic = pulsar.getBrokerService().getTopicReference(topicName); - assertNotNull(topic); + Topic topic = pulsar.getBrokerService().getTopicIfExists(topicName).join().get(); final boolean isPersistentTopic = topic instanceof PersistentTopic; // (1) unload the topic unloadTopic(topicName, isPersistentTopic); - topic = pulsar.getBrokerService().getTopicReference(topicName); - // topic must be removed - assertNull(topic); + + // topic must be removed from map + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // recreation of producer will load the topic again producer = pulsarClient.newProducer().topic(topicName).create(); - topic = pulsar.getBrokerService().getTopicReference(topicName); + topic = pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topic); // unload the topic unloadTopic(topicName, isPersistentTopic); // producer will retry and recreate the topic for (int i = 0; i < 5; i++) { - topic = pulsar.getBrokerService().getTopicReference(topicName); - if (topic == null || i != 4) { + if (!pulsar.getBrokerService().getTopicReference(topicName).isPresent() || i != 4) { Thread.sleep(200); } } // topic should be loaded by this time - topic = pulsar.getBrokerService().getTopicReference(topicName); + topic = pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topic); } @@ -715,7 +714,7 @@ public void testNonPersistentTopics() throws Exception { } for (int i = 0; i < totalTopics; i++) { - Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i); + Topic topic = pulsar.getBrokerService().getTopicReference(topicName + i).get(); assertNotNull(topic); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java index 533ce85077c7a..5a9642253c7f1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BatchMessageTest.java @@ -96,7 +96,7 @@ public void testSimpleBatchProducerWithFixedBatchSize(CompressionType compressio } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -141,7 +141,7 @@ public void testSimpleBatchProducerWithFixedBatchTime(CompressionType compressio } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -177,7 +177,7 @@ public void testSimpleBatchProducerWithFixedBatchSizeAndTime(CompressionType com } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -222,7 +222,7 @@ public void testBatchProducerWithLargeMessage(CompressionType compressionType) t FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -266,7 +266,7 @@ public void testSimpleBatchProducerConsumer(CompressionType compressionType) thr } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -313,7 +313,7 @@ public void testSimpleBatchSyncProducerWithFixedBatchSize() throws Exception { producer.send(msg); } - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -369,7 +369,7 @@ public void testSimpleBatchProducerConsumer1kMessages() throws Exception { } LOG.info("[{}] sent {} messages", subscriptionName, numMsgs); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // allow stats to be updated.. Thread.sleep(5000); @@ -422,7 +422,7 @@ public void testOutOfOrderAcksForBatchMessage() throws Exception { } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertEquals(topic.getSubscription(subscriptionName).getNumberOfEntriesInBacklog(), numMsgs / numMsgsInBatch); @@ -495,7 +495,7 @@ public void testNonBatchCumulativeAckAfterBatchPublish() throws Exception { Message nmsg = MessageBuilder.create().setContent(nobatchmsg).build(); noBatchProducer.sendAsync(nmsg).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -547,7 +547,7 @@ public void testBatchAndNonBatchCumulativeAcks() throws Exception { } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); rolloverPerIntervalStats(); assertTrue(topic.getProducers().values().iterator().next().getStats().msgRateIn > 0.0); @@ -605,7 +605,7 @@ public void testConcurrentBatchMessageAck() throws Exception { } FutureUtil.waitForAll(sendFutureList).get(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); final Consumer myConsumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName(subscriptionName).subscriptionType(SubscriptionType.Shared).subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 1eaa187416c9c..febdc0789cd5d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -160,7 +160,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { consumer.acknowledge(msg); } - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic1).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedCursorImpl cursor = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next(); retryStrategically((test) -> cursor.getState().equals("Open"), 5, 100); @@ -195,7 +195,7 @@ public void testCrashBrokerWithoutCursorLedgerLeak() throws Exception { } // (5) Broker should create new cursor-ledger and remove old cursor-ledger - topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic1).get(); + topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); final ManagedCursorImpl cursor1 = (ManagedCursorImpl) topic.getManagedLedger().getCursors().iterator().next(); retryStrategically((test) -> cursor1.getState().equals("Open"), 5, 100); long newCursorLedgerId = cursor1.getCursorLedger(); @@ -243,7 +243,7 @@ public void testSkipCorruptDataLedger() throws Exception { Consumer consumer = client.newConsumer().topic(topic1).subscriptionName("my-subscriber-name") .receiverQueueSize(5).subscribe(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topic1).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topic1).get(); ManagedLedgerImpl ml = (ManagedLedgerImpl) topic.getManagedLedger(); ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().iterator().next(); Field configField = ManagedCursorImpl.class.getDeclaredField("config"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index f48c082169cc6..13e605702792f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -104,7 +104,7 @@ public void testOwnedNsCheck() throws Exception { BrokerService service = pulsar.getBrokerService(); final CountDownLatch latch1 = new CountDownLatch(1); - service.getTopic(topic).thenAccept(t -> { + service.getOrCreateTopic(topic).thenAccept(t -> { latch1.countDown(); fail("should fail as NS is not owned"); }).exceptionally(exception -> { @@ -117,7 +117,7 @@ public void testOwnedNsCheck() throws Exception { admin.lookups().lookupTopic(topic); final CountDownLatch latch2 = new CountDownLatch(1); - service.getTopic(topic).thenAccept(t -> { + service.getOrCreateTopic(topic).thenAccept(t -> { try { assertNotNull(service.getTopicReference(topic)); } catch (Exception e) { @@ -144,7 +144,7 @@ public void testBrokerServicePersistentTopicStats() throws Exception { .acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); rolloverPerIntervalStats(); @@ -221,7 +221,7 @@ public void testBrokerServicePersistentRedeliverTopicStats() throws Exception { .subscriptionType(SubscriptionType.Shared).acknowledmentGroupTime(0, TimeUnit.SECONDS).subscribe(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); rolloverPerIntervalStats(); @@ -723,7 +723,7 @@ public void testTopicLoadingOnDisableNamespaceBundle() throws Exception { pulsar.getNamespaceService().getOwnershipCache().updateBundleState(bundle, false); // try to create topic which should fail as bundle is disable - CompletableFuture futureResult = pulsar.getBrokerService().createPersistentTopic(topicName); + CompletableFuture futureResult = pulsar.getBrokerService().loadOrCreatePersistentTopic(topicName, true); try { futureResult.get(); @@ -766,7 +766,7 @@ public void testTopicFailureShouldNotHaveDeadLock() { // create topic async and wait on the future completion executor.submit(() -> { - service.getTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> { + service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> { topicCreation.completeExceptionally(e.getCause()); return null; }); @@ -802,7 +802,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception { BrokerService service = spy(pulsar.getBrokerService()); // create topic will fail to get managedLedgerConfig CompletableFuture failedManagedLedgerConfig = new CompletableFuture<>(); - failedManagedLedgerConfig.complete(null); + failedManagedLedgerConfig.complete(new ManagedLedgerConfig()); doReturn(failedManagedLedgerConfig).when(service).getManagedLedgerConfig(anyObject()); CompletableFuture topicCreation = new CompletableFuture(); @@ -818,7 +818,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception { // create topic async and wait on the future completion executor.submit(() -> { - service.getTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> { + service.getOrCreateTopic(deadLockTestTopic).thenAccept(topic -> topicCreation.complete(null)).exceptionally(e -> { topicCreation.completeExceptionally(e.getCause()); return null; }); @@ -830,7 +830,7 @@ public void testLedgerOpenFailureShouldNotHaveDeadLock() throws Exception { } catch (TimeoutException | InterruptedException e) { fail("there is a dead-lock and it should have been prevented"); } catch (ExecutionException e) { - assertTrue(e.getCause() instanceof PersistenceException); + assertEquals(e.getCause().getClass(), PersistenceException.class); } finally { executor.shutdownNow(); ledgers.clear(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index 714b74102fe74..5ccf65fd342ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -117,7 +117,7 @@ public void testPeerClusterTopicLookup(String protocol) throws Exception { // set peer-clusters : r3->r1 admin1.clusters().updatePeerClusterNames("r3", Sets.newLinkedHashSet(Lists.newArrayList("r1"))); Producer producer = client3.newProducer().topic(topic1).create(); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topic1).get(); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topic1).get(); assertNotNull(topic); pulsar1.getBrokerService().updateRates(); // get stats for topic1 using cluster-r3's admin3 @@ -132,7 +132,7 @@ public void testPeerClusterTopicLookup(String protocol) throws Exception { // set peer-clusters : r3->r2 admin2.clusters().updatePeerClusterNames("r3", Sets.newLinkedHashSet(Lists.newArrayList("r2"))); producer = client3.newProducer().topic(topic2).create(); - topic = (PersistentTopic) pulsar2.getBrokerService().getTopic(topic2).get(); + topic = (PersistentTopic) pulsar2.getBrokerService().getOrCreateTopic(topic2).get(); assertNotNull(topic); pulsar2.getBrokerService().updateRates(); // get stats for topic1 using cluster-r3's admin3 diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index 08cd7f654f4ac..53c49d7bfaeba 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -148,7 +148,7 @@ public void testSimpleConsumerEventsWithoutPartition() throws Exception { verifyConsumerActive(listener1, -1); verifyConsumerInactive(listener2, -1); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(topicRef); @@ -339,16 +339,16 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { .subscribe(); PersistentTopic topicRef; - topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(0).toString()); + topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(0).toString()).get(); PersistentDispatcherSingleActiveConsumer disp0 = (PersistentDispatcherSingleActiveConsumer) topicRef .getSubscription(subName).getDispatcher(); - topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(1).toString()); + topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(1).toString()).get(); PersistentDispatcherSingleActiveConsumer disp1 = (PersistentDispatcherSingleActiveConsumer) topicRef .getSubscription(subName).getDispatcher(); - topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(2).toString()); + topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(2).toString()).get(); PersistentDispatcherSingleActiveConsumer disp2 = (PersistentDispatcherSingleActiveConsumer) topicRef .getSubscription(subName).getDispatcher(); - topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(3).toString()); + topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(destName.getPartition(3).toString()).get(); PersistentDispatcherSingleActiveConsumer disp3 = (PersistentDispatcherSingleActiveConsumer) topicRef .getSubscription(subName).getDispatcher(); @@ -540,7 +540,7 @@ public void testActiveConsumerFailoverWithDelay() throws Exception { // create subscription Consumer consumer = consumerBuilder1.subscribe(); consumer.close(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); // enqueue messages diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java index e5c3c7e786e8a..2bc152a2db450 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java @@ -88,7 +88,7 @@ public void testSimpleConsumerEvents() throws Exception { Consumer consumer1 = consumerBuilder.subscribe(); Consumer consumer2 = consumerBuilder.subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(topicRef); @@ -358,7 +358,7 @@ public void testSharedSingleAckedNormalTopic() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -424,7 +424,7 @@ public void testCancelReadRequestOnLastDisconnect() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java index 86f2a4ac867bf..8bfd6ef00c300 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicConcurrentTest.java @@ -117,7 +117,7 @@ public void setup(Method m) throws Exception { // @Test public void testConcurrentTopicAndSubscriptionDelete() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1) .setTopic(successTopicName).setSubscription(successSubName).setRequestId(1) .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); @@ -175,7 +175,7 @@ public void run() { // @Test public void testConcurrentTopicGCAndSubscriptionDelete() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1) .setTopic(successTopicName).setSubscription(successSubName).setRequestId(1) .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); @@ -237,7 +237,7 @@ public void run() { // @Test public void testConcurrentTopicDeleteAndUnsubscribe() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1) .setTopic(successTopicName).setSubscription(successSubName).setRequestId(1) .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); @@ -295,7 +295,7 @@ public void run() { // @Test public void testConcurrentTopicDeleteAndSubsUnsubscribe() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); PulsarApi.CommandSubscribe cmd = PulsarApi.CommandSubscribe.newBuilder().setConsumerId(1) .setTopic(successTopicName).setSubscription(successSubName).setRequestId(1) .setSubType(PulsarApi.CommandSubscribe.SubType.Exclusive).build(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java index 8b82c884a7b3d..e03c240d36b5c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java @@ -28,6 +28,7 @@ import java.lang.reflect.Field; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -94,7 +95,7 @@ public void testSimpleProducerEvents() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -123,7 +124,7 @@ public void testSimpleConsumerEvents() throws Exception { // 1. client connect Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(topicRef); @@ -209,7 +210,7 @@ public void testConsumerFlowControl() throws Exception { .receiverQueueSize(recvQueueSize).subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(subRef); @@ -261,7 +262,7 @@ public void testActiveSubscriptionWithCache() throws Exception { consumer.acknowledge(msg); } - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); // (3) Get Entry cache ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); @@ -331,7 +332,7 @@ public Void call() throws Exception { barrier.await(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); // 1. cumulatively all threads drain the backlog @@ -352,7 +353,7 @@ public void testGracefulClose() throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).create(); Thread.sleep(ASYNC_EVENT_COMPLETION_WAIT); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); ExecutorService executor = Executors.newCachedThreadPool(); @@ -414,7 +415,7 @@ public void testSimpleCloseTopic() throws Exception { Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(subRef); @@ -431,7 +432,7 @@ public void testSimpleCloseTopic() throws Exception { consumer.close(); topicRef.close().get(); - assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } @Test @@ -481,7 +482,7 @@ public void testTopicDeleteWithDisconnectedSubscription() throws Exception { // 1. client connect Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); assertNotNull(topicRef); @@ -541,28 +542,28 @@ public void testGC() throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).create(); producer.close(); - assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); runGC(); - assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // 2. Topic is not GCed with live connection String subName = "sub1"; Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); runGC(); - assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // 3. Topic with subscription is not GCed even with no connections consumer.close(); runGC(); - assertNotNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); // 4. Topic can be GCed after unsubscribe admin.persistentTopics().deleteSubscription(topicName, subName); runGC(); - assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } /** @@ -606,7 +607,7 @@ public void testGcAndRetentionPolicy() throws Exception { admin.persistentTopics().deleteSubscription(topicName, subName); runGC(); - assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } /** @@ -649,7 +650,7 @@ public void testInfiniteRetentionPolicy() throws Exception { admin.persistentTopics().deleteSubscription(topicName, subName); runGC(); - assertNull(pulsar.getBrokerService().getTopicReference(topicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } @Test @@ -666,7 +667,7 @@ public void testMessageExpiry() throws Exception { Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); consumer.close(); @@ -709,7 +710,7 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception { pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); assertTrue(subRef.getDispatcher().isConsumerConnected()); @@ -744,7 +745,7 @@ public void testSubscriptionTypeTransitions() throws Exception { Consumer consumer2 = null; Consumer consumer3 = null; - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); PersistentSubscription subRef = topicRef.getSubscription(subName); // 1. shared consumer on an exclusive sub fails @@ -844,7 +845,7 @@ public void testProducerReturnedMessageId() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -1020,7 +1021,7 @@ public void testCompression(CompressionType compressionType) throws Exception { .create(); Consumer consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("my-sub").subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -1193,7 +1194,7 @@ public void testMessageReplay() throws Exception { .subscriptionType(SubscriptionType.Shared).receiverQueueSize(1).subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); PersistentSubscription subRef = topicRef.getSubscription(subName); PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) subRef @@ -1271,4 +1272,27 @@ public void testCreateProducerWithSameName() throws Exception { p2.close(); } + + @Test + public void testGetOrCreateTopic() throws Exception { + String topicName = "persistent://prop/use/ns-abc/testGetOrCreateTopic"; + + admin.lookups().lookupTopic(topicName); + Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(); + assertNotNull(topic); + + Optional t = pulsar.getBrokerService().getTopicReference(topicName); + assertTrue(t.isPresent()); + } + + @Test + public void testGetTopicIfExists() throws Exception { + String topicName = "persistent://prop/use/ns-abc/testGetTopicIfExists"; + admin.lookups().lookupTopic(topicName); + Optional topic = pulsar.getBrokerService().getTopicIfExists(topicName).join(); + assertFalse(topic.isPresent()); + + Optional t = pulsar.getBrokerService().getTopicReference(topicName); + assertFalse(t.isPresent()); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 41de66498009b..17d282e295568 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -35,6 +35,7 @@ import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -206,7 +207,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); - CompletableFuture future = brokerService.getTopic(topicName).thenAccept(topic -> { + CompletableFuture future = brokerService.getOrCreateTopic(topicName).thenAccept(topic -> { assertTrue(topic.toString().contains(topicName)); }).exceptionally((t) -> { fail("should not fail"); @@ -237,7 +238,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { }).when(mlFactoryMock).asyncOpen(anyString(), any(ManagedLedgerConfig.class), any(OpenLedgerCallback.class), anyObject()); - CompletableFuture future = brokerService.getTopic(jinxedTopicName); + CompletableFuture future = brokerService.getOrCreateTopic(jinxedTopicName); // wait for completion try { @@ -713,15 +714,15 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable { @Test public void testDeleteTopic() throws Exception { // create topic - PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); String role = "appid1"; // 1. delete inactive topic topic.delete().get(); - assertNull(brokerService.getTopicReference(successTopicName)); + assertFalse(brokerService.getTopicReference(successTopicName).isPresent()); // 2. delete topic with producer - topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name", role, false, null, SchemaVersion.Latest); topic.addProducer(producer); @@ -744,7 +745,7 @@ public void testDeleteTopic() throws Exception { @Test public void testDeleteAndUnsubscribeTopic() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName) .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); @@ -798,7 +799,7 @@ public void run() { // @Test public void testConcurrentTopicAndSubscriptionDelete() throws Exception { // create topic - final PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + final PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); CommandSubscribe cmd = CommandSubscribe.newBuilder().setConsumerId(1).setTopic(successTopicName) .setSubscription(successSubName).setRequestId(1).setSubType(SubType.Exclusive).build(); @@ -856,7 +857,7 @@ public void run() { @Test public void testDeleteTopicRaceConditions() throws Exception { - PersistentTopic topic = (PersistentTopic) brokerService.getTopic(successTopicName).get(); + PersistentTopic topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get(); // override ledger deletion callback to slow down deletion doAnswer(new Answer() { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index ac03dc500800a..bc51ed79c0c92 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -212,7 +212,7 @@ public void testConcurrentReplicator() throws Exception { Producer producer = client1.newProducer().topic(topicName.toString()).create(); producer.close(); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopic(topicName.toString()).get(); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get(); PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3")); final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class); @@ -488,7 +488,7 @@ public void testReplicatePeekAndSkip() throws Exception { // Produce from cluster1 and consume from the rest producer1.produce(2); producer1.close(); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) topic.getReplicators() .get(topic.getReplicators().keys().get(0)); replicator.skipMessages(2); @@ -513,7 +513,7 @@ public void testReplicatorClearBacklog() throws Exception { // Produce from cluster1 and consume from the rest producer1.produce(2); producer1.close(); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) spy( topic.getReplicators().get(topic.getReplicators().keys().get(0))); replicator.readEntriesFailed(new ManagedLedgerException.InvalidCursorPositionException("failed"), null); @@ -656,7 +656,7 @@ public void testDeleteReplicatorFailure() throws Exception { final String topicName = "persistent://pulsar/global/ns/repltopicbatch"; final TopicName dest = TopicName.get(topicName); MessageProducer producer1 = new MessageProducer(url1, dest); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); final String replicatorClusterName = topic.getReplicators().keys().get(0); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger(); CountDownLatch latch = new CountDownLatch(1); @@ -697,7 +697,7 @@ public void testReplicatorProducerClosing() throws Exception { final String topicName = "persistent://pulsar/global/ns/repltopicbatch"; final TopicName dest = TopicName.get(topicName); MessageProducer producer1 = new MessageProducer(url1, dest); - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(topicName).get(); final String replicatorClusterName = topic.getReplicators().keys().get(0); Replicator replicator = topic.getPersistentReplicator(replicatorClusterName); pulsar2.close(); @@ -741,7 +741,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { MessageConsumer consumer2 = new MessageConsumer(url2, dest); // Replicator for r1 -> r2 - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); Replicator replicator = topic.getPersistentReplicator("r2"); // Produce 1 message in r1. This message will be replicated immediately into r2 and it will become part of @@ -802,7 +802,7 @@ public void testCloseReplicatorStartProducer() throws Exception { MessageConsumer consumer2 = new MessageConsumer(url2, dest); // Replicator for r1 -> r2 - PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()); + PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getTopicReference(dest.toString()).get(); PersistentReplicator replicator = (PersistentReplicator) topic.getPersistentReplicator("r2"); // close the cursor @@ -901,7 +901,7 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws // persistent topic test try { - brokerService.getTopic(persistentTopicName).get(); + brokerService.getOrCreateTopic(persistentTopicName).get(); if (isPartitionedTopic) { fail("Topic creation fails with partitioned topic as replicator init fails"); } @@ -914,7 +914,7 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws // non-persistent topic test try { - brokerService.getTopic(nonPersistentTopicName).get(); + brokerService.getOrCreateTopic(nonPersistentTopicName).get(); if (isPartitionedTopic) { fail("Topic creation fails with partitioned topic as replicator init fails"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java index ae414746f976b..2a590eb953666 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ResendRequestTest.java @@ -75,7 +75,7 @@ public void testExclusiveSingleAckedNormalTopic() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -160,7 +160,7 @@ public void testSharedSingleAckedNormalTopic() throws Exception { final int totalMessages = 10; // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -240,7 +240,7 @@ public void testFailoverSingleAckedNormalTopic() throws Exception { final int totalMessages = 10; // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -357,7 +357,7 @@ public void testExclusiveCumulativeAckedNormalTopic() throws Exception { // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -646,7 +646,7 @@ public void testFailoverInactiveConsumer() throws Exception { final int totalMessages = 10; // 1. producer connect Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java index fe933bbb30eab..db1699d15ce18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java @@ -37,10 +37,12 @@ import com.google.common.collect.Maps; import com.google.protobuf.ByteString; + import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; + import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; @@ -49,7 +51,9 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; + import javax.naming.AuthenticationException; + import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback; @@ -61,7 +65,6 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.bookkeeper.tools.cli.helpers.Command; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; @@ -381,7 +384,7 @@ public void testProducerCommand() throws Exception { channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandProducerSuccess); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -392,7 +395,7 @@ public void testProducerCommand() throws Exception { channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandError); - assertNull(brokerService.getTopicReference(failTopicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(failTopicName).isPresent()); channel.finish(); assertEquals(topicRef.getProducers().size(), 0); @@ -404,7 +407,7 @@ public void testDuplicateConcurrentProducerCommand() throws Exception { setChannelConnected(); CompletableFuture delayFuture = new CompletableFuture<>(); - doReturn(delayFuture).when(brokerService).getTopic(any(String.class)); + doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); // Create producer first time ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */, "prod-name", Collections.emptyMap()); @@ -440,7 +443,7 @@ public void testProducerOnNotOwnedTopic() throws Exception { CommandError errorResponse = (CommandError) response; assertEquals(errorResponse.getError(), ServerError.ServiceNotReady); - assertNull(brokerService.getTopicReference(nonOwnedTopicName)); + assertFalse(pulsar.getBrokerService().getTopicReference(nonOwnedTopicName).isPresent()); channel.finish(); } @@ -461,7 +464,7 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception { channel.writeInbound(clientCommand); assertEquals(getResponse().getClass(), CommandProducerSuccess.class); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -554,7 +557,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { channel.writeInbound(newProducerCmd); assertTrue(getResponse() instanceof CommandProducerSuccess); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); channel.finish(); @@ -565,7 +568,7 @@ public void testNonExistentTopicSuperUserAccess() throws Exception { ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, // successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */); channel.writeInbound(newSubscribeCmd); - topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName); + topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get(); assertNotNull(topicRef); assertTrue(topicRef.getSubscriptions().containsKey(successSubName)); assertTrue(topicRef.getSubscription(successSubName).getDispatcher().isConsumerConnected()); @@ -704,7 +707,7 @@ public void testDuplicateConcurrentSubscribeCommand() throws Exception { setChannelConnected(); CompletableFuture delayFuture = new CompletableFuture<>(); - doReturn(delayFuture).when(brokerService).getTopic(any(String.class)); + doReturn(delayFuture).when(brokerService).getOrCreateTopic(any(String.class)); // Create subscriber first time ByteBuf clientCommand = Commands.newSubscribe(successTopicName, // successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0, "test" /* consumer name */); @@ -1098,7 +1101,7 @@ public void testSubscribeCommand() throws Exception { channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandSuccess); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); assertNotNull(topicRef); assertTrue(topicRef.getSubscriptions().containsKey(successSubName)); @@ -1142,7 +1145,7 @@ public void testUnsupportedBatchMsgSubscribeCommand() throws Exception { channel.writeInbound(clientCommand); assertTrue(getResponse() instanceof CommandSuccess); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(successTopicName).get(); topicRef.markBatchMessagePublished(); // test SUBSCRIBE on topic and cursor creation success @@ -1264,7 +1267,7 @@ public void testProducerSuccessOnEncryptionRequiredTopic() throws Exception { Object response = getResponse(); assertEquals(response.getClass(), CommandProducerSuccess.class); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); @@ -1294,7 +1297,7 @@ public void testProducerFailureOnEncryptionRequiredTopic() throws Exception { assertEquals(response.getClass(), CommandError.class); CommandError errorResponse = (CommandError) response; assertEquals(errorResponse.getError(), ServerError.MetadataError); - PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName); + PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(encryptionRequiredTopicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java index 284e6c6ee642a..a66f57ef4dd12 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SubscriptionSeekTest.java @@ -60,7 +60,7 @@ public void testSeek() throws Exception { org.apache.pulsar.client.api.Consumer consumer = pulsarClient.newConsumer().topic(topicName) .subscriptionName("my-subscription").receiverQueueSize(0).subscribe(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); assertEquals(topicRef.getProducers().size(), 1); assertEquals(topicRef.getSubscriptions().size(), 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java index 94c3762e89df7..e3aac46e9a8d3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ChecksumTest.java @@ -61,7 +61,7 @@ public void verifyChecksumStoredInManagedLedger() throws Exception { Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); ManagedLedger ledger = topic.getManagedLedger(); ManagedCursor cursor = ledger.openCursor("test"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java index 547c1b0ef04e6..696e21fbdf386 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/DispatcherBlockConsumerTest.java @@ -535,7 +535,7 @@ public void testBlockDispatcherStats() throws Exception { .subscriptionType(SubscriptionType.Shared).subscribe(); Thread.sleep(timeWaitToSync); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); rolloverPerIntervalStats(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java index f9f346f35c7a6..7bd6997acbc18 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MessageDispatchThrottlingTest.java @@ -108,7 +108,7 @@ public void testMessageRateDynamicallyChange() throws Exception { admin.namespaces().createNamespace(namespace); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); // (1) verify message-rate is -1 initially Assert.assertEquals(topic.getDispatchRateLimiter().getDispatchRateOnMsg(), -1); @@ -178,7 +178,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -254,7 +254,7 @@ public void testClusterMsgByteRateLimitingClusterConfig() throws Exception { admin.namespaces().createNamespace(namespace); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); int numMessages = 500; final AtomicInteger totalReceived = new AtomicInteger(0); @@ -314,7 +314,7 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -385,7 +385,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -448,7 +448,7 @@ public void testRateLimitingMultipleConsumers() throws Exception { admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -528,7 +528,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) admin.namespaces().createNamespace(namespace); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); int numMessages = 500; final AtomicInteger totalReceived = new AtomicInteger(0); @@ -581,7 +581,7 @@ public void testMessageByteRateThrottlingCombined(SubscriptionType subscription) admin.namespaces().setDispatchRate(namespace, dispatchRate); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -658,7 +658,7 @@ public void testGlobalNamespaceThrottling() throws Exception { // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isMessageRateUpdate = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -727,7 +727,7 @@ public void testNonBacklogConsumerWithThrottlingEnabled(SubscriptionType subscri Boolean.TRUE.toString()); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); boolean isUpdated = false; int retry = 5; for (int i = 0; i < retry; i++) { @@ -810,7 +810,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { admin.namespaces().createNamespace(namespace); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName1).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName1).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get(); // (1) Update dispatch rate on cluster-config update Assert.assertEquals(clusterMessageRate, topic.getDispatchRateLimiter().getDispatchRateOnMsg()); @@ -838,7 +838,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { // (5) Namespace throttling is disabled so, new topic should take cluster throttling limit Producer producer2 = pulsarClient.newProducer().topic(topicName2).create(); - PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName2).get(); + PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get(); Assert.assertEquals(clusterMessageRate, topic2.getDispatchRateLimiter().getDispatchRateOnMsg()); producer.close(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index 02fe3225c7105..d18c3d2d715f5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -426,7 +426,7 @@ public void testTopicStats() throws Exception { .subscriptionType(SubscriptionType.Shared).subscriptionName(subName).subscribe(); Thread.sleep(timeWaitToSync); - NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); + NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get(); assertNotNull(topicRef); rolloverPerIntervalStats(pulsar); @@ -501,7 +501,7 @@ public void testReplicator() throws Exception { // Replicator for r1 -> r2,r3 NonPersistentTopic topicRef = (NonPersistentTopic) replication.pulsar1.getBrokerService() - .getTopicReference(globalTopicName); + .getTopicReference(globalTopicName).get(); NonPersistentReplicator replicatorR2 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r2"); NonPersistentReplicator replicatorR3 = (NonPersistentReplicator) topicRef.getPersistentReplicator("r3"); assertNotNull(topicRef); @@ -657,8 +657,7 @@ public void testLoadManagerAssignmentForNonPersistentTestAssignment(String loadM } catch (Exception e) { // Ok } - NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); - assertNull(topicRef); + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } finally { conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); @@ -692,8 +691,8 @@ public void testNonPersistentTopicUnderPersistentNamespace() throws Exception { } catch (Exception e) { // Ok } - NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); - assertNull(topicRef); + + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } finally { conf.setEnableNonPersistentTopics(defaultENableNonPersistentTopic); } @@ -747,8 +746,8 @@ public void testNonPersistentBrokerModeRejectPersistentTopic(String loadManagerN } catch (Exception e) { // Ok } - NonPersistentTopic topicRef = (NonPersistentTopic) pulsar.getBrokerService().getTopicReference(topicName); - assertNull(topicRef); + + assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent()); } finally { conf.setEnablePersistentTopics(defaultEnablePersistentTopic); @@ -795,7 +794,7 @@ public void testMsgDropStat() throws Exception { } latch.await(); - NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + NonPersistentTopic topic = (NonPersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); pulsar.getBrokerService().updateRates(); NonPersistentTopicStats stats = topic.getStats(); NonPersistentPublisherStats npStats = stats.getPublishers().get(0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 923e65df9d5e7..b7c2fd4b0606a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -669,7 +669,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { } Producer producer = producerBuilder.create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); cacheField.setAccessible(true); @@ -771,7 +771,7 @@ public void testDeactivatingBacklogConsumer() throws Exception { } Producer producer = producerBuilder.create(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); // reflection to set/get cache-backlog fields value: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 2ed6e11f9eabc..223a40b49e04c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -62,7 +62,7 @@ public void testMessageRateLimitingNotReceiveAllMessages(SubscriptionType subscr admin.namespaces().setSubscriptionDispatchRate(namespace, dispatchRate); // create producer, topic and consumer Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); // enable throttling for nonBacklog consumers conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true); @@ -158,7 +158,7 @@ public void testMessageRateLimitingReceiveAllMessagesAfterThrottling(Subscriptio }).subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); DispatchRateLimiter subRateLimiter = null; Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher(); @@ -244,7 +244,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT }).subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); DispatchRateLimiter subRateLimiter = null; Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher(); @@ -327,7 +327,7 @@ public void testRateLimitingMultipleConsumers() throws Exception { Consumer consumer5 = consumerBuilder.subscribe(); Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); DispatchRateLimiter subRateLimiter = null; Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher(); @@ -404,7 +404,7 @@ public void testClusterRateLimitingConfiguration(SubscriptionType subscription) admin.namespaces().createNamespace(namespace); // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); int numMessages = 500; final AtomicInteger totalReceived = new AtomicInteger(0); @@ -478,7 +478,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { // create producer and topic Producer producer = pulsarClient.newProducer().topic(topicName1).create(); - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName1).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName1).get(); Consumer consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName1) .subscribe(); @@ -519,7 +519,7 @@ public void testClusterPolicyOverrideConfiguration() throws Exception { // (5) Namespace throttling is disabled so, new topic should take cluster throttling limit Producer producer2 = pulsarClient.newProducer().topic(topicName2).create(); - PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName2).get(); + PersistentTopic topic2 = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName2).get(); Consumer consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName2) .subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 64df6a41bed1d..74491d7a475e6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -713,7 +713,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { final String topic = "persistent://my-property/use/my-ns/" + topicName; Producer producer = pulsarClient.createProducer(topic, producerConf); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache"); cacheField.setAccessible(true); @@ -816,7 +816,7 @@ public void testDeactivatingBacklogConsumer() throws Exception { Consumer subscriber2 = pulsarClient.subscribe("persistent://my-property/use/my-ns/" + topicName, sub2, conf); Producer producer = pulsarClient.createProducer(topic, producerConf); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedgerImpl ledger = (ManagedLedgerImpl) topicRef.getManagedLedger(); // reflection to set/get cache-backlog fields value: diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java index 79e25130f810e..584fc86da7de1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/BrokerClientIntegrationTest.java @@ -304,7 +304,7 @@ public void testUnsupportedBatchMessageConsumer(SubscriptionType subType) throws .batchingMaxPublishDelay(batchMessageDelayMs, TimeUnit.MILLISECONDS).batchingMaxMessages(20).create(); // update consumer's version to incompatible batch-message version = Version.V3 - Topic topic = pulsar.getBrokerService().getTopic(topicName).get(); + Topic topic = pulsar.getBrokerService().getOrCreateTopic(topicName).get(); org.apache.pulsar.broker.service.Consumer brokerConsumer = topic.getSubscriptions().get(subscriptionName) .getConsumers().get(0); Field cnxField = org.apache.pulsar.broker.service.Consumer.class.getDeclaredField("cnx"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java index 990bcaefa56ac..971a660863c90 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java @@ -314,7 +314,7 @@ public void testAcknowledgeWithProperties() throws Exception { properties.put("foobar", 0xdeadbeefdecaL); reader.acknowledgeCumulativeAsync(lastMessageId, properties).get(); - PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic); + PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic).get(); ManagedLedger ledger = topicRef.getManagedLedger(); for (int i = 0; i < 30; i++) { if (ledger.openCursor(subscription).getProperties().get("foobar") == Long.valueOf(0xdeadbeefdecaL)) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java index eb5d95cb849a7..074571500a854 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/stats/client/PulsarBrokerStatsClientTest.java @@ -122,7 +122,7 @@ public void testTopicInternalStats() throws Exception { } } - PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName).get(); + PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get(); PersistentTopicInternalStats internalStats = topic.getInternalStats(); CursorStats cursor = internalStats.cursors.get(subscriptionName); assertEquals(cursor.numberOfEntriesSinceFirstNotAckedMessage, numberOfMsgs);