diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java index f923b61ad50..f2f2fb24769 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java @@ -41,6 +41,7 @@ import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage; import org.apache.bookkeeper.util.ByteBufList; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -228,7 +229,7 @@ static class ClientSideHandler extends ChannelDuplexHandler { final ClientAuthProvider.Factory authProviderFactory; ClientAuthProvider authProvider; final AtomicLong transactionIdGenerator; - final Queue waitingForAuth = new ConcurrentLinkedQueue<>(); + final Queue> waitingForAuth = new ConcurrentLinkedQueue<>(); final ClientConnectionPeer connectionPeer; private final boolean isUsingV2Protocol; @@ -349,7 +350,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } } else if (msg instanceof BookieProtocol.Request) { // let auth messages through, queue the rest @@ -358,10 +359,10 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) super.write(ctx, msg, promise); super.flush(ctx); } else { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } } else if (msg instanceof ByteBuf || msg instanceof ByteBufList) { - waitingForAuth.add(msg); + waitingForAuth.add(Pair.of(msg, promise)); } else { LOG.info("[{}] dropping write of message {}", ctx.channel(), msg); } @@ -427,10 +428,10 @@ public void operationComplete(int rc, Void v) { if (rc == BKException.Code.OK) { synchronized (this) { authenticated = true; - Object msg = waitingForAuth.poll(); - while (msg != null) { - ctx.writeAndFlush(msg); - msg = waitingForAuth.poll(); + Pair pair = waitingForAuth.poll(); + while (pair != null && pair.getKey() != null) { + ctx.writeAndFlush(pair.getKey(), pair.getValue()); + pair = waitingForAuth.poll(); } } } else { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 472c3610afa..cb46a6c0216 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -28,7 +28,6 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufOutputStream; -import io.netty.buffer.CompositeByteBuf; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index bf52c90c468..798495d7755 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -21,6 +21,8 @@ import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID; import com.carrotsearch.hppc.ObjectHashSet; +import com.carrotsearch.hppc.ObjectSet; +import com.carrotsearch.hppc.procedures.ObjectProcedure; import com.google.common.base.Joiner; import com.google.common.collect.Sets; import com.google.protobuf.ByteString; @@ -29,7 +31,6 @@ import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; @@ -78,7 +79,6 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; @@ -176,7 +176,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { BKException.Code.DuplicateEntryIdException, BKException.Code.WriteOnReadOnlyBookieException)); private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later. - private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024; private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024; private static final AtomicLong txnIdGenerator = new AtomicLong(0); @@ -356,9 +355,8 @@ enum ConnectionState { private volatile boolean isWritable = true; private long lastBookieUnavailableLogTimestamp = 0; private ByteBufList pendingSendRequests = null; - private final Set pendingSendKeys = new HashSet<>(); - private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE; - private volatile Future nextScheduledFlush = null; + private final ObjectSet pendingSendKeys = new ObjectHashSet<>(); + private volatile boolean needFlush = true; public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException { @@ -1165,13 +1163,9 @@ private void writeAndFlush(final Channel channel, try { if (request instanceof ByteBuf || request instanceof ByteBufList) { - if (prepareSendRequests(request, key)) { + if (prepareSendRequests(request, key) || needFlush) { flushPendingRequests(); } - - if (nextScheduledFlush == null) { - nextScheduledFlush = channel.eventLoop().submit(this::flushPendingRequests); - } } else { final long startTime = MathUtils.nowInNano(); @@ -1210,38 +1204,39 @@ public synchronized boolean prepareSendRequests(Object request, CompletionKey ke } public synchronized void flushPendingRequests() { + if (pendingSendRequests == null) { + needFlush = true; + return; + } + final long startTime = MathUtils.nowInNano(); - Set keys = new ObjectHashSet<>(pendingSendKeys); + ObjectSet keys = new ObjectHashSet<>(pendingSendKeys); ChannelPromise promise = channel.newPromise().addListener(future -> { if (future.isSuccess()) { nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); - for (CompletionKey completionKey : keys) { - CompletionValue completion = completionObjects.get(completionKey); + keys.forEach((ObjectProcedure) k -> { + CompletionValue completion = completionObjects.get(k); if (completion != null) { completion.setOutstanding(); } - } + }); + } else { nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS); } + flushPendingRequests(); }); - if (pendingSendRequests != null) { - maxPendingRequestsSize = (int) Math.max( - maxPendingRequestsSize * 0.5 + pendingSendRequests.readableBytes() * 0.5, - DEFAULT_PENDING_REQUEST_SIZE); - - if (channel != null && channel.isActive()) { - channel.writeAndFlush(pendingSendRequests, promise); - } else { - pendingSendRequests.release(); - keys.forEach(key -> errorOut(key, BKException.Code.TooManyRequestsException)); - - } - pendingSendRequests = null; - pendingSendKeys.clear(); + if (channel != null && channel.isActive()) { + channel.writeAndFlush(pendingSendRequests, promise); + } else { + pendingSendRequests.release(); + pendingSendKeys.forEach((ObjectProcedure) key -> + errorOut(key, BKException.Code.TooManyRequestsException)); } - nextScheduledFlush = null; + pendingSendRequests = null; + pendingSendKeys.clear(); + needFlush = false; } void errorOut(final CompletionKey key) { diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index d0eabe85f66..efa9aa0d7e0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -1553,7 +1553,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedg @Test public void testReadWriteEntry() throws Exception { lh = bkc.createLedgerAdv(1, 1, 1, digestType, ledgerPassword); - numEntriesToWrite = 100; + numEntriesToWrite = 10; List entries = new ArrayList<>(); CountDownLatch latch = new CountDownLatch(numEntriesToWrite); for (int i = 0; i < numEntriesToWrite; ++i) {