Skip to content

Commit

Permalink
Make read entry request recyclable (apache#3842)
Browse files Browse the repository at this point in the history
* Make read entry request recyclable

* Move recycle to finally block

* Fix test and comments

* Fix test
  • Loading branch information
codelipenghui authored Mar 7, 2023
1 parent 5703132 commit 09365df
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public Object encode(Object msg, ByteBufAllocator allocator)
if (r.hasMasterKey()) {
buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
}

r.recycle();
return buf;
} else if (r instanceof BookieProtocol.AuthRequest) {
BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage();
Expand Down Expand Up @@ -193,9 +193,9 @@ public Object decode(ByteBuf packet)
if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING
&& version >= 2) {
byte[] masterKey = readMasterKey(packet);
return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, masterKey);
return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, masterKey);
} else {
return new BookieProtocol.ReadRequest(version, ledgerId, entryId, flags, null);
return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null);
}
case BookieProtocol.AUTH:
BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -362,14 +362,43 @@ public void recycle() {
* A Request that reads data.
*/
class ReadRequest extends Request {
ReadRequest(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
init(protocolVersion, READENTRY, ledgerId, entryId, flags, masterKey);

static ReadRequest create(byte protocolVersion, long ledgerId, long entryId,
short flags, byte[] masterKey) {
ReadRequest read = RECYCLER.get();
read.protocolVersion = protocolVersion;
read.opCode = READENTRY;
read.ledgerId = ledgerId;
read.entryId = entryId;
read.flags = flags;
read.masterKey = masterKey;
return read;
}

boolean isFencing() {
return (flags & FLAG_DO_FENCING) == FLAG_DO_FENCING;
}

private final Handle<ReadRequest> recyclerHandle;

private ReadRequest(Handle<ReadRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}

private static final Recycler<ReadRequest> RECYCLER = new Recycler<ReadRequest>() {
@Override
protected ReadRequest newObject(Handle<ReadRequest> handle) {
return new ReadRequest(handle);
}
};

@Override
public void recycle() {
ledgerId = -1;
entryId = -1;
masterKey = null;
recyclerHandle.recycle(this);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ public void readLac(final long ledgerId, ReadLacCallback cb, Object ctx) {
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, 0, (short) 0, null);
completionKey = acquireV2Key(ledgerId, 0, OperationType.READ_LAC);
} else {
Expand Down Expand Up @@ -933,7 +933,7 @@ private void readEntryInternal(final long ledgerId,
Object request = null;
CompletionKey completionKey = null;
if (useV2WireProtocol) {
request = new BookieProtocol.ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
request = BookieProtocol.ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
ledgerId, entryId, (short) flags, masterKey);
completionKey = acquireV2Key(ledgerId, entryId, OperationType.READ_ENTRY);
} else {
Expand Down Expand Up @@ -1168,7 +1168,6 @@ private void writeAndFlush(final Channel channel,
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
}
});

channel.writeAndFlush(request, promise);
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ public String toString() {
}

private void recycle() {
request.recycle();
super.reset();
this.recyclerHandle.recycle(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void testAsynchronousRequest(boolean result, int errorCode) throws Excep

ExecutorService service = Executors.newCachedThreadPool();
long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(
request, requestHandler, requestProcessor, service, true);
Expand Down Expand Up @@ -150,7 +150,7 @@ private void testSynchronousRequest(boolean result, int errorCode) throws Except
}).when(channel).writeAndFlush(any(Response.class));

long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, BookieProtocol.FLAG_DO_FENCING, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
fenceResult.complete(result);
Expand Down Expand Up @@ -180,7 +180,7 @@ public void testNonFenceRequest() throws Exception {
}).when(channel).writeAndFlush(any(Response.class));

long ledgerId = System.currentTimeMillis();
ReadRequest request = new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
ReadRequest request = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId,
1, (short) 0, new byte[]{});
ReadEntryProcessor processor = ReadEntryProcessor.create(request, requestHandler, requestProcessor, null, true);
processor.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,9 @@ public void testAuthFail() throws Exception {

}

client.sendRequest(new ReadRequest(BookieProtocol.CURRENT_PROTOCOL_VERSION,
1L, 1L, (short) 0, null));
ReadRequest read = ReadRequest.create(BookieProtocol.CURRENT_PROTOCOL_VERSION,
1L, 1L, (short) 0, null);
client.sendRequest(read);
Response response = client.takeResponse();
assertEquals("Should have failed",
response.getErrorCode(), BookieProtocol.EUA);
Expand Down

0 comments on commit 09365df

Please sign in to comment.