Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Client] Group writing into the channel in PerChannelBookieClient #3886

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
address comments
  • Loading branch information
hangc0276 committed Apr 2, 2023
commit 4ae8e532027ed999536c37f4496c61448a59c086
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.

private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;
private static final AtomicLong txnIdGenerator = new AtomicLong(0);

final BookieId bookieId;
Expand Down Expand Up @@ -1163,7 +1162,11 @@ private void writeAndFlush(final Channel channel,

try {
if (request instanceof ByteBuf || request instanceof ByteBufList) {
if (prepareSendRequests(request, key) || needFlush) {
if (checkFlushPendingRequests(request)) {
flushPendingRequests();
}
prepareSendRequests(request, key);
if (needFlush) {
flushPendingRequests();
}
} else {
Expand All @@ -1189,7 +1192,7 @@ private void writeAndFlush(final Channel channel,
}
}

public synchronized boolean prepareSendRequests(Object request, CompletionKey key) {
public synchronized void prepareSendRequests(Object request, CompletionKey key) {
if (pendingSendRequests == null) {
if (request instanceof ByteBuf) {
pendingSendRequests = ByteBufList.get((ByteBuf) request);
Expand All @@ -1200,7 +1203,16 @@ public synchronized boolean prepareSendRequests(Object request, CompletionKey ke
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
}
pendingSendKeys.add(key);
return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
}

public synchronized boolean checkFlushPendingRequests(Object request) {
if (pendingSendRequests == null) {
return false;
}

int numBytes = request instanceof ByteBuf
? ((ByteBuf) request).readableBytes() : ((ByteBufList) request).readableBytes();
return pendingSendRequests.readableBytes() + numBytes > maxFrameSize;
}

public synchronized void flushPendingRequests() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The flushPendingRequests trigger at every millisecond. It may exacerbate the lock race between prepareSendRequests and flushPendingRequests.

Copy link
Member

@horizonzy horizonzy Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we can refer to the implements of writeCache and writeCacheBeingFlushed in the SingleDirectoryDbLedgerStorage to make flushPendingRequests lock-free.

Expand Down