Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][broker] Don't call ManagedLedger#asyncAddEntry in Netty I/O thread #23983

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.NavigableMap;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Predicate;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
Expand Down Expand Up @@ -76,6 +77,8 @@ public interface ManagedLedger {
* data entry to be persisted
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data) throws InterruptedException, ManagedLedgerException;

Expand All @@ -88,6 +91,8 @@ public interface ManagedLedger {
* numberOfMessages of entry
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int numberOfMessages) throws InterruptedException, ManagedLedgerException;

Expand All @@ -102,6 +107,8 @@ public interface ManagedLedger {
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, AddEntryCallback callback, Object ctx);

Expand All @@ -116,6 +123,8 @@ public interface ManagedLedger {
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int offset, int length) throws InterruptedException, ManagedLedgerException;

Expand All @@ -132,6 +141,8 @@ public interface ManagedLedger {
* number of bytes
* @return the Position at which the entry has been inserted
* @throws ManagedLedgerException
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
Position addEntry(byte[] data, int numberOfMessages, int offset, int length) throws InterruptedException,
ManagedLedgerException;
Expand All @@ -150,6 +161,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, int offset, int length, AddEntryCallback callback, Object ctx);

Expand All @@ -169,6 +182,8 @@ Position addEntry(byte[] data, int numberOfMessages, int offset, int length) thr
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, AddEntryCallback callback,
Object ctx);
Expand All @@ -184,6 +199,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(ByteBuf buffer, AddEntryCallback callback, Object ctx);

Expand All @@ -199,6 +216,8 @@ void asyncAddEntry(byte[] data, int numberOfMessages, int offset, int length, Ad
* callback object
* @param ctx
* opaque context
* @apiNote This method is not guaranteed to be thread-safe. The caller is responsible for calling all
* {@link ManagedLedger#asyncAddEntry} and {@link ManagedLedger#addEntry} overloads in order.
*/
void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback callback, Object ctx);

Expand Down Expand Up @@ -733,4 +752,13 @@ default CompletableFuture<Position> getLastDispatchablePosition(final Predicate<
}

Position getFirstPosition();

/**
 * Exposes the executor that callers can use to serialize their own tasks with this managed ledger's internal tasks.
 *
 * <p>A managed ledger implementation may reasonably run the tasks that must be performed in order on a
 * single-thread executor. Handing that executor to the caller lets the caller's custom tasks be synchronized with
 * the internal ones.
 *
 * @return the executor to submit ordered tasks to; this default implementation returns an executor that runs each
 *     submitted task immediately in the submitting thread
 */
default Executor getExecutor() {
    return command -> command.run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -798,34 +798,30 @@ public void asyncAddEntry(ByteBuf buffer, int numberOfMessages, AddEntryCallback
log.debug("[{}] asyncAddEntry size={} state={}", name, buffer.readableBytes(), state);
}

// retain buffer in this thread
// The buffer will be queued in `pendingAddEntries` and might be polled later in a different thread. However,
// the caller could release it after this method returns. To ensure the buffer is not released when it's polled,
// increase the reference count, which should be decreased by `OpAddEntry`'s methods later.
buffer.retain();

// Jump to specific thread to avoid contention from writers writing from different threads
final var addOperation = OpAddEntry.createNoRetainBuffer(this, buffer, numberOfMessages, callback, ctx,
currentLedgerTimeoutTriggered);
var added = false;
try {
// Use synchronized to ensure if `addOperation` is added to queue and fails later, it will be the first
// element in `pendingAddEntries`.
synchronized (this) {
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
final var state = STATE_UPDATER.get(this);
beforeAddEntryToQueue(state);
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(state, addOperation);
if (managedLedgerInterceptor != null) {
managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
}
beforeAddEntryToQueue();
pendingAddEntries.add(addOperation);
added = true;
afterAddEntryToQueue(addOperation);
} catch (Throwable throwable) {
if (!added) {
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable));
} // else: all elements of `pendingAddEntries` will fail in another thread
}
}

protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
protected void beforeAddEntryToQueue() throws ManagedLedgerException {
final var state = STATE_UPDATER.get(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the STATE_UPDATER doesn't make any difference for plain reads of the field value.

if (state.isFenced()) {
throw new ManagedLedgerFencedException();
}
Expand All @@ -836,7 +832,9 @@ protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException
}
}

protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
// TODO: does this method really need to be synchronized?
protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
final var state = STATE_UPDATER.get(this);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

using the STATE_UPDATER doesn't make any difference for plain reads of the field value.

if (state == State.ClosingLedger || state == State.CreatingLedger) {
// We don't have a ready ledger to write into
// We are waiting for a new ledger to be created
Expand Down Expand Up @@ -895,22 +893,6 @@ protected void afterFailedAddEntry(int numOfMessages) {
managedLedgerInterceptor.afterFailedAddEntry(numOfMessages);
}

/**
 * Gives the configured interceptor, if any, a chance to process {@code addOperation}.
 *
 * <p>When an interceptor is present, its {@code beforeAddEntry} hook is invoked with the operation and the
 * operation's number of messages. If the hook throws, the operation is failed with a
 * {@link ManagedLedgerInterceptException} and the error is logged.
 *
 * @param addOperation the pending add operation to hand to the interceptor
 * @return {@code true} if there is no interceptor or the interceptor completed normally, so the caller may go on
 *     with the operation; {@code false} if the interceptor threw and the operation has already been failed
 */
protected boolean beforeAddEntry(OpAddEntry addOperation) {
    if (managedLedgerInterceptor != null) {
        try {
            managedLedgerInterceptor.beforeAddEntry(addOperation, addOperation.getNumberOfMessages());
        } catch (Exception interceptFailure) {
            // Fail the operation first, then log the underlying cause.
            addOperation.failed(
                    new ManagedLedgerInterceptException("Interceptor managed ledger before add to bookie failed."));
            log.error("[{}] Failed to intercept adding an entry to bookie.", name, interceptFailure);
            return false;
        }
    }
    // Either no interceptor is configured or it accepted the operation.
    return true;
}

@Override
public void readyToCreateNewLedger() {
// only set transition state to ClosedLedger if current state is WriteFailed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ private void initLastConfirmedEntry() {
}

@Override
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException {
if (state != State.LedgerOpened) {
protected void beforeAddEntryToQueue() throws ManagedLedgerException {
if (STATE_UPDATER.get(this) != State.LedgerOpened) {
throw new ManagedLedgerException("Managed ledger is not opened");
}
}

@Override
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException {
protected void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException {
if (addOperation.getCtx() == null || !(addOperation.getCtx() instanceof Position position)) {
pendingAddEntries.poll();
throw new ManagedLedgerException("Illegal addOperation context object.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ public void verifyAsyncReadEntryUsingCache() throws Exception {
// wait for all threads to be ready to start at once
barrier.await();
while (!done.get()) {
Position position = ledger.addEntry("entry".getBytes());
Position position;
synchronized (this) {
position = ledger.addEntry("entry".getBytes());
}
positions.add(position);
Thread.sleep(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,21 @@ public void updateSubscribeRateLimiter() {
}

private void asyncAddEntry(ByteBuf headersAndPayload, PublishContext publishContext) {
ledger.asyncAddEntry(headersAndPayload,
(int) publishContext.getNumberOfMessages(), this, publishContext);
// Retain the buffer in advance so that it is not released before it's passed to `asyncAddEntry`
final var buffer = headersAndPayload.retain();
try {
ledger.getExecutor().execute(() -> {
try {
ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(), this, publishContext);
} finally {
buffer.release();
}
});
} catch (Exception e) {
buffer.release(); // decrease the reference count retained at the beginning of this method
buffer.release(); // release the original headersAndPayload since it won't be used anymore
publishContext.completed(e, -1L, -1L);
}
}

public void asyncReadEntry(Position position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
Expand All @@ -72,6 +73,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -296,16 +298,36 @@ public void testCreateTopicMLFailure() {

@Test
public void testPublishMessage() throws Exception {

// Only allow at most 1 pending task
final var mlIOExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
doAnswer(invocationOnMock -> {
final ByteBuf payload = (ByteBuf) invocationOnMock.getArguments()[0];
final AddEntryCallback callback = (AddEntryCallback) invocationOnMock.getArguments()[2];
final Topic.PublishContext ctx = (Topic.PublishContext) invocationOnMock.getArguments()[3];
callback.addComplete(PositionFactory.LATEST, payload, ctx);
mlIOExecutor.execute(() -> {
callback.addComplete(PositionFactory.LATEST, payload, ctx);
payload.release();
});
return null;
}).when(ledgerMock).asyncAddEntry(any(ByteBuf.class), anyInt(), any(AddEntryCallback.class), any());
doReturn(mlIOExecutor).when(ledgerMock).getExecutor();

PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, brokerService);
verifyMessagePublish(topic, true);

mlIOExecutor.execute(() -> {
try {
Thread.sleep(30000);
} catch (InterruptedException ignored) {
}
});
mlIOExecutor.execute(() -> {}); // make the work queue full
verifyMessagePublish(topic, false);

mlIOExecutor.shutdown();
}

private void verifyMessagePublish(PersistentTopic topic, boolean success) throws Exception {
long lastMaxReadPositionMovedForwardTimestamp = topic.getLastMaxReadPositionMovedForwardTimestamp();

/*
Expand All @@ -315,27 +337,35 @@ public void testPublishMessage() throws Exception {
*/
ByteBuf payload = Unpooled.wrappedBuffer("content".getBytes());

final CountDownLatch latch = new CountDownLatch(1);

final var future = new CompletableFuture<Position>();
final Topic.PublishContext publishContext = new Topic.PublishContext() {
@Override
public void completed(Exception e, long ledgerId, long entryId) {
assertEquals(ledgerId, PositionFactory.LATEST.getLedgerId());
assertEquals(entryId, PositionFactory.LATEST.getEntryId());
latch.countDown();
if (e == null) {
future.complete(PositionFactory.create(ledgerId, entryId));
} else {
future.completeExceptionally(e);
}
}

@Override
public void setMetadataFromEntryData(ByteBuf entryData) {
// This method must be invoked before `completed`
assertEquals(latch.getCount(), 1);
assertFalse(future.isDone());
assertEquals(entryData.array(), payload.array());
}
};
topic.publishMessage(payload, publishContext);

assertTrue(latch.await(1, TimeUnit.SECONDS));
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
try {
assertEquals(future.get(3, TimeUnit.SECONDS), PositionFactory.LATEST);
assertTrue(topic.getLastMaxReadPositionMovedForwardTimestamp() > lastMaxReadPositionMovedForwardTimestamp);
assertTrue(success);
} catch (ExecutionException ignored) {
assertFalse(success);
}

Awaitility.await().atMost(1, TimeUnit.SECONDS).untilAsserted(() -> assertEquals(payload.refCnt(), 0));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -2922,6 +2923,7 @@ private void setupMLAsyncCallbackMocks() {
cursorMock = mock(ManagedCursor.class);
doReturn(new ArrayList<>()).when(ledgerMock).getCursors();
doReturn(new ManagedLedgerConfig()).when(ledgerMock).getConfig();
doReturn((Executor) Runnable::run).when(ledgerMock).getExecutor();

// call openLedgerComplete with ledgerMock on ML factory asyncOpen
doAnswer((Answer<Object>) invocationOnMock -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
Expand Down Expand Up @@ -252,6 +253,7 @@ public void testIsDuplicateWithFailure() {
ManagedLedger managedLedger = mock(ManagedLedger.class);
MessageDeduplication messageDeduplication = spy(new MessageDeduplication(pulsarService, mock(PersistentTopic.class), managedLedger));
doReturn(true).when(messageDeduplication).isEnabled();
doReturn((Executor) Runnable::run).when(managedLedger).getExecutor();


EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
Expand Down
Loading