diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index 837f073b00dba..e6055c71f79e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -158,6 +158,8 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener activeRateLimiters; + private volatile TopicCacheCleanupFunction cleanupFunction; + public AbstractTopic(String topic, BrokerService brokerService) { this.topic = topic; this.brokerService = brokerService; @@ -1250,6 +1252,23 @@ public HierarchyTopicPolicies getHierarchyTopicPolicies() { return topicPolicies; } + @Override + public void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction) { + if (this.cleanupFunction != null) { + log.warn("Topic {} has already been cached. It should have been removed before re-adding.", topic); + } + this.cleanupFunction = cleanupFunction; + } + + @Override + public void cleanupTopicCache(CompletableFuture> topicFuture) { + TopicCacheCleanupFunction c = this.cleanupFunction; + this.cleanupFunction = null; + if (c != null) { + c.cleanup(topicFuture); + } + } + public void updateBrokerSubscriptionDispatchRate() { topicPolicies.getSubscriptionDispatchRate().updateBrokerValue( subscriptionDispatchRateInBroker(brokerService.pulsar().getConfiguration())); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 4077762bb0640..7125a106b3820 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -711,6 +711,9 @@ public CompletableFuture closeAsync() { // unloads all namespaces gracefully without disrupting mutually unloadNamespaceBundlesGracefully(); + // fail all pending topic loading requests + failPendingTopics(); + // close replication clients replicationClients.forEach((cluster, client) -> { try { @@ -1200,12 +1203,11 @@ private CompletableFuture> createNonPersistentTopic(String topic long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); - topicFuture.complete(Optional.of(nonPersistentTopic)); + completeTopicFuture(topicFuture, nonPersistentTopic); }).exceptionally(ex -> { log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); }); return null; }); @@ -1217,7 +1219,7 @@ private CompletableFuture> createNonPersistentTopic(String topic // so we should add checkTopicNsOwnership logic otherwise the topic will be created // if it dont own by this broker,we should return success // otherwise it will keep retrying getPartitionedTopicMetadata - topicFuture.complete(Optional.of(nonPersistentTopic)); + completeTopicFuture(topicFuture, nonPersistentTopic); // after get metadata return success, we should delete this topic from this broker, because this topic not // owner by this broker and it don't initialize and checkReplication pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); @@ -1455,7 +1457,7 @@ public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional c protected CompletableFuture> loadOrCreatePersistentTopic(final String topic, boolean createIfMissing, Map properties, @Nullable TopicPolicies topicPolicies) { final CompletableFuture> topicFuture = FutureUtil.createFutureWithTimeout( - Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), + Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), pulsar().getExecutor(), () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); if (!pulsar.getConfiguration().isEnablePersistentTopics()) { if (log.isDebugEnabled()) { @@ -1468,6 +1470,15 @@ protected CompletableFuture> loadOrCreatePersistentTopic(final S checkTopicNsOwnership(topic) .thenRun(() -> { + if (topicFuture.isDone()) { + log.warn("Topic future for topic {} is done. Skipping loading.", topic); + return; + } + if (!pulsar().isRunning()) { + topicFuture.completeExceptionally(new ServiceUnitNotReadyException("Broker is shutting down")); + return; + } + final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); if (topicLoadSemaphore.tryAcquire()) { @@ -1530,8 +1541,16 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean CompletableFuture> topicFuture, Map properties, @Nullable TopicPolicies topicPolicies) { TopicName topicName = TopicName.get(topic); + if (topicFuture.isDone()) { + log.warn("Topic future for topic {} is done. Skipping creating.", topicName); + return; + } pulsar.getNamespaceService().isServiceUnitActiveAsync(topicName) .thenAccept(isActive -> { + if (topicFuture.isDone()) { + log.warn("Topic future for topic {} is done. Skipping creating.", topicName); + return; + } if (isActive) { CompletableFuture> propertiesFuture; if (properties == null) { @@ -1546,19 +1565,17 @@ private void checkOwnershipAndCreatePersistentTopic(final String topic, boolean finalProperties, topicPolicies) ).exceptionally(throwable -> { log.warn("[{}] Read topic property failed", topic, throwable); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(throwable); + failTopicFuture(topic, topicFuture, throwable); return null; }); } else { // namespace is being unloaded String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); + failTopicFuture(topic, topicFuture, new ServiceUnitNotReadyException(msg)); } }).exceptionally(ex -> { - topicFuture.completeExceptionally(ex); + failTopicFuture(topic, topicFuture, ex); return null; }); } @@ -1574,6 +1591,10 @@ public void createPersistentTopic0(final String topic, boolean createIfMissing, private void createPersistentTopic(final String topic, boolean createIfMissing, CompletableFuture> topicFuture, Map properties, @Nullable TopicPolicies topicPolicies) { + if (topicFuture.isDone()) { + return; + } + TopicName topicName = TopicName.get(topic); final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); @@ -1585,8 +1606,7 @@ private void createPersistentTopic(final String topic, boolean createIfMissing, if (isTransactionInternalName(topicName)) { String msg = String.format("Can not create transaction system topic %s", topic); log.warn(msg); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new NotAllowedException(msg)); + failTopicFuture(topic, topicFuture, new NotAllowedException(msg)); return; } @@ -1675,7 +1695,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } else { addTopicToStatsMaps(topicName, persistentTopic); - topicFuture.complete(Optional.of(persistentTopic)); + completeTopicFuture(topicFuture, persistentTopic); } }) .exceptionally((ex) -> { @@ -1694,8 +1714,7 @@ public void openLedgerComplete(ManagedLedger ledger, Object ctx) { }); } catch (PulsarServerException e) { log.warn("Failed to create topic {}: {}", topic, e.getMessage()); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(e); + failTopicFuture(topic, topicFuture, e); } } @@ -1707,8 +1726,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { topicFuture.complete(Optional.empty()); } else { log.warn("Failed to create topic {}", topic, exception); - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(new PersistenceException(exception)); + failTopicFuture(topic, topicFuture, new PersistenceException(exception)); } } }, () -> isTopicNsOwnedByBrokerAsync(topicName), null); @@ -1717,8 +1735,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); // remove topic from topics-map in different thread to avoid possible deadlock if // createPersistentTopic-thread only tries to handle this future-result - pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); - topicFuture.completeExceptionally(exception); + failTopicFuture(topic, topicFuture, exception); return null; }); } @@ -1737,6 +1754,32 @@ private CompletableFuture checkTopicAlreadyMigrated(TopicName topicName) { return result; } + private void failTopicFuture(String topic, CompletableFuture> topicFuture, Throwable exception) { + if (topicFuture.completeExceptionally(exception)) { + pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); + } else { + if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) { + topicFuture.join().ifPresent(topicObject -> { + log.warn( + "Topic future for topic {} was already completed successfully. Another exception happened. " + + "Closing the topic to ensure consistency.", + topic, exception); + topicObject.close(); + }); + } + } + } + + private static void completeTopicFuture(CompletableFuture> topicFuture, Topic topic) { + if (!topicFuture.complete(Optional.of(topic))) { + log.warn( + "Failed to complete future for topic {}, it was already completed (failed={}). " + + "Closing the topic to ensure consistency.", + topic.getName(), topicFuture.isCompletedExceptionally()); + topic.close(); + } + } + public CompletableFuture getManagedLedgerConfig(@Nonnull TopicName topicName) { final CompletableFuture> topicPoliciesFuture = getTopicPoliciesBypassSystemTopic(topicName); @@ -1907,7 +1950,8 @@ private void addTopicToStatsMaps(TopicName topicName, Topic topic) { .thenAccept(namespaceBundle -> { if (namespaceBundle != null) { synchronized (multiLayerTopicsMap) { - String serviceUnit = namespaceBundle.toString(); + final String serviceUnit = namespaceBundle.toString(); + final String topicNameString = topicName.toString(); multiLayerTopicsMap // .computeIfAbsent(topicName.getNamespace(), k -> ConcurrentOpenHashMap. ConcurrentOpenHashMap.newBuilder().build()) // - .put(topicName.toString(), topic); + .put(topicNameString, topic); + topic.registerTopicCacheCleanupFunction(topicFuture -> + removeTopicFromCache(topicNameString, serviceUnit, topicFuture)); } } invalidateOfflineTopicStatCache(topicName); @@ -1935,6 +1981,7 @@ public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { if (!isEmpty(topics)) { // add topic under new split bundles which already updated into NamespaceBundleFactory.bundleCache topics.stream().forEach(t -> { + t.cleanupTopicCache(null); addTopicToStatsMaps(TopicName.get(t.getName()), t); }); // remove old bundle from the map @@ -2195,7 +2242,7 @@ public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { TopicName topicName = TopicName.get(topic); if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) { log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic); - pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit, null); + pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit.toString(), null); } } } @@ -2205,11 +2252,8 @@ public AuthorizationService getAuthorizationService() { } public CompletableFuture removeTopicFromCache(Topic topic) { - Optional>> createTopicFuture = findTopicFutureInCache(topic); - if (createTopicFuture.isEmpty()){ - return CompletableFuture.completedFuture(null); - } - return removeTopicFutureFromCache(topic.getName(), createTopicFuture.get()); + topic.cleanupTopicCache(findTopicFutureInCache(topic).orElse(null)); + return CompletableFuture.completedFuture(null); } private Optional>> findTopicFutureInCache(Topic topic){ @@ -2239,18 +2283,8 @@ private Optional>> findTopicFutureInCache(Topi } } - private CompletableFuture removeTopicFutureFromCache(String topic, - CompletableFuture> createTopicFuture) { - TopicName topicName = TopicName.get(topic); - return pulsar.getNamespaceService().getBundleAsync(topicName) - .thenAccept(namespaceBundle -> { - removeTopicFromCache(topic, namespaceBundle, createTopicFuture); - }); - } - - private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, - CompletableFuture> createTopicFuture) { - String bundleName = namespaceBundle.toString(); + private void removeTopicFromCache(String topic, + String bundleName, CompletableFuture> createTopicFuture) { String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.BEFORE); @@ -2279,16 +2313,18 @@ private void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle, } } - if (createTopicFuture == null) { - topics.remove(topic); - } else { + if (createTopicFuture != null) { topics.remove(topic, createTopicFuture); + } else { + topics.remove(topic); } + // Remove topic from compactor, there is a possibility for a race condition here Compactor compactor = pulsar.getNullableCompactor(); if (compactor != null) { compactor.getStats().removeTopic(topic); } + topicEventsDispatcher.notify(topic, TopicEvent.UNLOAD, EventStage.SUCCESS); } @@ -2972,7 +3008,13 @@ private ConcurrentOpenHashMap getRuntimeConfigurationMap() { * permit if it was successful to acquire it. */ private void createPendingLoadTopic() { - TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll(); + if (!pulsar().isRunning()) { + log.warn("Pulsar is not running, skip create pending topic"); + failPendingTopics(); + return; + } + + TopicLoadingContext pendingTopic = getNextPendingTopic(); if (pendingTopic == null) { return; } @@ -2996,14 +3038,35 @@ private void createPendingLoadTopic() { }); }).exceptionally(e -> { log.error("Failed to create pending topic {}", topic, e); - pendingTopic.getTopicFuture() - .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); + Throwable cause = (e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e; + failTopicFuture(topic, pendingTopic.getTopicFuture(), cause); // schedule to process next pending topic inactivityMonitor.schedule(this::createPendingLoadTopic, 100, TimeUnit.MILLISECONDS); return null; }); } + private void failPendingTopics() { + for (TopicLoadingContext topicLoadingContext = getNextPendingTopic(); topicLoadingContext != null; + topicLoadingContext = getNextPendingTopic()) { + topicLoadingContext.getTopicFuture() + .completeExceptionally(new NotAllowedException("Broker is shutting down")); + } + } + + private TopicLoadingContext getNextPendingTopic() { + while (true) { + TopicLoadingContext pendingTopic = pendingTopicLoadingQueue.poll(); + if (pendingTopic == null) { + return null; + } + // check that the topic future is not completed + if (!pendingTopic.getTopicFuture().isDone()) { + return pendingTopic; + } + } + } + public CompletableFuture fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( TopicName topicName) { if (pulsar.getNamespaceService() == null) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java index 343aef09c1c55..1600d08e1760b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java @@ -360,4 +360,18 @@ default boolean isSystemTopic() { */ HierarchyTopicPolicies getHierarchyTopicPolicies(); + + /** + * Closes the topic and releases resources. + * @return a future that is completed when the topic is closed + */ + CompletableFuture close(); + + void registerTopicCacheCleanupFunction(TopicCacheCleanupFunction cleanupFunction); + + void cleanupTopicCache(CompletableFuture> topicFuture); + + interface TopicCacheCleanupFunction { + void cleanup(CompletableFuture> topicFuture); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 00cf3a6583b9a..e32d315b1dea4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -1261,4 +1261,9 @@ protected boolean isMigrated() { public boolean isPersistent() { return false; } + + @Override + public CompletableFuture close() { + return deleteForcefully(); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 48069cf555448..d21d669c0d2e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1460,6 +1460,7 @@ public void deleteLedgerComplete(Object ctx) { } + @Override public CompletableFuture close() { return close(true, false); }