Skip to content

Commit 3cc06b5

Browse files
BewareMyPowereolivelli
authored andcommitted
Avoid adding duplicated BrokerEntryMetadata (apache#12018)
### Motivation When the Pulsar cluster enables broker entry metadata, sometimes there're some corrupted entries. See streamnative/kop#442 for example. It's because the broker entry metadata has been added twice. This bug might be introduced from apache#9039 https://github.com/apache/pulsar/blob/9b7c3275c904ac1e6a8ef67487a10a0506bb2c58/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1516-L1518 It happened during a managed ledger's rollover while there're some pending `OpAddEntry`s in `updateLedgersIdsComplete`, only the ledger id should be updated and the data of `OpAddEntry` should not be modified. ### Modifications Only call `beforeAddEntry` for once at the beginning of `internalAsyncAddEntry`. (cherry picked from commit 9d44617)
1 parent 0e95a1a commit 3cc06b5

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,9 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
713713
}
714714

715715
private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
716+
if (!beforeAddEntry(addOperation)) {
717+
return;
718+
}
716719
pendingAddEntries.add(addOperation);
717720
final State state = STATE_UPDATER.get(this);
718721
if (state == State.Fenced) {
@@ -775,10 +778,7 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
775778
addOperation.setCloseWhenDone(true);
776779
STATE_UPDATER.set(this, State.ClosingLedger);
777780
}
778-
// interceptor entry before add to bookie
779-
if (beforeAddEntry(addOperation)) {
780-
addOperation.initiate();
781-
}
781+
addOperation.initiate();
782782
}
783783
}
784784

@@ -1508,9 +1508,7 @@ public synchronized void updateLedgersIdsComplete(Stat stat) {
15081508
ReferenceCountUtil.release(existsOp.data);
15091509
}
15101510
existsOp.setLedger(currentLedger);
1511-
if (beforeAddEntry(existsOp)) {
1512-
pendingAddEntries.add(existsOp);
1513-
}
1511+
pendingAddEntries.add(existsOp);
15141512
}
15151513
} while (existsOp != null && --pendingSize > 0);
15161514

0 commit comments

Comments
 (0)