Skip to content

Commit

Permalink
Make max clients per topic/subscription configurable (apache#1234)
Browse files Browse the repository at this point in the history
  • Loading branch information
hrsakai authored and merlimat committed Feb 14, 2018
1 parent 3c9e281 commit 5613c2b
Show file tree
Hide file tree
Showing 12 changed files with 406 additions and 22 deletions.
15 changes: 15 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,21 @@ enableRunBookieTogether=false
# Enable to run bookie autorecovery along with broker
enableRunBookieAutoRecoveryTogether=false

// Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
// until the number of connected producers decrease.
// Using a value of 0, is disabling maxProducersPerTopic-limit check.
maxProducersPerTopic=0

// Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerTopic-limit check.
maxConsumersPerTopic=0

// Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
15 changes: 15 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,21 @@ enablePersistentTopics=true
# Enable broker to load non-persistent topics
enableNonPersistentTopics=true

// Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
// until the number of connected producers decrease.
// Using a value of 0, is disabling maxProducersPerTopic-limit check.
maxProducersPerTopic=0

// Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerTopic-limit check.
maxConsumersPerTopic=0

// Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
maxConsumersPerSubscription=0

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ public class ServiceConfiguration implements PulsarConfiguration {
// Enable to run bookie autorecovery along with broker
private boolean enableRunBookieAutoRecoveryTogether = false;

// Max number of producers allowed to connect to topic. Once this limit reaches, Broker will reject new producers
// until the number of connected producers decrease.
// Using a value of 0, is disabling maxProducersPerTopic-limit check.
private int maxProducersPerTopic = 0;

// Max number of consumers allowed to connect to topic. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerTopic-limit check.
private int maxConsumersPerTopic = 0;

// Max number of consumers allowed to connect to subscription. Once this limit reaches, Broker will reject new consumers
// until the number of connected consumers decrease.
// Using a value of 0, is disabling maxConsumersPerSubscription-limit check.
private int maxConsumersPerSubscription = 0;

/***** --- TLS --- ****/
// Enable TLS
private boolean tlsEnabled = false;
Expand Down Expand Up @@ -408,7 +423,6 @@ public class ServiceConfiguration implements PulsarConfiguration {
// If true, export topic level metrics otherwise namespace level
private boolean exposeTopicLevelMetricsInPrometheus = true;


public String getZookeeperServers() {
return zookeeperServers;
}
Expand Down Expand Up @@ -744,6 +758,30 @@ public void setEnableRunBookieAutoRecoveryTogether(boolean enableRunBookieAutoRe
this.enableRunBookieAutoRecoveryTogether = enableRunBookieAutoRecoveryTogether;
}

public int getMaxProducersPerTopic() {
return maxProducersPerTopic;
}

public void setMaxProducersPerTopic(int maxProducersPerTopic) {
this.maxProducersPerTopic = maxProducersPerTopic;
}

public int getMaxConsumersPerTopic() {
return maxConsumersPerTopic;
}

public void setMaxConsumersPerTopic(int maxConsumersPerTopic) {
this.maxConsumersPerTopic = maxConsumersPerTopic;
}

public int getMaxConsumersPerSubscription() {
return maxConsumersPerSubscription;
}

public void setMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
this.maxConsumersPerSubscription = maxConsumersPerSubscription;
}

public boolean isTlsEnabled() {
return tlsEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,16 @@
package org.apache.pulsar.broker.service;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.utils.CopyOnWriteArrayList;
import org.slf4j.Logger;
Expand Down Expand Up @@ -81,6 +68,10 @@ public AbstractDispatcherSingleActiveConsumer(SubType subscriptionType, int part

protected abstract void cancelPendingRead();

protected abstract boolean isConsumersExceededOnTopic();

protected abstract boolean isConsumersExceededOnSubscription();

protected void pickAndScheduleActiveConsumer() {
checkArgument(!consumers.isEmpty());

Expand All @@ -106,6 +97,16 @@ public synchronized void addConsumer(Consumer consumer) throws BrokerServiceExce
throw new ConsumerBusyException("Exclusive consumer is already connected");
}

if (isConsumersExceededOnTopic()) {
log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", this.topicName);
throw new ConsumerBusyException("Topic reached max consumers limit");
}

if (subscriptionType == SubType.Failover && isConsumersExceededOnSubscription()) {
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", this.topicName);
throw new ConsumerBusyException("Subscription reached max consumers limit");
}

consumers.add(consumer);

// Pick an active consumer and start it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ public ConsumerBusyException(String msg) {
}
}

public static class ProducerBusyException extends BrokerServiceException {
public ProducerBusyException(String msg) {
super(msg);
}
}

public static class ServiceUnitNotReadyException extends BrokerServiceException {
public ServiceUnitNotReadyException(String msg) {
super(msg);
Expand Down Expand Up @@ -155,4 +161,4 @@ public static PulsarApi.ServerError getClientErrorCode(Throwable t) {
return PulsarApi.ServerError.UnknownError;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
Expand All @@ -40,6 +42,9 @@
public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements NonPersistentDispatcher {

private final NonPersistentTopic topic;
private final Subscription subscription;

private CompletableFuture<Void> closeFuture = null;
private final String name;
private final Rate msgDrop;
Expand All @@ -48,26 +53,54 @@ public class NonPersistentDispatcherMultipleConsumers extends AbstractDispatcher
@SuppressWarnings("unused")
private volatile int totalAvailablePermits = 0;

private final Subscription subscription;
private final ServiceConfiguration serviceConfig;

public NonPersistentDispatcherMultipleConsumers(NonPersistentTopic topic, Subscription subscription) {
this.topic = topic;
this.subscription = subscription;
this.name = topic.getName() + " / " + subscription.getName();
this.msgDrop = new Rate();
this.subscription = subscription;
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
}

@Override
public synchronized void addConsumer(Consumer consumer) {
public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer ", name, consumer);
consumer.disconnect();
return;
}

if (isConsumersExceededOnTopic()) {
log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", name);
throw new ConsumerBusyException("Topic reached max consumers limit");
}

if (isConsumersExceededOnSubscription()) {
log.warn("[{}] Attempting to add consumer to subscription which reached max consumers limit", name);
throw new ConsumerBusyException("Subscription reached max consumers limit");
}

consumerList.add(consumer);
consumerSet.add(consumer);
}

private boolean isConsumersExceededOnTopic() {
final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
return true;
}
return false;
}

private boolean isConsumersExceededOnSubscription() {
final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumerList.size()) {
return true;
}
return false;
}

@Override
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
if (consumerSet.removeAll(consumer) == 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,26 @@

import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.util.Rate;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;

public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {

private final NonPersistentTopic topic;
private final Rate msgDrop;
private final Subscription subscription;
private final ServiceConfiguration serviceConfig;

public NonPersistentDispatcherSingleActiveConsumer(SubType subscriptionType, int partitionIndex,
NonPersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName());
this.topic = topic;
this.subscription = subscription;
this.msgDrop = new Rate();
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
}

@Override
Expand All @@ -57,6 +62,22 @@ public void sendMessages(List<Entry> entries) {
}
}

protected boolean isConsumersExceededOnTopic() {
final int maxConsumersPerTopic = serviceConfig.getMaxConsumersPerTopic();
if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= topic.getNumberOfConsumers()) {
return true;
}
return false;
}

protected boolean isConsumersExceededOnSubscription() {
final int maxConsumersPerSubscription = serviceConfig.getMaxConsumersPerSubscription();
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
return false;
}

@Override
public Rate getMesssageDropRate() {
return msgDrop;
Expand Down Expand Up @@ -86,5 +107,4 @@ protected void readMoreEntries(Consumer consumer) {
protected void cancelPendingRead() {
// No-op
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
Expand Down Expand Up @@ -235,6 +236,11 @@ public void addProducer(Producer producer) throws BrokerServiceException {
throw new TopicFencedException("Topic is temporarily unavailable");
}

if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new ProducerBusyException("Topic reached max producers limit");
}

if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}
Expand All @@ -255,6 +261,14 @@ public void addProducer(Producer producer) throws BrokerServiceException {
}
}

private boolean isProducersExceeded() {
final int maxProducers = brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
if (maxProducers > 0 && maxProducers <= producers.size()) {
return true;
}
return false;
}

@Override
public void checkMessageDeduplicationInfo() {
// No-op
Expand Down Expand Up @@ -592,6 +606,14 @@ public ConcurrentOpenHashSet<Producer> getProducers() {
return producers;
}

public int getNumberOfConsumers() {
int count = 0;
for (NonPersistentSubscription subscription : subscriptions.values()) {
count += subscription.getConsumers().size();
}
return count;
}

@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
Expand Down Expand Up @@ -920,5 +942,4 @@ public void markBatchMessagePublished() {
}

private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);

}
Loading

0 comments on commit 5613c2b

Please sign in to comment.