Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,13 @@ protected synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
lastAddEntryTimeMs = System.currentTimeMillis();
}

protected void afterFailedAddEntry(int numOfMessages) {
if (managedLedgerInterceptor == null) {
return;
}
managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
}

protected boolean beforeAddEntry(OpAddEntry addOperation) {
// if no interceptor, just return true to make sure addOperation will be initiate()
if (managedLedgerInterceptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public void initiateShadowWrite() {

public void failed(ManagedLedgerException e) {
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
ml.afterFailedAddEntry(this.getNumberOfMessages());
if (cb != null) {
ReferenceCountUtil.release(data);
cb.addFailed(e, ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,14 @@ public interface ManagedLedgerInterceptor {
*/
OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages);

/**
* Intercept When add entry failed.
* @param numberOfMessages
*/
default void afterFailedAddEntry(int numberOfMessages){

}

/**
* Intercept when ManagedLedger is initialized.
* @param propertiesMap map of properties.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ public OpAddEntry beforeAddEntry(OpAddEntry op, int numberOfMessages) {
return op;
}

@Override
public void afterFailedAddEntry(int numberOfMessages) {
for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) {
if (interceptor instanceof AppendIndexMetadataInterceptor) {
((AppendIndexMetadataInterceptor) interceptor).decreaseWithNumberOfMessages(numberOfMessages);
}
}
}

@Override
public void onManagedLedgerPropertiesInitialize(Map<String, String> propertiesMap) {
if (propertiesMap == null || propertiesMap.size() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,49 @@ public void testFindPositionByIndex() throws Exception {
ledger.close();
}

@Test
public void testAddEntryFailed() throws Exception {
final int MOCK_BATCH_SIZE = 2;
final String ledgerAndCursorName = "testAddEntryFailed";

ManagedLedgerInterceptor interceptor =
new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null);

ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setMaxEntriesPerLedger(2);
config.setManagedLedgerInterceptor(interceptor);

ByteBuf buffer = Unpooled.wrappedBuffer("message".getBytes());
ManagedLedger ledger = factory.open(ledgerAndCursorName, config);

ledger.terminate();

ManagedLedgerInterceptorImpl interceptor1 =
(ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor();

CountDownLatch countDownLatch = new CountDownLatch(1);
try {
ledger.asyncAddEntry(buffer, MOCK_BATCH_SIZE, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
countDownLatch.countDown();
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
countDownLatch.countDown();
}
}, null);

countDownLatch.await();
assertEquals(interceptor1.getIndex(), -1);
} finally {
ledger.close();
factory.shutdown();
}

}

@Test
public void testBeforeAddEntryWithException() throws Exception {
final int MOCK_BATCH_SIZE = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ public BrokerEntryMetadata interceptWithNumberOfMessages(
public long getIndex() {
return indexGenerator.get();
}

public void decreaseWithNumberOfMessages(int numberOfMessages) {
indexGenerator.addAndGet(-numberOfMessages);
}
}