Skip to content

Commit

Permalink
[fix][broker] Streaming dispatcher stuck after reading the first entr…
Browse files Browse the repository at this point in the history
…y with SHARED subscriptions (#17143)
  • Loading branch information
nicoloboschi authored Aug 18, 2022
1 parent 3c3ec49 commit e16a35d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
"blockedDispatcherOnUnackedMsgs");
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);
private final ExecutorService dispatchMessagesThread;
protected final ExecutorService dispatchMessagesThread;
private final SharedConsumerAssignor assignor;

protected enum ReadType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service.persistent;

import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.Lists;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -91,7 +92,24 @@ public synchronized void readEntryComplete(Entry entry, PendingReadEntryRequest

cursor.seek(((ManagedLedgerImpl) cursor.getManagedLedger())
.getNextValidPosition((PositionImpl) entry.getPosition()));
sendMessagesToConsumers(readType, Lists.newArrayList(entry));

// dispatch messages to a separate thread, but still in order for this subscription
// sendMessagesToConsumers is responsible for running broker-side filters
// that may be quite expensive
if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
// setting sendInProgress here, because sendMessagesToConsumers will be executed
// in a separate thread, and we want to prevent more reads
sendInProgress = true;
dispatchMessagesThread.execute(safeRun(() -> {
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
readMoreEntries();
}
}));
} else {
if (sendMessagesToConsumers(readType, Lists.newArrayList(entry))) {
readMoreEntriesAsync();
}
}
ctx.recycle();
}

Expand Down

0 comments on commit e16a35d

Please sign in to comment.