[Improve][Client] Group writing into the channel in PerChannelBookieClient #3886

Open
wants to merge 8 commits into master from chenhang/group_writing_for_perBookieClient

Conversation

hangc0276
Contributor

Motivation

When the BookKeeper client writes an entry to the BookKeeper server, it runs with the following steps:

  • Step 1: Initiate a PendingAddOp object.
  • Step 2: For each replica, select a bookie client channel according to the ledgerId.
  • Step 3: Write the entry to the bookie client channel and flush it.
  • Step 4: The entry is added to Netty's pending queue and processed by the configured Netty pipeline, such as bookieProtoEncoder, lengthbasedframedecoder, and consolidation.
  • Step 5: Wait for the write response.

If the bookie client writes small entries at a high operation rate, Netty's pending queue fills up and the Netty thread stays busy processing entries and flushing them into the socket channel, so the CPU switches between user mode and kernel mode at high frequency.

#3383 introduced Netty channel flush consolidation to mitigate the syscall overhead, but it cannot reduce the overhead on the Netty threads.

We can improve Step 3 by grouping small entries into one ByteBuf and flushing it into the Netty pending queue once certain conditions are met.

Design

When a new entry arrives at the bookie client channel, we append it to a shared ByteBuf and check whether the ByteBuf exceeds the maximum threshold, which defaults to 1 MB.

To avoid an entry sitting in the bookie client channel's ByteBuf for a long time and causing high write latency, we schedule a timer task to flush the ByteBuf every 1 ms.
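The following is a minimal sketch of this design, assuming illustrative names (GroupingWriter, FLUSH_INTERVAL_MS, maxPendingSize); the PR itself folds this logic into PerChannelBookieClient:

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch only: group small writes into one ByteBuf and hand the batch to
// Netty either when it crosses the size threshold or when the 1 ms timer fires.
public class GroupingWriter {
    private static final long FLUSH_INTERVAL_MS = 1;     // timer period from the design
    private final int maxPendingSize = 1024 * 1024;      // 1 MB default threshold
    private final Channel channel;
    private ByteBuf pending;                             // accumulated requests
    private ScheduledFuture<?> scheduledFlush;

    public GroupingWriter(Channel channel) {
        this.channel = channel;
    }

    // Append one serialized request; flush once the batch crosses the threshold.
    public synchronized void write(ByteBuf request) {
        if (pending == null) {
            pending = channel.alloc().directBuffer(maxPendingSize);
        }
        pending.writeBytes(request);
        request.release();
        if (pending.readableBytes() > maxPendingSize) {
            flush();
        } else if (scheduledFlush == null) {
            // ensure a small batch never waits much longer than 1 ms
            scheduledFlush = channel.eventLoop().scheduleWithFixedDelay(
                    this::flush, FLUSH_INTERVAL_MS, FLUSH_INTERVAL_MS, TimeUnit.MILLISECONDS);
        }
    }

    // Hand the whole batch to Netty in a single writeAndFlush call.
    public synchronized void flush() {
        if (pending != null && pending.readableBytes() > 0) {
            channel.writeAndFlush(pending);   // Netty releases the buffer after writing
            pending = null;
        }
    }
}

The point of grouping is that the Netty event loop sees one large write instead of many small ones, so the per-write pipeline and syscall overhead is amortized across the whole batch.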

Performance

I tested the write performance on my laptop with the following command:

bin/benchmark writes -ensemble 1 -quorum 1 -ackQuorum 1 -ledgers 100 -throttle 300000 -entrysize 60 -useV2 -warmupMessages 1000000

The performance results:

| Writer ledgers | Batched write ops/s | Non-batched write ops/s | Improvement |
|----------------|---------------------|-------------------------|-------------|
| 1              | 333238              | 335970                  | 0%          |
| 50             | 261605              | 153011                  | 71%         |
| 100            | 260650              | 126331                  | 100%        |
| 500            | 265628              | 164393                  | 62%         |

@hangc0276 hangc0276 self-assigned this Mar 26, 2023
@hangc0276 hangc0276 added this to the 4.17.0 milestone Mar 26, 2023
@hangc0276 hangc0276 changed the title from "[Improve][Client] Group writing for per bookie client" to "[Improve][Client] Group writing for perBookieClient" Mar 26, 2023
@hangc0276 hangc0276 changed the title from "[Improve][Client] Group writing for perBookieClient" to "[Improve][Client] Group writing into the channel in PerChannelBookieClient" Mar 26, 2023
@@ -349,6 +353,10 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBuf pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
private int maxPendingRequestsSize = DEFAULT_PENDING_REQUEST_SIZE;
Member

Do we need to make it configurable?

} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
Member

Do we need to make the delay time configurable?

Member

We should cancel the nextScheduledFlush in the close method.
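A sketch of that cleanup, assuming close() stands in for the client's actual close path and nextScheduledFlush is the PR's field:

// Sketch: cancel the periodic flush task on close, draining any batched
// requests first so they are not silently dropped.
public void close() {
    ScheduledFuture<?> scheduled = nextScheduledFlush;
    if (scheduled != null) {
        scheduled.cancel(false);    // let an in-progress flush finish
        nextScheduledFlush = null;
    }
    flushPendingRequests();         // flush whatever is still batched
    // ... existing close logic ...
}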

Member

Do we need to make the delay time configurable?

We should ensure the flush happens as soon as possible, or it may lead to addEntry operation timeouts. If we make it configurable, users may configure it with a wrong value.


public synchronized void flushPendingRequests() {
final long startTime = MathUtils.nowInNano();
Set<CompletionKey> keys = new HashSet<>(pendingSendKeys);
Member

Do we need to check that pendingSendRequests is not null before adding the listener, to avoid registering unnecessary listeners?
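In code, the suggested guard might look like this (a sketch against the PR's field names, not the actual patch):

public synchronized void flushPendingRequests() {
    // Suggested early return: when nothing has been batched since the last
    // flush, skip the write so no completion listener gets registered.
    if (pendingSendRequests == null || pendingSendRequests.readableBytes() == 0) {
        return;
    }
    // ... existing writeAndFlush and listener registration ...
}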

@@ -349,6 +353,10 @@ enum ConnectionState {
private final SecurityHandlerFactory shFactory;
private volatile boolean isWritable = true;
private long lastBookieUnavailableLogTimestamp = 0;
private ByteBuf pendingSendRequests = null;
private final Set<CompletionKey> pendingSendKeys = new HashSet<>();
Member

Here we can also use ObjectHashSet

}
});
channel.writeAndFlush(request, promise);
}
} catch (Throwable e) {
LOG.warn("Operation {} failed", StringUtils.requestToString(request), e);
errorOut(key);
Member

The catch block is not suitable for the new logic: it only handles the single-key case and does not handle the pending-keys case. Also, it looks like nothing in the try block can throw, so I think we can remove the try/catch entirely.

return pendingSendRequests.readableBytes() > MAX_PENDING_REQUEST_SIZE;
}

public synchronized void flushPendingRequests() {
Member

flushPendingRequests triggers every millisecond, which may exacerbate lock contention between prepareSendRequests and flushPendingRequests.

Member

@horizonzy horizonzy Mar 28, 2023

Here we can refer to the implementation of writeCache and writeCacheBeingFlushed in SingleDirectoryDbLedgerStorage to make flushPendingRequests lock-free.
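A simplified sketch of that swap pattern applied to the batch buffer (the real writeCache/writeCacheBeingFlushed pair uses a StampedLock and two preallocated caches; all names below are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;

// Sketch: hold the lock only long enough to swap the batch buffer out, then
// hand the batch to Netty outside the lock, so the 1 ms flush task no longer
// contends with writers for the whole duration of a flush.
class SwappingBatch {
    private final Channel channel;
    private final int initialCapacity;
    private ByteBuf active;   // writers append here under the lock

    SwappingBatch(Channel channel, int initialCapacity) {
        this.channel = channel;
        this.initialCapacity = initialCapacity;
        this.active = channel.alloc().directBuffer(initialCapacity);
    }

    synchronized void append(ByteBuf request) {
        active.writeBytes(request);
        request.release();
    }

    void flush() {
        ByteBuf toFlush;
        synchronized (this) {
            if (active.readableBytes() == 0) {
                return;                          // nothing batched since last flush
            }
            toFlush = active;                    // swap the full buffer out...
            active = channel.alloc().directBuffer(initialCapacity);
        }
        channel.writeAndFlush(toFlush);          // ...and flush it outside the lock
    }
}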

@dlg99
Contributor

dlg99 commented Mar 29, 2023

I think this should be configurable. It improves throughput for some workloads, but more latency-sensitive workloads may want to disable it.

public static void serializeAddRequests(Object request, ByteBuf buf) {
if (request instanceof ByteBuf) {
ByteBuf r = (ByteBuf) request;
buf.writeBytes(r);
Contributor

what if buf does not have enough space to write bytes?

Contributor

do we really need to copy the data into another ByteBuf, or can we use a CompositeByteBuf?
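For comparison, the zero-copy alternative would chain the request buffers instead of copying them. A sketch (the component limit of 1024 is an assumption, chosen so Netty does not consolidate components by copying once the default limit of 16 is exceeded):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.Channel;

final class CompositeBatchSketch {
    // Link each request into a CompositeByteBuf; the payload bytes are
    // referenced, not copied.
    static void writeBatch(Channel channel, Iterable<ByteBuf> requests) {
        CompositeByteBuf batch = channel.alloc().compositeDirectBuffer(1024);
        for (ByteBuf request : requests) {
            // addComponent(true, ...) takes over the reference and advances
            // the composite's writer index.
            batch.addComponent(true, request);
        }
        channel.writeAndFlush(batch);   // Netty releases the composite and its parts
    }

    private CompositeBatchSketch() {
    }
}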

@@ -173,6 +174,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter {
BKException.Code.DuplicateEntryIdException,
BKException.Code.WriteOnReadOnlyBookieException));
private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100; // We may add finer grained priority later.
private static final int DEFAULT_PENDING_REQUEST_SIZE = 1024;

private static final int MAX_PENDING_REQUEST_SIZE = 1024 * 1024;
Contributor

this cannot be larger than nettyMaxFrameSizeBytes

if (pendingSendRequests == null) {
pendingSendRequests = channel.alloc().directBuffer(maxPendingRequestsSize);
}
BookieProtoEncoding.RequestEnDeCoderPreV3.serializeAddRequests(request, pendingSendRequests);
Contributor

what if the request's size > maxPendingRequestsSize?
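One possible answer, as an illustrative sketch (addToBatch is a hypothetical name; flushPendingRequests and maxPendingRequestsSize are the PR's members):

// Sketch: flush the current batch and bypass batching for an oversized request
// instead of letting it grow the shared buffer.
private synchronized void addToBatch(ByteBuf request) {
    if (request.readableBytes() > maxPendingRequestsSize) {
        flushPendingRequests();           // drain what is already queued
        channel.writeAndFlush(request);   // send the large request on its own
        return;
    }
    // ... normal batching path: serialize into pendingSendRequests ...
}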

} else {
nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(startTime), TimeUnit.NANOSECONDS);
if (nextScheduledFlush == null) {
nextScheduledFlush = channel.eventLoop().scheduleWithFixedDelay(this::flushPendingRequests,
Contributor

nextScheduledFlush is volatile, so we assume access from different threads. It can be null in the "if" check and non-null by the time of the assignment.
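One way to close that check-then-act race, sketched with an AtomicReference instead of the PR's volatile field:

import io.netty.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Sketch: compareAndSet makes the "schedule exactly once" decision atomic.
// channel and flushPendingRequests are the PR's existing members.
private final AtomicReference<ScheduledFuture<?>> nextScheduledFlush = new AtomicReference<>();

private void scheduleFlushIfNeeded() {
    if (nextScheduledFlush.get() == null) {
        ScheduledFuture<?> task = channel.eventLoop().scheduleWithFixedDelay(
                this::flushPendingRequests, 1, 1, TimeUnit.MILLISECONDS);
        if (!nextScheduledFlush.compareAndSet(null, task)) {
            task.cancel(false);   // another thread scheduled first; drop ours
        }
    }
}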

@hangc0276 hangc0276 force-pushed the chenhang/group_writing_for_perBookieClient branch from 00567fd to ab95ba6 on April 2, 2023 13:56
@eolivelli eolivelli removed this from the 4.17.0 milestone May 3, 2024
buf.add((ByteBuf) request);
} else if (request instanceof ByteBufList) {
buf.add((ByteBufList) request);
}
Contributor

else throw IllegalStateException?
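Applied to the branch above, the suggestion would read roughly:

} else if (request instanceof ByteBufList) {
    buf.add((ByteBufList) request);
} else {
    // Fail fast on an unexpected request type instead of silently dropping it.
    throw new IllegalStateException("Unexpected request type: "
            + (request == null ? "null" : request.getClass().getName()));
}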
