Skip to content

Commit 9381b85

Browse files
gaozhangmingavingaozhangmin
andauthored
[branch-2.10][fix][broker] Fix index generator is not rollback after entries are failed added (#19980)
Co-authored-by: gavingaozhangmin <gavingaozhangmin@didiglobal.com>
1 parent c3282bc commit 9381b85

File tree

6 files changed

+72
-0
lines changed

6 files changed

+72
-0
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -840,6 +840,13 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
840840
lastAddEntryTimeMs = System.currentTimeMillis();
841841
}
842842

843+
protected void afterFailedAddEntry(int numOfMessages) {
844+
if (managedLedgerInterceptor == null) {
845+
return;
846+
}
847+
managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
848+
}
849+
843850
private boolean beforeAddEntry(OpAddEntry addOperation) {
844851
// if no interceptor, just return true to make sure addOperation will be initiate()
845852
if (managedLedgerInterceptor == null) {

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ public void initiate() {
150150

151151
public void failed(ManagedLedgerException e) {
152152
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
153+
ml.afterFailedAddEntry(this.getNumberOfMessages());
153154
if (cb != null) {
154155
ReferenceCountUtil.release(data);
155156
cb.addFailed(e, ctx);

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,14 @@ public interface ManagedLedgerInterceptor {
4141
*/
4242
OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);
4343

44+
/**
45+
* Intercept When add entry failed.
46+
* @param numberOfMessages
47+
*/
48+
default void afterFailedAddEntry(int numberOfMessages){
49+
50+
}
51+
4452
/**
4553
* Intercept when ManagedLedger is initialized.
4654
* @param propertiesMap map of properties.

pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,15 @@ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
7878
return op;
7979
}
8080

81+
@Override
82+
public void afterFailedAddEntry(int numberOfMessages) {
83+
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
84+
if (interceptor instanceof AppendIndexMetadataInterceptor) {
85+
((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
86+
}
87+
}
88+
}
89+
8190
@Override
8291
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
8392
if (propertiesMap == null || propertiesMap.size() == 0) {

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/MangedLedgerInterceptorImplTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,49 @@ public void testFindPositionByIndex() throws Exception {
304304
ledger.close();
305305
}
306306

307+
@Test
308+
public void testAddEntryFailed() throws Exception {
309+
final int MOCK_BATCH_SIZE = 2;
310+
final String ledgerAndCursorName = "testAddEntryFailed";
311+
312+
ManagedLedgerInterceptor interceptor =
313+
new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);
314+
315+
ManagedLedgerConfig config = new ManagedLedgerConfig();
316+
config.setMaxEntriesPerLedger(2);
317+
config.setManagedLedgerInterceptor(interceptor);
318+
319+
ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
320+
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);
321+
322+
ledger.terminate();
323+
324+
ManagedLedgerInterceptorImpl interceptor1 =
325+
(ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();
326+
327+
CountDownLatch countDownLatch = new CountDownLatch(1);
328+
try {
329+
ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
330+
@Override
331+
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
332+
countDownLatch.countDown();
333+
}
334+
335+
@Override
336+
public void addFailed(ManagedLedgerException exception, Object ctx) {
337+
countDownLatch.countDown();
338+
}
339+
}, null);
340+
341+
countDownLatch.await();
342+
assertEquals(interceptor1.getIndex(), -1);
343+
} finally {
344+
ledger.close();
345+
factory.shutdown();
346+
}
347+
348+
}
349+
307350
@Test
308351
public void testBeforeAddEntryWithException() throws Exception {
309352
final int MOCK_BATCH_SIZE = 2;

pulsar-common/src/main/java/org/apache/pulsar/common/intercept/AppendIndexMetadataInterceptor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,4 +50,8 @@ public BrokerEntryMetadata interceptWithNumberOfMessages(
5050
public long getIndex() {
5151
return indexGenerator.get();
5252
}
53+
54+
public void decreaseWithNumberOfMessages(int numberOfMessages) {
55+
indexGenerator.addAndGet(-numberOfMessages);
56+
}
5357
}

0 commit comments

Comments
 (0)