Skip to content

Commit

Permalink
use new impl
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 committed Apr 2, 2023
1 parent 0d09e8e commit 00567fd
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
import org.apache.bookkeeper.util.ByteBufList;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -228,7 +229,7 @@ static class ClientSideHandler extends ChannelDuplexHandler {
final ClientAuthProvider.Factory authProviderFactory;
ClientAuthProvider authProvider;
final AtomicLong transactionIdGenerator;
final Queue<Object> waitingForAuth = new ConcurrentLinkedQueue<>();
final Queue<Pair<Object, ChannelPromise>> waitingForAuth = new ConcurrentLinkedQueue<>();
final ClientConnectionPeer connectionPeer;

private final boolean isUsingV2Protocol;
Expand Down Expand Up @@ -349,7 +350,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof BookieProtocol.Request) {
// let auth messages through, queue the rest
Expand All @@ -358,10 +359,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
super.write(ctx, msg, promise);
super.flush(ctx);
} else {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
}
} else if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
waitingForAuth.add(msg);
waitingForAuth.add(Pair.of(msg, promise));
} else {
LOG.info("[{}] dropping write of message {}", ctx.channel(), msg);
}
Expand Down Expand Up @@ -427,10 +428,10 @@ public void operationComplete(int rc, Void v) {
if (rc == BKException.Code.OK) {
synchronized (this) {
authenticated = true;
Object msg = waitingForAuth.poll();
while (msg != null) {
ctx.writeAndFlush(msg);
msg = waitingForAuth.poll();
Pair<Object, ChannelPromise> pair = waitingForAuth.poll();
while (pair != null && pair.getKey() != null) {
ctx.writeAndFlush(pair.getKey(), pair.getValue());
pair = waitingForAuth.poll();
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.carrotsearch.hppc.ObjectHashSet;
import com.carrotsearch.hppc.ObjectSet;
import com.carrotsearch.hppc.procedures.ObjectProcedure;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
Expand All @@ -29,7 +31,6 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -78,7 +79,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
Expand Down Expand Up @@ -176,7 +176,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024;

private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;
private static final AtomicLong txnIdGenerator = new AtomicLong(0);
Expand Down Expand Up @@ -356,9 +355,8 @@ enum ConnectionState {
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBufList pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;
private volatile Future<?> nextScheduledFlush = null;
private final ObjectSet<CompletionKey> pendingSendKeys = new ObjectHashSet<>();
private volatile boolean needFlush = true;

public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup,
BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
Expand Down Expand Up @@ -1165,13 +1163,9 @@ private void writeAndFlush(final Channel channel,

try {
if (request instanceof ByteBuf || request instanceof ByteBufList) {
if (prepareSendRequests(request, key)) {
if (prepareSendRequests(request, key) || needFlush) {
flushPendingRequests();
}

if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().submit(this::flushPendingRequests);
}
} else {
final long startTime = MathUtils.nowInNano();

Expand Down Expand Up @@ -1210,38 +1204,39 @@ public synchronized boolean prepareSendRequests(Object request, CompletionKey ke
}

public synchronized void flushPendingRequests() {
if (pendingSendRequests == null) {
needFlush = true;
return;
}

final long startTime = MathUtils.nowInNano();
Set<CompletionKey> keys = new ObjectHashSet<>(pendingSendKeys);
ObjectSet<CompletionKey> keys = new ObjectHashSet<>(pendingSendKeys);
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
for (CompletionKey completionKey : keys) {
CompletionValue completion = completionObjects.get(completionKey);
keys.forEach((ObjectProcedure<? super CompletionKey>) k -> {
CompletionValue completion = completionObjects.get(k);
if (completion != null) {
completion.setOutstanding();
}
}
});

} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
flushPendingRequests();
});

if (pendingSendRequests != null) {
maxPendingRequestsSize = (int) Math.max(
maxPendingRequestsSize * 0.5 + pendingSendRequests.readableBytes() * 0.5,
DEFAULT_PENDING_REQUEST_SIZE);

if (channel != null && channel.isActive()) {
channel.writeAndFlush(pendingSendRequests, promise);
} else {
pendingSendRequests.release();
keys.forEach(key -> errorOut(key, BKException.Code.TooManyRequestsException));

}
pendingSendRequests = null;
pendingSendKeys.clear();
if (channel != null && channel.isActive()) {
channel.writeAndFlush(pendingSendRequests, promise);
} else {
pendingSendRequests.release();
pendingSendKeys.forEach((ObjectProcedure<? super CompletionKey>) key ->
errorOut(key, BKException.Code.TooManyRequestsException));
}
nextScheduledFlush = null;
pendingSendRequests = null;
pendingSendKeys.clear();
needFlush = false;
}

void errorOut(final CompletionKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1553,7 +1553,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedg
@Test
public void testReadWriteEntry() throws Exception {
lh = bkc.createLedgerAdv(1, 1, 1, digestType, ledgerPassword);
numEntriesToWrite = 100;
numEntriesToWrite = 10;
List<byte[]> entries = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(numEntriesToWrite);
for (int i = 0; i < numEntriesToWrite; ++i) {
Expand Down

0 comments on commit 00567fd

Please sign in to comment.