Skip to content

netty: improve flushing and object allocations in write queue. #1989

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* Command sent from a Netty client stream to the handler to cancel the stream.
*/
class CancelClientStreamCommand {
class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final NettyClientStream stream;
private final Status reason;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* Command sent from a Netty server stream to the handler to cancel the stream.
*/
class CancelServerStreamCommand {
class CancelServerStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final NettyServerStream.TransportState stream;
private final Status reason;

Expand Down
2 changes: 1 addition & 1 deletion netty/src/main/java/io/grpc/netty/CreateStreamCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* A command to create a new stream. This is created by {@link NettyClientStream} and passed to the
* {@link NettyClientHandler} for processing in the Channel thread.
*/
class CreateStreamCommand {
class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final Http2Headers headers;
private final NettyClientStream stream;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* A command to trigger close and close all streams. It is buffered differently than normal close
* and also includes reason for closure.
*/
class ForcefulCloseCommand {
class ForcefulCloseCommand extends WriteQueue.AbstractQueuedCommand {
private final Status status;

public ForcefulCloseCommand(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
* A command to trigger close. It is buffered differently than normal close and also includes
* reason for closure.
*/
class GracefulCloseCommand {
class GracefulCloseCommand extends WriteQueue.AbstractQueuedCommand {
private final Status status;

public GracefulCloseCommand(Status status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
/**
* Command which requests messages from the deframer.
*/
class RequestMessagesCommand {
class RequestMessagesCommand extends WriteQueue.AbstractQueuedCommand {

private final int numMessages;
private final Stream stream;
Expand Down
15 changes: 14 additions & 1 deletion netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,17 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.buffer.DefaultByteBufHolder;
import io.netty.channel.ChannelPromise;

/**
* Command sent from the transport to the Netty channel to send a GRPC frame to the remote endpoint.
*/
class SendGrpcFrameCommand extends DefaultByteBufHolder {
class SendGrpcFrameCommand extends DefaultByteBufHolder implements WriteQueue.QueuedCommand {
private final StreamIdHolder stream;
private final boolean endStream;

private ChannelPromise promise;

SendGrpcFrameCommand(StreamIdHolder stream, ByteBuf content, boolean endStream) {
super(content);
this.stream = stream;
Expand Down Expand Up @@ -116,4 +119,14 @@ public int hashCode() {
}
return hash;
}

@Override
public ChannelPromise promise() {
return promise;
}

@Override
public void promise(ChannelPromise promise) {
this.promise = promise;
}
}
2 changes: 1 addition & 1 deletion netty/src/main/java/io/grpc/netty/SendPingCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* Command sent from the transport to the Netty channel to send a PING frame.
*/
class SendPingCommand {
class SendPingCommand extends WriteQueue.AbstractQueuedCommand {
private final PingCallback callback;
private final Executor executor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* Command sent from the transport to the Netty channel to send response headers to the client.
*/
class SendResponseHeadersCommand {
class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand {
private final StreamIdHolder stream;
private final Http2Headers headers;
private final boolean endOfStream;
Expand Down
85 changes: 59 additions & 26 deletions netty/src/main/java/io/grpc/netty/WriteQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package io.grpc.netty;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import io.netty.channel.Channel;
Expand All @@ -49,6 +50,9 @@ class WriteQueue {

private static final int DEQUE_CHUNK_SIZE = 128;

@VisibleForTesting
static final int MAX_WRITES_BEFORE_FLUSH = 8192;

/**
* {@link Runnable} used to schedule work onto the tail of the event loop.
*/
Expand All @@ -60,18 +64,18 @@ public void run() {
};

private final Channel channel;
private final BlockingQueue<Runnable> queue;
private final BlockingQueue<QueuedCommand> queue;
private final AtomicBoolean scheduled = new AtomicBoolean();

/**
* ArrayDeque to copy queue into when flushing in event loop.
*/
private final ArrayDeque<Runnable> writeChunk =
new ArrayDeque<Runnable>(DEQUE_CHUNK_SIZE);
private final ArrayDeque<QueuedCommand> writeChunk =
new ArrayDeque<QueuedCommand>(DEQUE_CHUNK_SIZE);

public WriteQueue(Channel channel) {
this.channel = Preconditions.checkNotNull(channel, "channel");
queue = new LinkedBlockingQueue<Runnable>();
queue = new LinkedBlockingQueue<QueuedCommand>();
}

/**
Expand All @@ -93,7 +97,7 @@ void scheduleFlush() {
* @param flush true if a flush of the write should be schedule, false if a later call to
* enqueue will schedule the flush.
*/
ChannelFuture enqueue(Object command, boolean flush) {
ChannelFuture enqueue(QueuedCommand command, boolean flush) {
return enqueue(command, channel.newPromise(), flush);
}

Expand All @@ -105,8 +109,12 @@ ChannelFuture enqueue(Object command, boolean flush) {
* @param flush true if a flush of the write should be schedule, false if a later call to
* enqueue will schedule the flush.
*/
ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
queue.add(new QueuedCommand(command, promise));
ChannelFuture enqueue(QueuedCommand command, ChannelPromise promise, boolean flush) {
// Detect errornous code that tries to reuse command objects.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

erroneous?

Preconditions.checkNotNull(command.promise() == null, "promise must not be set on command");

command.promise(promise);
queue.add(command);
if (flush) {
scheduleFlush();
}
Expand All @@ -118,20 +126,34 @@ ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
* called in the event loop
*/
private void flush() {
assert channel.eventLoop().inEventLoop();

try {
boolean flushed = false;
// We can't just call flush after having completely drained the queue, as new objects might
// be added to the queue continuously and so it may never get empty. If we never flushed in
// that case we would be guaranteed to OOM.
// However, we also don't want to flush too often as flushing invokes expensive system
// calls and we want to feed Netty with enough data so as to maximize the data written per
// system call.
int writesBeforeFlush = MAX_WRITES_BEFORE_FLUSH;
// Always dequeue in chunks, in order to not acquire the queue's lock too often.
while (queue.drainTo(writeChunk, DEQUE_CHUNK_SIZE) > 0) {
while (writeChunk.size() > 0) {
writeChunk.poll().run();
writesBeforeFlush -= writeChunk.size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would this be simpler as a for loop over queue? I see that there is a lock, but it seems like it would make more sense to atomic compare and swap the current queue with a shadow queue, and avoid the copy altogether.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carl-mastrangelo well even when swapping the queues, each poll() would still have to acquire a lock. I think that's what @louiscryan had in mind there. I wouldn't expect the lock to be contended, as offer and poll use two separate locks in the LinkedBlockingQueue.

Nevertheless, an interesting idea, which we would have to experiment with. I would prefer to do that in a separate PR, as this one is really focused on the flushing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Atomic compare and swap of the queue would be broken since producers first read the variable and then write to it. That is a recipe for races.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are atomic swapping the queues, they don't need to be blocking right? They can be the regular kind

You don't have to do it for this PR, but I think the idea is still worth exploring. There may not even need to be multiple Queues, just one that each thread steals when it needs to do work.

while (!writeChunk.isEmpty()) {
QueuedCommand cmd = writeChunk.poll();
channel.write(cmd, cmd.promise());
}
flushed = false;
if (writesBeforeFlush <= 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we still have to make sure that the last items that were written get flushed. The check below (if (!flushed)) isn't sufficient, because we never reset flushed = false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nmittler The condition if (!flushed) is only for when there are zero items in the queue. The last items also get flushed because writesBeforeFlush is effectively equal to queue.size(). So it's called after having written queue.size() many items, which we will always be doing. That being said, I think the code is more complex than it needs to be. I'll try to make it clearer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I forgot that it's a min ... I was thinking it was just MAX_WRITES_BEFORE_FLUSH. Makes sense.

writesBeforeFlush = MAX_WRITES_BEFORE_FLUSH;
flushed = true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very broken, since there is no flushed = false after this point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ejona86 I'll have to rename the variable. It's just used as a flag to remember whether we entered the outer while loop or not (that is, whether there are items to write). If we didn't enter the while loop, we still want to call flush. That's what this variable is used for. However, when we enter the while loop, the condition if (writesBeforeFlush <= 0) will eventually always be true and thus flush is guaranteed to be called after the last write.

channel.flush();
}
// Flush each chunk so we are releasing buffers periodically. In theory this loop
// might never end as new events are continuously added to the queue, if we never
// flushed in that case we would be guaranteed to OOM.
flushed = true;
channel.flush();
}

if (!flushed) {
// Must flush at least once
// In case there were no items in the queue, we must flush at least once
channel.flush();
}
} finally {
Expand All @@ -143,22 +165,33 @@ private void flush() {
}
}

/**
* Simple wrapper type around a command and its optional completion listener.
*/
private final class QueuedCommand implements Runnable {
private final Object command;
private final ChannelPromise promise;
abstract static class AbstractQueuedCommand implements QueuedCommand {

private QueuedCommand(Object command, ChannelPromise promise) {
this.command = command;
private ChannelPromise promise;

@Override
public final void promise(ChannelPromise promise) {
this.promise = promise;
}

@Override
public void run() {
channel.write(command, promise);
public final ChannelPromise promise() {
return promise;
}
}
}

/**
 * A command to be written to the channel, carrying an optional promise to be notified of the
 * outcome of the write.
 */
interface QueuedCommand {
/**
 * Returns the promise being notified of the success/failure of the write, or {@code null} if no
 * promise has been set yet.
 */
ChannelPromise promise();

/**
 * Sets the promise to be notified of the success/failure of the write.
 */
void promise(ChannelPromise promise);
}
}
7 changes: 3 additions & 4 deletions netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,11 @@ public void connectionWindowShouldBeOverridden() throws Exception {
@Test
public void createIncrementsIdsForActualAndBufferdStreams() throws Exception {
receiveMaxConcurrentStreams(2);
CreateStreamCommand command = new CreateStreamCommand(grpcHeaders, stream);
enqueue(command);
enqueue(new CreateStreamCommand(grpcHeaders, stream));
verify(stream).id(eq(3));
enqueue(command);
enqueue(new CreateStreamCommand(grpcHeaders, stream));
verify(stream).id(eq(5));
enqueue(command);
enqueue(new CreateStreamCommand(grpcHeaders, stream));
verify(stream).id(eq(7));
}

Expand Down
7 changes: 4 additions & 3 deletions netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import io.grpc.Status;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.WriteQueue.QueuedCommand;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -380,7 +381,7 @@ public void removeUserAgentFromApplicationHeaders() {
metadata.put(GrpcUtil.USER_AGENT_KEY, "bad agent");
listener = mock(ClientStreamListener.class);
Mockito.reset(writeQueue);
when(writeQueue.enqueue(any(), any(boolean.class))).thenReturn(future);
when(writeQueue.enqueue(any(QueuedCommand.class), any(boolean.class))).thenReturn(future);

stream = new NettyClientStreamImpl(methodDescriptor, new Metadata(), channel, handler,
DEFAULT_MAX_MESSAGE_SIZE, AsciiString.of("localhost"), AsciiString.of("http"),
Expand All @@ -404,8 +405,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
return null;
}
}).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
}).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future);
NettyClientStream stream = new NettyClientStreamImpl(methodDescriptor, new Metadata(), channel,
handler, DEFAULT_MAX_MESSAGE_SIZE, AsciiString.of("localhost"), AsciiString.of("http"),
AsciiString.of("agent"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ protected final Http2Connection connection() {
return handler().connection();
}

protected final ChannelFuture enqueue(Object command) {
protected final ChannelFuture enqueue(WriteQueue.QueuedCommand command) {
ChannelFuture future = writeQueue.enqueue(command, newPromise(), true);
channel.runPendingTasks();
return future;
Expand Down
5 changes: 3 additions & 2 deletions netty/src/test/java/io/grpc/netty/NettyServerStreamTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.ServerStreamListener;
import io.grpc.netty.WriteQueue.QueuedCommand;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -287,8 +288,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
}
return null;
}
}).when(writeQueue).enqueue(any(), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(), anyBoolean())).thenReturn(future);
}).when(writeQueue).enqueue(any(QueuedCommand.class), any(ChannelPromise.class), anyBoolean());
when(writeQueue.enqueue(any(QueuedCommand.class), anyBoolean())).thenReturn(future);
NettyServerStream.TransportState state =
new NettyServerStream.TransportState(handler, http2Stream, DEFAULT_MAX_MESSAGE_SIZE);
NettyServerStream stream = new NettyServerStream(channel, state);
Expand Down
Loading