diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 15c380cb6a1f1..8649d1d6ed79d 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -95,7 +95,6 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; -import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -205,7 +204,7 @@ enum PositionBound { * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is * created asynchronously and hence there is no ready ledger to write into. */ - final GrowableArrayBlockingQueue pendingAddEntries = new GrowableArrayBlockingQueue<>(); + final ConcurrentLinkedQueue pendingAddEntries = new ConcurrentLinkedQueue<>(); // ////////////////////////////////////////////////////////////////////// @@ -491,10 +490,11 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx) } OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx); - pendingAddEntries.add(addOperation); // Jump to specific thread to avoid contention from writers writing from different threads executor.executeOrdered(name, safeRun(() -> { + pendingAddEntries.add(addOperation); + internalAsyncAddEntry(addOperation); })); } @@ -1200,7 +1200,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) { } // Process all the pending addEntry requests - for (OpAddEntry op : pendingAddEntries.toList()) { + for (OpAddEntry op : pendingAddEntries) { op.setLedger(currentLedger); ++currentLedgerEntries; currentLedgerSize += op.data.readableBytes();