diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java index fbf11f1d0ad62..f25dfef966bfd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java @@ -486,8 +486,18 @@ private PublishRate publishRateInBroker(ServiceConfiguration config) { return new PublishRate(config.getMaxPublishRatePerTopicInMessages(), config.getMaxPublishRatePerTopicInBytes()); } + public boolean isProducersExceeded(String producerName) { + String replicatorPrefix = brokerService.getPulsar().getConfig().getReplicatorPrefix() + "."; + boolean isRemote = producerName.startsWith(replicatorPrefix); + return isProducersExceeded(isRemote); + } + protected boolean isProducersExceeded(Producer producer) { - if (isSystemTopic() || producer.isRemote()) { + return isProducersExceeded(producer.isRemote()); + } + + protected boolean isProducersExceeded(boolean isRemote) { + if (isSystemTopic() || isRemote) { return false; } Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); @@ -536,7 +546,7 @@ public int getNumberOfSameAddressProducers(final String clientAddress) { return count; } - protected boolean isConsumersExceededOnTopic() { + public boolean isConsumersExceededOnTopic() { if (isSystemTopic()) { return false; } @@ -973,12 +983,6 @@ protected void checkTopicFenced() throws BrokerServiceException { } protected CompletableFuture internalAddProducer(Producer producer) { - if (isProducersExceeded(producer)) { - log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); - return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( - "Topic '" + topic + "' reached max producers limit")); - } - if (isSameAddressProducersExceeded(producer)) { log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); return CompletableFuture.failedFuture(new BrokerServiceException.ProducerBusyException( diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index 6690ab4af5fd1..5df276e8f3dd5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1307,6 +1307,16 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { "Topic " + topicName + " does not exist")); } final Topic topic = optTopic.get(); + // Check max consumer limitation to avoid unnecessary ops wasting resources. For example: + // the new consumer reached max producer limitation, but pulsar did schema check first, + // it would waste CPU. + if (((AbstractTopic) topic).isConsumersExceededOnTopic()) { + log.warn("[{}] Attempting to add consumer to topic which reached max" + + " consumers limit", topic); + Throwable t = + new ConsumerBusyException("Topic reached max consumers limit"); + return FutureUtil.failedFuture(t); + } return service.isAllowAutoSubscriptionCreationAsync(topicName) .thenCompose(isAllowedAutoSubscriptionCreation -> { boolean rejectSubscriptionIfDoesNotExist = isDurable @@ -1545,6 +1555,15 @@ protected void handleProducer(final CommandProducer cmdProducer) { } service.getOrCreateTopic(topicName.toString()).thenCompose((Topic topic) -> { + // Check max producer limitation to avoid unnecessary ops wasting resources. For example: the new + // producer reached max producer limitation, but pulsar did schema check first, it would waste CPU + if (((AbstractTopic) topic).isProducersExceeded(producerName)) { + log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); + String errorMsg = "Topic '" + topicName.toString() + "' reached max producers limit"; + Throwable t = new BrokerServiceException.ProducerBusyException(errorMsg); + return CompletableFuture.failedFuture(t); + } + // Before creating producer, check if backlog quota exceeded // on topic for size based limit and time based limit CompletableFuture backlogQuotaCheckFuture = CompletableFuture.allOf( diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 249dd3c4607be..40e2ca8cce905 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -22,6 +22,8 @@ import static org.apache.commons.lang3.StringUtils.isBlank; import static org.apache.pulsar.broker.BrokerTestUtil.newUniqueName; import static org.apache.pulsar.broker.resources.LoadBalanceResources.BUNDLE_DATA_BASE_PATH; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -52,6 +54,7 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import javax.ws.rs.NotAcceptableException; import javax.ws.rs.core.Response.Status; import lombok.AllArgsConstructor; @@ -70,6 +73,7 @@ import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerImpl; import org.apache.pulsar.broker.loadbalance.impl.ModularLoadManagerWrapper; import org.apache.pulsar.broker.loadbalance.impl.SimpleLoadManagerImpl; +import org.apache.pulsar.broker.service.AbstractTopic; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -127,7 +131,13 @@ import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.policies.data.TopicType; import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl; +import org.apache.pulsar.common.protocol.schema.SchemaData; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; +import org.awaitility.reflect.WhiteboxImpl; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; @@ -2870,34 +2880,40 @@ public void testMaxProducersPerTopicUnlimited() throws Exception { final String myNamespace = newUniqueName(defaultTenant + "/ns"); admin.namespaces().createNamespace(myNamespace, Set.of("test")); final String topic = "persistent://" + myNamespace + "/testMaxProducersPerTopicUnlimited"; + admin.topics().createNonPartitionedTopic(topic); + AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic); //the policy is set to 0, so there will be no restrictions admin.namespaces().setMaxProducersPerTopic(myNamespace, 0); Awaitility.await().until(() -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 0); - List> producers = new ArrayList<>(); + List> producers = new ArrayList<>(); for (int i = 0; i < maxProducersPerTopic + 1; i++) { - Producer producer = pulsarClient.newProducer().topic(topic).create(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); producers.add(producer); } + assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1); admin.namespaces().removeMaxProducersPerTopic(myNamespace); Awaitility.await().until(() -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == null); + try { @Cleanup - Producer producer = pulsarClient.newProducer().topic(topic).create(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); fail("should fail"); } catch (PulsarClientException e) { String expectMsg = "Topic '" + topic + "' reached max producers limit"; assertTrue(e.getMessage().contains(expectMsg)); + assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 1); } //set the limit to 3 admin.namespaces().setMaxProducersPerTopic(myNamespace, 3); Awaitility.await().until(() -> admin.namespaces().getMaxProducersPerTopic(myNamespace) == 3); // should success - Producer producer = pulsarClient.newProducer().topic(topic).create(); + Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); producers.add(producer); + assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2); try { @Cleanup Producer producer1 = pulsarClient.newProducer().topic(topic).create(); @@ -2905,14 +2921,39 @@ public void testMaxProducersPerTopicUnlimited() throws Exception { } catch (PulsarClientException e) { String expectMsg = "Topic '" + topic + "' reached max producers limit"; assertTrue(e.getMessage().contains(expectMsg)); + assertEquals(schemaOpsCounter.get(), maxProducersPerTopic + 2); } //clean up - for (Producer tempProducer : producers) { + for (Producer tempProducer : producers) { tempProducer.close(); } } + private AtomicInteger injectSchemaCheckCounterForTopic(String topicName) { + ConcurrentOpenHashMap>> topics = + WhiteboxImpl.getInternalState(pulsar.getBrokerService(), "topics"); + AbstractTopic topic = (AbstractTopic) topics.get(topicName).join().get(); + AbstractTopic spyTopic = Mockito.spy(topic); + AtomicInteger counter = new AtomicInteger(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + return invocation.callRealMethod(); + } + }).when(spyTopic).addSchema(any(SchemaData.class)); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + counter.incrementAndGet(); + return invocation.callRealMethod(); + } + }).when(spyTopic).addSchemaIfIdleOrCheckCompatible(any(SchemaData.class)); + topics.put(topicName, CompletableFuture.completedFuture(Optional.of(spyTopic))); + return counter; + } + @Test public void testMaxConsumersPerTopicUnlimited() throws Exception { restartClusterAfterTest(); @@ -2924,49 +2965,55 @@ public void testMaxConsumersPerTopicUnlimited() throws Exception { final String myNamespace = newUniqueName(defaultTenant + "/ns"); admin.namespaces().createNamespace(myNamespace, Set.of("test")); final String topic = "persistent://" + myNamespace + "/testMaxConsumersPerTopicUnlimited"; + admin.topics().createNonPartitionedTopic(topic); + AtomicInteger schemaOpsCounter = injectSchemaCheckCounterForTopic(topic); assertNull(admin.namespaces().getMaxConsumersPerTopic(myNamespace)); //the policy is set to 0, so there will be no restrictions admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0); Awaitility.await().until(() -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0); - List> consumers = new ArrayList<>(); + List> consumers = new ArrayList<>(); for (int i = 0; i < maxConsumersPerTopic + 1; i++) { - Consumer consumer = - pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); consumers.add(consumer); } + assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2); admin.namespaces().removeMaxConsumersPerTopic(myNamespace); Awaitility.await().until(() -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == null); try { @Cleanup - Consumer subscribe = - pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); + Consumer subscribe = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); fail("should fail"); } catch (PulsarClientException e) { assertTrue(e.getMessage().contains("Topic reached max consumers limit")); + assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 2); } //set the limit to 3 admin.namespaces().setMaxConsumersPerTopic(myNamespace, 3); Awaitility.await().until(() -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 3); // should success - Consumer consumer = - pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); + Consumer consumer = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); consumers.add(consumer); + assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3); try { @Cleanup - Consumer subscribe = - pulsarClient.newConsumer().subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); + Consumer subscribe = pulsarClient.newConsumer(Schema.STRING) + .subscriptionName(UUID.randomUUID().toString()).topic(topic).subscribe(); fail("should fail"); } catch (PulsarClientException e) { assertTrue(e.getMessage().contains("Topic reached max consumers limit")); + assertEquals(schemaOpsCounter.get(), maxConsumersPerTopic + 3); } //clean up - for (Consumer subConsumer : consumers) { + for (Consumer subConsumer : consumers) { subConsumer.close(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java index 76f871a6c6035..8c21301c15b4c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java @@ -509,51 +509,6 @@ public void testProducerOverwrite() { topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3)); } - private void testMaxProducers() { - PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService); - topic.initialize().join(); - String role = "appid1"; - // 1. add producer1 - Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role, - false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true); - topic.addProducer(producer, new CompletableFuture<>()); - assertEquals(topic.getProducers().size(), 1); - - // 2. add producer2 - Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role, - false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true); - topic.addProducer(producer2, new CompletableFuture<>()); - assertEquals(topic.getProducers().size(), 2); - - // 3. add producer3 but reached maxProducersPerTopic - try { - Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role, - false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty(), true); - topic.addProducer(producer3, new CompletableFuture<>()).join(); - fail("should have failed"); - } catch (Exception e) { - assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class); - } - } - - @Test - public void testMaxProducersForBroker() { - // set max clients - pulsarTestContext.getConfig().setMaxProducersPerTopic(2); - testMaxProducers(); - } - - @Test - public void testMaxProducersForNamespace() throws Exception { - // set max clients - Policies policies = new Policies(); - policies.max_producers_per_topic = 2; - pulsarTestContext.getPulsarResources().getNamespaceResources() - .createPolicies(TopicName.get(successTopicName).getNamespaceObject(), - policies); - testMaxProducers(); - } - private Producer getMockedProducerWithSpecificAddress(Topic topic, long producerId, InetAddress address) { final String producerNameBase = "producer"; final String role = "appid1"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java new file mode 100644 index 0000000000000..a34b05280c4f5 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/MaxProducerTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.testng.Assert.assertTrue; +import static org.testng.Assert.fail; +import java.util.ArrayList; +import java.util.List; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker-api") +public class MaxProducerTest extends ProducerConsumerBase { + + @BeforeClass(alwaysRun = true) + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setMaxProducersPerTopic(2); + } + + @Test + public void testMaxProducersForBroker() throws Exception { + testMaxProducers(2); + } + + @Test + public void testMaxProducersForNamespace() throws Exception { + // set max clients + admin.namespaces().setMaxProducersPerTopic("public/default", 3); + testMaxProducers(3); + } + + private void testMaxProducers(int maxProducerExpected) throws Exception { + final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp"); + admin.topics().createNonPartitionedTopic(topicName); + + List> producers = new ArrayList<>(); + for (int i = 0; i < maxProducerExpected; i++) { + producers.add(pulsarClient.newProducer().topic(topicName).create()); + } + + try { + pulsarClient.newProducer().topic(topicName).create(); + fail("should have failed"); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException.ProducerBusyException); + } + + // cleanup. + for (org.apache.pulsar.client.api.Producer p : producers) { + p.close(); + } + admin.topics().delete(topicName, false); + } +}