Skip to content

Commit

Permalink
Support fetching metadata from entry data in publish callback (apache…
Browse files Browse the repository at this point in the history
…#9257)

### Motivation

[PIP-70](https://github.com/apache/pulsar/wiki/PIP-70%3A-Introduce-lightweight-broker-entry-metadata) introduced lightweight broker entry metadata which is added by related interceptor like `AppendBrokerTimestampMetadataInterceptor` before being written to bookie asynchronously. However, in the callback when entries are written to bookie successfully, the caller side can only get the ledger id and entry id, see `PublishContext#completed`:

```java
void completed(Exception e, long ledgerId, long entryId);
```

Currently there's no way to get the broker entry data in publish callback. We can access the interceptor's internal field but it may be overwritten by the next entry's broker entry metadata because the interceptor changes the internal fields when the entry was sent while the callback is invoked when the send is done.

This PR intends to expose the entry's data to callbacks, including managed ledger's add callback and the publish context's complete callback. If we want to make use of broker entry metadata or the old message metadata in future, we can use the new callbacks.

### Modifications

- Add an extra `ByteBuf` argument to `AddEntryCallback#addComplete` to expose the entry data.
- Add a default method `setMetadataFromEntryData` to `PublishContext` interface to allow implementer side retrieve some metadata from entry data.
- Add tests for `AddEntryCallback#addComplete`.
- Add tests for `PublishContext#setMetadataFromEntryData`.
  • Loading branch information
BewareMyPower authored Jan 25, 2021
1 parent 0e6bbc8 commit b4ee611
Show file tree
Hide file tree
Showing 17 changed files with 100 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;

Expand Down Expand Up @@ -64,7 +66,7 @@ interface DeleteCursorCallback {
}

interface AddEntryCallback {
void addComplete(Position position, Object ctx);
void addComplete(Position position, ByteBuf entryData, Object ctx);

void addFailed(ManagedLedgerException exception, Object ctx);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ class Result {

asyncAddEntry(data, offset, length, new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
result.position = position;
counter.countDown();
}
Expand Down Expand Up @@ -628,7 +628,7 @@ class Result {

asyncAddEntry(data, numberOfMessages, offset, length, new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
result.position = position;
counter.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,23 +192,24 @@ public void safeRun() {
entry.release();
}

// We are done using the byte buffer
ReferenceCountUtil.release(data);

PositionImpl lastEntry = PositionImpl.get(ledger.getId(), entryId);
ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(ml);
ml.lastConfirmedEntry = lastEntry;

if (closeWhenDone) {
ReferenceCountUtil.release(data);
log.info("[{}] Closing ledger {} for being full", ml.getName(), ledger.getId());
ledger.asyncClose(this, ctx);
} else {
updateLatency();
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
cb.addComplete(lastEntry, ctx);
cb.addComplete(lastEntry, data.asReadOnly(), ctx);
ReferenceCountUtil.release(data);
ml.notifyCursors();
this.recycle();
} else {
ReferenceCountUtil.release(data);
}
}
}
Expand All @@ -229,7 +230,7 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {

AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
if (cb != null) {
cb.addComplete(PositionImpl.get(lh.getId(), entryId), ctx);
cb.addComplete(PositionImpl.get(lh.getId(), entryId), null, ctx);
ml.notifyCursors();
this.recycle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
Expand Down Expand Up @@ -1000,7 +1001,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
lastPosition.set(position);
c1.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.api.DigestType;
Expand Down Expand Up @@ -484,7 +486,7 @@ public void managedLedgerClosed() throws Exception {
ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AddEntryCallback() {

@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
latch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
Expand Down Expand Up @@ -332,7 +334,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
// not-ok
}

public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
// ok
}
}, null);
Expand All @@ -342,7 +344,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
promise.complete(null);
}

public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
promise.completeExceptionally(new Exception("should have failed"));
}
}, null);
Expand Down Expand Up @@ -445,7 +447,7 @@ public void recoverLongTimeAfterMultipleWriteErrors() throws Exception {
AddEntryCallback cb = new AddEntryCallback() {

@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
counter.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import static org.testng.Assert.fail;
import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
Expand Down Expand Up @@ -295,7 +297,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {

ledger.asyncAddEntry("test".getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@SuppressWarnings("unchecked")
Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
ManagedLedger ledger = pair.getLeft();
Expand Down Expand Up @@ -539,17 +541,34 @@ public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
counter.await();
}

private byte[] copyBytesFromByteBuf(final ByteBuf buf) {
final int index = buf.readerIndex();
final byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(index, bytes);
buf.readerIndex(index);
return bytes;
}

@Test(timeOut = 20000)
public void asyncAddEntryWithoutError() throws Exception {
ManagedLedger ledger = factory.open("my_test_ledger");
ledger.openCursor("test-cursor");

final CountDownLatch counter = new CountDownLatch(1);

ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AddEntryCallback() {
final byte[] bytes = "dummy-entry-1".getBytes(Encoding);
ledger.asyncAddEntry(bytes, new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
assertNull(ctx);
assertEquals(copyBytesFromByteBuf(entryData), bytes);

// `entryData` is read-only so that write related methods will throw ReadOnlyBufferException
try {
entryData.array();
} catch (Exception e) {
assertTrue(e instanceof ReadOnlyBufferException);
}

counter.countDown();
}
Expand Down Expand Up @@ -577,8 +596,9 @@ public void doubleAsyncAddEntryWithoutError() throws Exception {
final String content = "dummy-entry-" + i;
ledger.asyncAddEntry(content.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
assertNotNull(ctx);
assertEquals(copyBytesFromByteBuf(entryData), content.getBytes(Encoding));

log.info("Successfully added {}", content);
done.countDown();
Expand Down Expand Up @@ -607,7 +627,7 @@ public void asyncAddEntryWithError() throws Exception {

ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
fail("Should have failed");
}

Expand Down Expand Up @@ -832,7 +852,7 @@ public void testAsyncAddEntryAndSyncClose() throws Exception {
String content = "entry-" + i;
ledger.asyncAddEntry(content.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
counter.countDown();
}

Expand Down Expand Up @@ -2615,7 +2635,7 @@ public void asyncClose(org.apache.bookkeeper.client.AsyncCallback.CloseCallback
ledger.asyncAddEntry(data.getBytes(), new AddEntryCallback() {

@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
addSuccess.set(true);
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.MarkDeleteCallback;
Expand Down Expand Up @@ -478,7 +480,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}

@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
lastPosition.set(position);
c1.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ default long getOriginalSequenceId() {

void completed(Exception e, long ledgerId, long entryId);

default void setMetadataFromEntryData(ByteBuf entryData) {
}

default long getHighestSequenceId() {
return -1L;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,12 +449,13 @@ private void decrementPendingWriteOpsAndCheck() {
}

@Override
public void addComplete(Position pos, Object ctx) {
public void addComplete(Position pos, ByteBuf entryData, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
PositionImpl position = (PositionImpl) pos;

// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
publishContext.setMetadataFromEntryData(entryData);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
// in order to sync the max position when cursor read entries
transactionBuffer.syncMaxReadPositionForNormalPublish((PositionImpl) ledger.getLastConfirmedEntry());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceI
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
if (!ongoingTxns.containsKey(txnId)) {
ongoingTxns.put(txnId, (PositionImpl) position);
Expand Down Expand Up @@ -99,7 +99,7 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, long lowWaterMark) {

topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
updateMaxReadPosition(txnID);
handleLowWaterMark(txnID, lowWaterMark);
Expand All @@ -126,7 +126,7 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, long lowWaterMark) {
ByteBuf abortMarker = Markers.newTxnAbortMarker(-1L, txnID.getMostSigBits(), txnID.getLeastSigBits());
topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
aborts.put(txnID, (PositionImpl) position);
updateMaxReadPosition(txnID);
Expand All @@ -152,7 +152,7 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {
txnID.getMostSigBits(), txnID.getLeastSigBits());
topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
synchronized (TopicTransactionBuffer.this) {
aborts.put(firstTxn, (PositionImpl) position);
updateMaxReadPosition(firstTxn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,8 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1), null);
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(
new PositionImpl(1, 1), null, null);
return null;
}
}).when(ledgerMock).asyncAddEntry(any(byte[].class), any(AddEntryCallback.class), any());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,14 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@Test
public void testPublishMessage() throws Exception {

doAnswer(invocationOnMock -> {
final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[1];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[2];
callback.addComplete(PositionImpl.latest, payload, ctx);
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), any(AddEntryCallback.class), any());

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
/*
* MessageMetadata.Builder messageMetadata = MessageMetadata.newBuilder();
Expand All @@ -299,9 +307,23 @@ public void testPublishMessage() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);

topic.publishMessage(payload, (exception, ledgerId, entryId) -> {
latch.countDown();
});
final Topic.PublishContext publishContext = new Topic.PublishContext() {
@Override
public void completed(Exception e, long ledgerId, long entryId) {
assertEquals(ledgerId, PositionImpl.latest.getLedgerId());
assertEquals(entryId, PositionImpl.latest.getEntryId());
latch.countDown();
}

@Override
public void setMetadataFromEntryData(ByteBuf entryData) {
// This method must be invoked before `completed`
assertEquals(latch.getCount(), 1);
assertEquals(entryData.array(), payload.array());
}
};

topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
}
Expand Down Expand Up @@ -1235,6 +1257,7 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(1, 1),
null,
invocationOnMock.getArguments()[2]);
return null;
}
Expand Down Expand Up @@ -1717,7 +1740,7 @@ public void testBacklogCursor() throws Exception {
ByteBuf entry = getMessageWithMetadata(content.getBytes());
ledger.asyncAddEntry(entry, new AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
latch.countDown();
entry.release();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1531,7 +1531,9 @@ public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(new PositionImpl(-1, -1),
((AddEntryCallback) invocationOnMock.getArguments()[1]).addComplete(
new PositionImpl(-1, -1),
null,
invocationOnMock.getArguments()[2]);
return null;
}
Expand Down
Loading

0 comments on commit b4ee611

Please sign in to comment.