Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Client] Group writing into the channel in PerChannelBookieClient #3886

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -228,7 +229,7 @@ static class ClientSideHandler extends ChannelDuplexHandler {
final ClientAuthProvider.Factory authProviderFactory;
ClientAuthProvider authProvider;
final AtomicLong transactionIdGenerator;
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final Queue<Pair<Object, ChannelPromise>> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;

private final boolean isUsingV2Protocol;
Expand Down Expand Up @@ -349,7 +350,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
Expand All @@ -358,10 +359,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
} else {
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
Expand Down Expand Up @@ -427,10 +428,10 @@ public void operationComplete(int rc, Void v) {
if (rc == BKException.Code.OK) {
synchronized (this) {
authenticated = true;
Object msg = waitingForAuth.poll();
while (msg != null) {
ctx.writeAndFlush(msg);
msg = waitingForAuth.poll();
Pair<Object, ChannelPromise> pair = waitingForAuth.poll();
while (pair != null && pair.getKey() != null) {
ctx.writeAndFlush(pair.getKey(), pair.getValue());
pair = waitingForAuth.poll();
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,14 @@ private static byte[] readMasterKey(ByteBuf packet) {

return masterKey;
}

public static void serializeAddRequests(Object request, ByteBufList buf) {
if (request instanceof ByteBuf) {
buf.add((ByteBuf) request);
} else if (request instanceof ByteBufList) {
buf.add((ByteBufList) request);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

else throw IllegalStateException ?

}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
Expand Down Expand Up @@ -173,6 +176,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.

private static final AtomicLong txnIdGenerator = new AtomicLong(0);

final BookieId bookieId;
Expand Down Expand Up @@ -349,6 +353,9 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBufList pendingSendRequests = null;
private final ObjectSet<CompletionKey> pendingSendKeys = new ObjectHashSet<>();
private volatile boolean needFlush = true;

public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
Expand Down Expand Up @@ -1154,26 +1161,96 @@ private void writeAndFlush(final Channel channel,
}

try {
final long startTime = MathUtils.nowInNano();

ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (request instanceof ByteBuf || request instanceof ByteBufList) {
if (checkFlushPendingRequests(request)) {
flushPendingRequests();
}
});
channel.writeAndFlush(request, promise);
prepareSendRequests(request, key);
if (needFlush) {
flushPendingRequests();
}
} else {
final long startTime = MathUtils.nowInNano();

// promise complete trigger flush pending request.
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
CompletionValue completion = completionObjects.get(key);
if (completion != null) {
completion.setOutstanding();
}
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});
channel.writeAndFlush(request, promise);
}
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The catch code block is not suitable for the new logic. It just handle the single key case, and didn't handle the pending keys case.
And in the try code block, it looks like there is no exception that will be thrown. I think we can remove the try catch block.

}
}

public synchronized void prepareSendRequests(Object request, CompletionKey key) {
if (pendingSendRequests == null) {
if (request instanceof ByteBuf) {
pendingSendRequests = ByteBufList.get((ByteBuf) request);
} else if (request instanceof ByteBufList) {
pendingSendRequests = ByteBufList.get((ByteBufList) request);
}
} else {
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
}
pendingSendKeys.add(key);
}

public synchronized boolean checkFlushPendingRequests(Object request) {
if (pendingSendRequests == null) {
return false;
}

int numBytes = request instanceof ByteBuf
? ((ByteBuf) request).readableBytes() : ((ByteBufList) request).readableBytes();
return pendingSendRequests.readableBytes() + numBytes > maxFrameSize;
}

public synchronized void flushPendingRequests() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flushPendingRequests trigger at every millisecond. It may exacerbate the lock race between prepareSendRequests and flushPendingRequests.

Copy link
Member

@horizonzy horizonzy Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we can refer to the implements of writeCache and writeCacheBeingFlushed in the SingleDirectoryDbLedgerStorage to make flushPendingRequests lock-free.

if (pendingSendRequests == null) {
needFlush = true;
return;
}

final long startTime = MathUtils.nowInNano();
ObjectSet<CompletionKey> keys = new ObjectHashSet<>(pendingSendKeys);
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
keys.forEach((ObjectProcedure<? super CompletionKey>) k -> {
CompletionValue completion = completionObjects.get(k);
if (completion != null) {
completion.setOutstanding();
}
});

} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
flushPendingRequests();
});

if (channel != null && channel.isActive()) {
channel.writeAndFlush(pendingSendRequests, promise);
} else {
pendingSendRequests.release();
pendingSendKeys.forEach((ObjectProcedure<? super CompletionKey>) key ->
errorOut(key, BKException.Code.TooManyRequestsException));
}
pendingSendRequests = null;
pendingSendKeys.clear();
needFlush = false;
}

void errorOut(final CompletionKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing completion key: {}", key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ public static ByteBufList get(ByteBuf b1) {
return buf;
}

public static ByteBufList get(ByteBufList b1) {
ByteBufList buf = get();
for (int i = 0; i < b1.buffers.size(); ++i) {
buf.add(b1.buffers.get(i));
}
return buf;
}


/**
* Get a new {@link ByteBufList} instance from the pool that is the clone of an already existing instance.
*/
Expand Down Expand Up @@ -149,6 +158,10 @@ public void add(ByteBuf buf) {
}
}

public void add(ByteBufList b1) {
buffers.addAll(b1.buffers);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
Expand Down Expand Up @@ -276,6 +289,13 @@ public static ByteBuf coalesce(ByteBufList list) {
return res;
}

public void writeTo(ByteBuf buf) {
for (int i = 0; i < buffers.size(); ++i) {
ByteBuf b = buffers.get(i);
buf.writeBytes(b, b.readerIndex(), b.readableBytes());
}
}

@Override
public ByteBufList retain() {
super.retain();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,6 @@ public SyncObj() {
public void setUp() throws Exception {
baseConf.setJournalWriteData(writeJournal);
baseClientConf.setUseV2WireProtocol(useV2);

super.setUp();
rng = new Random(0); // Initialize the Random
// Number Generator
Expand Down Expand Up @@ -1550,4 +1549,29 @@ public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedg
}

}

@Test
public void testReadWriteEntry() throws Exception {
lh = bkc.createLedgerAdv(1, 1, 1, digestType, ledgerPassword);
numEntriesToWrite = 10000;
List<byte[]> entries = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(numEntriesToWrite);
for (int i = 0; i < numEntriesToWrite; ++i) {
ByteBuffer entry = ByteBuffer.allocate(4);
entry.putInt(rng.nextInt(maxInt));
entry.position(0);
entries.add(entry.array());
lh.asyncAddEntry(i, entry.array(), new AddCallback() {
@Override
public void addComplete(int rc, LedgerHandle lh, long entryId, Object ctx) {
assertEquals(0, rc);
latch.countDown();
}
}, null);
}
latch.await();
readEntries(lh, entries);
lh.close();

}
}