Skip to content

Commit

Permalink
Fixed race condition intruduced managed ledger addEntry introduced in a…
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and sijie committed Apr 12, 2018
1 parent 0fd4e84 commit aff6c04
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -205,7 +204,7 @@ enum PositionBound {
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
* created asynchronously and hence there is no ready ledger to write into.
*/
final GrowableArrayBlockingQueue<OpAddEntry> pendingAddEntries = new GrowableArrayBlockingQueue<>();
final ConcurrentLinkedQueue<OpAddEntry> pendingAddEntries = new ConcurrentLinkedQueue<>();

// //////////////////////////////////////////////////////////////////////

Expand Down Expand Up @@ -491,10 +490,11 @@ public void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx)
}

OpAddEntry addOperation = OpAddEntry.create(this, buffer, callback, ctx);
pendingAddEntries.add(addOperation);

// Jump to specific thread to avoid contention from writers writing from different threads
executor.executeOrdered(name, safeRun(() -> {
pendingAddEntries.add(addOperation);

internalAsyncAddEntry(addOperation);
}));
}
Expand Down Expand Up @@ -1200,7 +1200,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
}

// Process all the pending addEntry requests
for (OpAddEntry op : pendingAddEntries.toList()) {
for (OpAddEntry op : pendingAddEntries) {
op.setLedger(currentLedger);
++currentLedgerEntries;
currentLedgerSize += op.data.readableBytes();
Expand Down

0 comments on commit aff6c04

Please sign in to comment.