Skip to content

Commit

Permalink
Fix a couple of bugs in exclusive producer (apache#9554)
Browse files Browse the repository at this point in the history
* Fix a couple of bugs in exclusive producer
1. Fix uncaught exception in exclusive producer
2. Incorrect logic when producer is removed

Co-authored-by: Jerry Peng <jerryp@splunk.com>
  • Loading branch information
jerrypeng and Jerry Peng authored Feb 11, 2021
1 parent f53d180 commit d9c5b9b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ public abstract class AbstractTopic implements Topic {

protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
// pointer to the exclusive producer
private volatile String exclusiveProducerName;

private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
new ConcurrentLinkedQueue<>();

Expand Down Expand Up @@ -382,17 +385,19 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
switch (producer.getAccessMode()) {
case Shared:
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerBusyException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
return FutureUtil.failedFuture(
new ProducerBusyException(
"Topic has an existing exclusive producer: " + exclusiveProducerName));
} else {
// Normal producer getting added, we don't need a new epoch
return CompletableFuture.completedFuture(topicEpoch);
}

case Exclusive:
if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException(
"Topic has an existing exclusive producer: " + producers.keys().nextElement()));
return FutureUtil.failedFuture(
new ProducerFencedException(
"Topic has an existing exclusive producer: " + exclusiveProducerName));
} else if (!producers.isEmpty()) {
return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
} else if (producer.getTopicEpoch().isPresent()
Expand All @@ -405,6 +410,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
} else {
// There are currently no existing producers
hasExclusiveProducer = true;
exclusiveProducerName = producer.getProducerName();

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
Expand All @@ -414,6 +420,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
exclusiveProducerName = null;
return null;
});

Expand All @@ -440,6 +447,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
} else {
// There are currently no existing producers
hasExclusiveProducer = true;
exclusiveProducerName = producer.getProducerName();

CompletableFuture<Long> future;
if (producer.getTopicEpoch().isPresent()) {
Expand All @@ -449,6 +457,7 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
}
future.exceptionally(ex -> {
hasExclusiveProducer = false;
exclusiveProducerName = null;
return null;
});

Expand All @@ -464,6 +473,9 @@ protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer
new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode()));
}

} catch (Exception e) {
log.error("Encountered unexpected error during exclusive producer creation", e);
return FutureUtil.failedFuture(new BrokerServiceException(e));
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -619,17 +631,21 @@ public void removeProducer(Producer producer) {

protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
if (newCount == 0) {
USAGE_COUNT_UPDATER.decrementAndGet(this);
// this conditional check is an optimization so we don't have acquire the write lock
// and execute following routine if there are no exclusive producers
if (hasExclusiveProducer) {
lock.writeLock().lock();
try {
hasExclusiveProducer = false;
exclusiveProducerName = null;
Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
waitingExclusiveProducers.poll();
if (nextWaitingProducer != null) {
Producer nextProducer = nextWaitingProducer.getKey();
CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
hasExclusiveProducer = true;
exclusiveProducerName = nextProducer.getProducerName();

CompletableFuture<Long> future;
if (nextProducer.getTopicEpoch().isPresent()) {
Expand All @@ -643,6 +659,7 @@ protected void handleProducerRemoved(Producer producer) {
producerFuture.complete(topicEpoch);
}).exceptionally(ex -> {
hasExclusiveProducer = false;
exclusiveProducerName = null;
producerFuture.completeExceptionally(ex);
return null;
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ public static Object[][] accessMode() {
@Test(dataProvider = "topics")
public void simpleTest(String type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);
simpleTest(topic);
}

private void simpleTest(String topic) throws Exception {

Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
Expand Down Expand Up @@ -287,6 +291,18 @@ public void waitForExclusiveWithClientTimeout(String type, boolean partitioned)
fp2.get(1, TimeUnit.SECONDS);
}

@Test(dataProvider = "topics")
public void exclusiveWithConsumers(String type, boolean partitioned) throws Exception {
String topic = newTopic(type, partitioned);

pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName("test")
.subscribe();

simpleTest(topic);
}

private String newTopic(String type, boolean isPartitioned) throws Exception {
String topic = type + "://" + newTopicName();
if (isPartitioned) {
Expand Down

0 comments on commit d9c5b9b

Please sign in to comment.