Skip to content

Commit

Permalink
tmp
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 committed Apr 1, 2023
1 parent 1891fe2 commit 0d09e8e
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
Expand Down Expand Up @@ -216,15 +217,11 @@ private static byte[] readMasterKey(ByteBuf packet) {
return masterKey;
}

public static void serializeAddRequests(Object request, ByteBuf buf) {
public static void serializeAddRequests(Object request, ByteBufList buf) {
if (request instanceof ByteBuf) {
ByteBuf r = (ByteBuf) request;
buf.writeBytes(r);
r.release();
buf.add((ByteBuf) request);
} else if (request instanceof ByteBufList) {
ByteBufList list = (ByteBufList) request;
list.writeTo(buf);
list.release();
buf.add((ByteBufList) request);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static org.apache.bookkeeper.client.LedgerHandle.INVALID_ENTRY_ID;

import com.carrotsearch.hppc.ObjectHashSet;
import com.google.common.base.Joiner;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
Expand All @@ -28,6 +29,7 @@
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -353,7 +355,7 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBuf pendingSendRequests = null;
private ByteBufList pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;
private volatile Future<?> nextScheduledFlush = null;
Expand Down Expand Up @@ -1163,17 +1165,17 @@ private void writeAndFlush(final Channel channel,

try {
if (request instanceof ByteBuf || request instanceof ByteBufList) {
if (prepareSendRequests(channel, request, key)) {
if (prepareSendRequests(request, key)) {
flushPendingRequests();
}

if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
1, 1, TimeUnit.MILLISECONDS);
nextScheduledFlush = channel.eventLoop().submit(this::flushPendingRequests);
}
} else {
final long startTime = MathUtils.nowInNano();

// promise complete trigger flush pending request.
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
Expand All @@ -1193,18 +1195,23 @@ private void writeAndFlush(final Channel channel,
}
}

public synchronized boolean prepareSendRequests(Channel channel, Object request, CompletionKey key) {
public synchronized boolean prepareSendRequests(Object request, CompletionKey key) {
if (pendingSendRequests == null) {
pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
if (request instanceof ByteBuf) {
pendingSendRequests = ByteBufList.get((ByteBuf) request);
} else if (request instanceof ByteBufList) {
pendingSendRequests = ByteBufList.get((ByteBufList) request);
}
} else {
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
}
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
pendingSendKeys.add(key);
return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
}

public synchronized void flushPendingRequests() {
final long startTime = MathUtils.nowInNano();
Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);
Set<CompletionKey> keys = new ObjectHashSet<>(pendingSendKeys);
ChannelPromise promise = channel.newPromise().addListener(future -> {
if (future.isSuccess()) {
nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
Expand Down Expand Up @@ -1234,6 +1241,7 @@ public synchronized void flushPendingRequests() {
pendingSendRequests = null;
pendingSendKeys.clear();
}
nextScheduledFlush = null;
}

void errorOut(final CompletionKey key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,15 @@ public static ByteBufList get(ByteBuf b1) {
return buf;
}

public static ByteBufList get(ByteBufList b1) {
ByteBufList buf = get();
for (int i = 0; i < b1.buffers.size(); ++i) {
buf.add(b1.buffers.get(i));
}
return buf;
}


/**
* Get a new {@link ByteBufList} instance from the pool that is the clone of an already existing instance.
*/
Expand Down Expand Up @@ -149,6 +158,10 @@ public void add(ByteBuf buf) {
}
}

public void add(ByteBufList b1) {
buffers.addAll(b1.buffers);
}

/**
* Prepend a {@link ByteBuf} at the beginning of this {@link ByteBufList}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ public SyncObj() {
@Override
@Before
public void setUp() throws Exception {
baseConf.setJournalWriteData(writeJournal);
baseClientConf.setUseV2WireProtocol(useV2);
super.setUp();
rng = new Random(0); // Initialize the Random
// Number Generator
Expand All @@ -136,14 +138,12 @@ public BookieWriteLedgerTest() {
String ledgerManagerFactory = "org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory";
// set ledger manager
baseConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseConf.setJournalWriteData(writeJournal);
/*
* 'testLedgerCreateAdvWithLedgerIdInLoop2' testcase relies on skipListSizeLimit,
* so setting it to some small value for making that testcase lite.
*/
baseConf.setSkipListSizeLimit(4 * 1024 * 1024);
baseClientConf.setLedgerManagerFactoryClassName(ledgerManagerFactory);
baseClientConf.setUseV2WireProtocol(useV2);
}

/**
Expand Down Expand Up @@ -1553,7 +1553,7 @@ public ByteBuf readEntry(long ledgerId, long entryId) throws IOException, NoLedg
@Test
public void testReadWriteEntry() throws Exception {
lh = bkc.createLedgerAdv(1, 1, 1, digestType, ledgerPassword);
numEntriesToWrite = 10000;
numEntriesToWrite = 100;
List<byte[]> entries = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(numEntriesToWrite);
for (int i = 0; i < numEntriesToWrite; ++i) {
Expand Down

0 comments on commit 0d09e8e

Please sign in to comment.