Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] PIP-379: Enable the use of the classic implementation of Key_Shared / Shared with feature flag #23424

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
8f6e42d
Extract AbstractPersistentDispatcherMultipleConsumers class
lhotari Oct 8, 2024
af78832
Rename unused and wrong logger
lhotari Oct 8, 2024
8ed62e0
Add StickyKeyDispatcher interface
lhotari Oct 8, 2024
c30697f
Add implementations for Shared and Key_Shared from branch-3.3, ported…
lhotari Oct 9, 2024
a7aea94
Restore readPositionWhenJoining
lhotari Oct 9, 2024
36f89b2
Restore consumersAfterMarkDeletePosition
lhotari Oct 9, 2024
c5cefcd
Add method "isClassic()" that can be used to detect the implementatio…
lhotari Oct 9, 2024
ef1bae5
Add feature toggle
lhotari Oct 9, 2024
71add25
Rename duplicate KeySharedSubscriptioTest to KeySharedSubscriptionMax…
lhotari Oct 9, 2024
34c0277
Test both PIP-379 and classic in KeySharedSubscriptionMaxUnackedMessa…
lhotari Oct 9, 2024
09ab041
Test both PIP-379 and Classic in KeySharedSubscriptionTest
lhotari Oct 9, 2024
2d2524c
Fix bug where hash wasn't added to pending acks in classic implementa…
lhotari Oct 9, 2024
ff297ac
Revert "Fix bug where hash wasn't added to pending acks in classic im…
lhotari Oct 9, 2024
e3718a0
Don't require hash in replay with classic implementations
lhotari Oct 9, 2024
a6d7f8f
Use Integer.MAX_VALUE-1 as the range end in classic and consistent ha…
lhotari Oct 9, 2024
773917d
Make KeySharedSubscriptionTest run for both implementation types
lhotari Oct 9, 2024
e467c53
Refactor test parameterization solution
lhotari Oct 9, 2024
816db77
Revisit receiveAndCheckDistribution to use multiple threads for recei…
lhotari Oct 9, 2024
59448a6
Copy the unit test classes for the classic dispatchers
lhotari Oct 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Extract AbstractPersistentDispatcherMultipleConsumers class
  • Loading branch information
lhotari committed Oct 8, 2024
commit 8f6e42dd5eb2e66863164595b233eb5b9d58372f
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.time.Clock;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;

@Slf4j
public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryTracker, TimerTask {

protected final PersistentDispatcherMultipleConsumers dispatcher;
protected final AbstractPersistentDispatcherMultipleConsumers dispatcher;

// Reference to the shared (per-broker) timer for delayed delivery
protected final Timer timer;
Expand All @@ -49,13 +49,13 @@ public abstract class AbstractDelayedDeliveryTracker implements DelayedDeliveryT

private final boolean isDelayedDeliveryDeliverAtTimeStrict;

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}

public AbstractDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
public AbstractDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.pulsar.broker.delayed.bucket.BucketSnapshotStorage;
import org.apache.pulsar.broker.delayed.bucket.RecoverDelayedDeliveryTrackerException;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -78,7 +78,7 @@ public void initialize(PulsarService pulsarService) throws Exception {
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
BrokerService brokerService = dispatcher.getTopic().getBrokerService();
Expand All @@ -97,7 +97,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
}

@VisibleForTesting
BucketDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher)
BucketDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher)
throws RecoverDelayedDeliveryTrackerException {
return new BucketDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, delayedDeliveryMinIndexCountPerBucket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import com.google.common.annotations.Beta;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;

/**
* Factory of InMemoryDelayedDeliveryTracker objects. This is the entry point for implementations.
Expand All @@ -42,7 +42,7 @@ public interface DelayedDeliveryTrackerFactory extends AutoCloseable {
* @param dispatcher
* a multi-consumer dispatcher instance
*/
DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher);
DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);

/**
* Close the factory and release all the resources.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.PositionFactory;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;

@Slf4j
Expand All @@ -52,17 +52,18 @@ public class InMemoryDelayedDeliveryTracker extends AbstractDelayedDeliveryTrack
// Track whether we have seen all messages with fixed delay so far.
private boolean messagesHaveFixedDelay = true;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
fixedDelayDetectionLookahead);
}

public InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
public InMemoryDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
long fixedDelayDetectionLookahead) {
super(dispatcher, timer, tickTimeMillis, clock, isDelayedDeliveryDeliverAtTimeStrict);
this.fixedDelayDetectionLookahead = fixedDelayDetectionLookahead;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -51,7 +51,7 @@ public void initialize(PulsarService pulsarService) {
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
public DelayedDeliveryTracker newTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
String topicName = dispatcher.getTopic().getName();
String subscriptionName = dispatcher.getSubscription().getName();
DelayedDeliveryTracker tracker = DelayedDeliveryTracker.DISABLE;
Expand All @@ -66,7 +66,7 @@ public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers d
}

@VisibleForTesting
InMemoryDelayedDeliveryTracker newTracker0(PersistentDispatcherMultipleConsumers dispatcher) {
InMemoryDelayedDeliveryTracker newTracker0(AbstractPersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict, fixedDelayDetectionLookahead);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
import org.apache.pulsar.broker.delayed.AbstractDelayedDeliveryTracker;
import org.apache.pulsar.broker.delayed.proto.DelayedIndex;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.TripleLongPriorityQueue;
Expand Down Expand Up @@ -105,7 +105,7 @@ public class BucketDelayedDeliveryTracker extends AbstractDelayedDeliveryTracker

private CompletableFuture<Void> pendingLoad = null;

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
Expand All @@ -117,7 +117,7 @@ public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispat
maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher,
public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@
import org.apache.pulsar.broker.service.TopicEventsListener.EventStage;
import org.apache.pulsar.broker.service.TopicEventsListener.TopicEvent;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.SystemTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilterProvider;
Expand Down Expand Up @@ -301,7 +301,7 @@ public class BrokerService implements Closeable {
private final int maxUnackedMessages;
public final int maxUnackedMsgsPerDispatcher;
private final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false);
private final Set<PersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
private final Set<AbstractPersistentDispatcherMultipleConsumers> blockedDispatchers = ConcurrentHashMap.newKeySet();
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@VisibleForTesting
private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory;
Expand Down Expand Up @@ -3328,7 +3328,7 @@ public OrderedExecutor getTopicOrderedExecutor() {
* @param dispatcher
* @param numberOfMessages
*/
public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
public void addUnAckedMessages(AbstractPersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) {
// don't block dispatchers if maxUnackedMessages = 0
if (maxUnackedMessages > 0) {
totalUnackedMessages.add(numberOfMessages);
Expand Down Expand Up @@ -3387,10 +3387,10 @@ private void blockDispatchersWithLargeUnAckMessages() {
try {
forEachTopic(topic -> {
topic.getSubscriptions().forEach((subName, persistentSubscription) -> {
if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher =
(PersistentDispatcherMultipleConsumers) persistentSubscription
.getDispatcher();
if (persistentSubscription.getDispatcher()
instanceof AbstractPersistentDispatcherMultipleConsumers) {
AbstractPersistentDispatcherMultipleConsumers dispatcher =
(AbstractPersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher();
int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages();
if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) {
log.info("[{}] Blocking dispatcher due to reached max broker limit {}",
Expand All @@ -3411,7 +3411,7 @@ private void blockDispatchersWithLargeUnAckMessages() {
*
* @param dispatcherList
*/
public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) {
public void unblockDispatchersOnUnAckMessages(List<AbstractPersistentDispatcherMultipleConsumers> dispatcherList) {
lock.writeLock().lock();
try {
dispatcherList.forEach(dispatcher -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,8 @@ default void afterAckMessages(Throwable exOfDeletion, Object ctxOfDeletion){}

/**
* Trigger a new "readMoreEntries" if the dispatching has been paused before. This method is only implemented in
* {@link org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers} right now, other
* implements are not necessary to implement this method.
* {@link org.apache.pulsar.broker.service.persistent.AbstractPersistentDispatcherMultipleConsumers} right now,
* other implementations do not necessary implement this method.
* @return did a resume.
*/
default boolean checkAndResumeIfPaused(){
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.Map;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.common.policies.data.stats.TopicMetricBean;

public abstract class AbstractPersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, AsyncCallbacks.ReadEntriesCallback {
public AbstractPersistentDispatcherMultipleConsumers(Subscription subscription,
ServiceConfiguration serviceConfig) {
super(subscription, serviceConfig);
}

public abstract void unBlockDispatcherOnUnackedMsgs();

public abstract void readMoreEntriesAsync();

public abstract String getName();

public abstract boolean isBlockedDispatcherOnUnackedMsgs();

public abstract int getTotalUnackedMessages();

public abstract void blockDispatcherOnUnackedMsgs();

public abstract long getNumberOfMessagesInReplay();

public abstract boolean isHavePendingRead();

public abstract boolean isHavePendingReplayRead();

public abstract ManagedCursor getCursor();

public abstract Topic getTopic();

public abstract Subscription getSubscription();

public abstract long getDelayedTrackerMemoryUsage();

public abstract Map<String, TopicMetricBean> getBucketDelayedIndexStats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -60,7 +59,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryAndMetadata;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
Expand All @@ -85,8 +83,7 @@
/**
*
*/
public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMultipleConsumers
implements Dispatcher, ReadEntriesCallback {
public class PersistentDispatcherMultipleConsumers extends AbstractPersistentDispatcherMultipleConsumers {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected volatile Range<Position> lastIndividualDeletedRangeFromCursorRecovery;
Expand Down Expand Up @@ -320,6 +317,7 @@ private synchronized void internalConsumerFlow(Consumer consumer, int additional
* We should not call readMoreEntries() recursively in the same thread as there is a risk of StackOverflowError.
*
*/
@Override
public void readMoreEntriesAsync() {
// deduplication for readMoreEntriesAsync calls
if (readMoreEntriesAsyncRequested.compareAndSet(false, true)) {
Expand Down Expand Up @@ -1285,6 +1283,7 @@ public void blockDispatcherOnUnackedMsgs() {
blockedDispatcherOnUnackedMsgs = TRUE;
}

@Override
public void unBlockDispatcherOnUnackedMsgs() {
blockedDispatcherOnUnackedMsgs = FALSE;
}
Expand All @@ -1293,6 +1292,7 @@ public int getTotalUnackedMessages() {
return totalUnackedMessages;
}

@Override
public String getName() {
return name;
}
Expand Down Expand Up @@ -1505,5 +1505,15 @@ public long getNumberOfMessagesInReplay() {
return redeliveryMessages.size();
}

@Override
public boolean isHavePendingRead() {
return havePendingRead;
}

@Override
public boolean isHavePendingReplayRead() {
return havePendingReplayRead;
}

private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherMultipleConsumers.class);
}
Loading