[Improve][Client] Group writing into the channel in PerChannelBookieClient #3886
@@ -349,6 +353,10 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBuf pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;
Do we need to make it configurable?
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
Do we need to make the delay time configurable?
We should cancel the nextScheduledFlush in the close method.
> Do we need to make the delay time configurable?
We should make sure the flush happens as soon as possible; otherwise the addEntry operation may time out. If we make the delay configurable, users may configure it with a wrong value.
public synchronized void flushPendingRequests() {
final long startTime = MathUtils.nowInNano();
Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);
Do we need to check that pendingSendRequests is not null before adding the listener, to avoid registering unnecessary listeners?
@@ -349,6 +353,10 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBuf pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
Here we can also use ObjectHashSet
}
});
channel.writeAndFlush(request, promise);
}
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
The catch block does not suit the new logic. It only handles the single-key case and does not handle the pending keys.
Also, nothing in the try block appears to throw an exception, so I think we can remove the try-catch block.
return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
}

public synchronized void flushPendingRequests() {
flushPendingRequests is triggered every millisecond, which may exacerbate lock contention between prepareSendRequests and flushPendingRequests.
Here we can refer to the implementation of writeCache and writeCacheBeingFlushed in SingleDirectoryDbLedgerStorage to make flushPendingRequests lock-free.
I think this should be configurable.
public static void serializeAddRequests(Object request, ByteBuf buf) {
if (request instanceof ByteBuf) {
ByteBuf r = (ByteBuf) request;
buf.writeBytes(r);
what if buf does not have enough space to write bytes?
Do we really need to copy the data to another ByteBuf, or can we use CompositeByteBuf?
@@ -173,6 +174,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024;

private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;
this cannot be larger than nettyMaxFrameSizeBytes
if (pendingSendRequests == null) {
pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
}
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
what if request's size > maxPendingRequestsSize ?
} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
nextScheduledFlush is volatile, so we assume it is accessed from different threads.
It can be null at the "if" check and non-null by the time of the assignment.
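One way to close the check-then-act gap described in this comment is an atomic compare-and-set on the field. The following is only a sketch under stated assumptions, not the PR's code: FlushScheduler, ensureScheduled, current, and close are hypothetical names, and a plain ScheduledExecutorService stands in for the channel's event loop. The close method also shows the cancellation requested in an earlier comment.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration class; not part of BookKeeper.
class FlushScheduler {
    private final ScheduledExecutorService executor; // stands in for channel.eventLoop()
    private final Runnable flushTask;                // stands in for flushPendingRequests
    private final AtomicReference<ScheduledFuture<?>> nextScheduledFlush = new AtomicReference<>();

    FlushScheduler(ScheduledExecutorService executor, Runnable flushTask) {
        this.executor = executor;
        this.flushTask = flushTask;
    }

    // Schedule the periodic flush at most once, even when called concurrently.
    void ensureScheduled() {
        if (nextScheduledFlush.get() != null) {
            return; // already scheduled
        }
        ScheduledFuture<?> candidate =
                executor.scheduleWithFixedDelay(flushTask, 1, 1, TimeUnit.MILLISECONDS);
        // Only one thread wins the compare-and-set; a loser cancels its own task.
        if (!nextScheduledFlush.compareAndSet(null, candidate)) {
            candidate.cancel(false);
        }
    }

    // Returns the currently scheduled task, or null if none is scheduled.
    ScheduledFuture<?> current() {
        return nextScheduledFlush.get();
    }

    // Cancel the periodic flush so it does not outlive the client.
    void close() {
        ScheduledFuture<?> scheduled = nextScheduledFlush.getAndSet(null);
        if (scheduled != null) {
            scheduled.cancel(false);
        }
    }
}
```

If every access to the field already happens inside synchronized methods or on a single event-loop thread, a plain field is enough and the atomic reference is unnecessary.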
00567fd to ab95ba6
buf.add((ByteBuf) request);
} else if (request instanceof ByteBufList) {
buf.add((ByteBufList) request);
}
else throw IllegalStateException ?
Motivation
When the BookKeeper client writes an entry to the BookKeeper server, it runs with the following steps:
bookieProtoEncoder, lengthbasedframedecoder, and consolidation
If the bookie client writes small entries at a high rate, Netty's pending queue fills up and the Netty thread is kept busy processing entries and flushing them into the socket channel. The CPU then switches between user mode and kernel mode at high frequency.
#3383 introduced Netty channel flush consolidation to mitigate syscall overhead, but it cannot reduce the overhead on the Netty threads.
We can tune it on Step 3 to group the small entries into one ByteBuf and flush it into the Netty pending queue when conditions are met.
Design
When a new entry comes to the bookie client channel, we add it to one ByteBuf and check whether the ByteBuf exceeds the maximum threshold (1 MB by default); when it does, the ByteBuf is flushed.
To keep entries from sitting in the bookie client channel ByteBuf for a long time and causing high write latency, we schedule a timer task that flushes the ByteBuf every 1 ms.
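The design above can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not the PR's code: GroupingWriter, its methods, and scheduleFlush are hypothetical names, a ByteArrayOutputStream stands in for the Netty ByteBuf, and a list stands in for Netty's pending queue.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration class; not part of BookKeeper.
class GroupingWriter {
    private final int maxPendingBytes;                       // size threshold (1 MB by default in the PR)
    private final List<byte[]> flushed = new ArrayList<>();  // stands in for Netty's pending queue
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream(); // stands in for the ByteBuf

    GroupingWriter(int maxPendingBytes) {
        this.maxPendingBytes = maxPendingBytes;
    }

    // Append one serialized request; flush once the buffer exceeds the threshold.
    synchronized void add(byte[] request) {
        pending.write(request, 0, request.length);
        if (pending.size() > maxPendingBytes) {
            flush();
        }
    }

    // Hand all grouped bytes downstream as a single write. The timer calls this too.
    synchronized void flush() {
        if (pending.size() == 0) {
            return; // nothing buffered, skip the empty write
        }
        flushed.add(pending.toByteArray());
        pending.reset();
    }

    synchronized int flushCount() {
        return flushed.size();
    }

    synchronized int lastFlushSize() {
        return flushed.isEmpty() ? 0 : flushed.get(flushed.size() - 1).length;
    }

    // Periodic flush bounds how long a small entry can wait in the buffer.
    static ScheduledFuture<?> scheduleFlush(GroupingWriter writer,
                                            ScheduledExecutorService executor,
                                            long delayMillis) {
        return executor.scheduleWithFixedDelay(writer::flush, delayMillis, delayMillis,
                TimeUnit.MILLISECONDS);
    }
}
```

With a threshold of 8 bytes, two 4-byte requests stay buffered and a ninth byte triggers one 9-byte write. Calling scheduleFlush(writer, executor, 1) would mirror the 1 ms timer described above.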
Performance
We test the write performance on my laptop with the following command.
The performance result.