-
Notifications
You must be signed in to change notification settings - Fork 3.9k
netty: improve flushing and object allocations in write queue. #1989
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ | |
|
||
package io.grpc.netty; | ||
|
||
import com.google.common.annotations.VisibleForTesting; | ||
import com.google.common.base.Preconditions; | ||
|
||
import io.netty.channel.Channel; | ||
|
@@ -49,6 +50,9 @@ class WriteQueue { | |
|
||
private static final int DEQUE_CHUNK_SIZE = 128; | ||
|
||
@VisibleForTesting | ||
static final int MAX_WRITES_BEFORE_FLUSH = 8192; | ||
|
||
/** | ||
* {@link Runnable} used to schedule work onto the tail of the event loop. | ||
*/ | ||
|
@@ -60,18 +64,18 @@ public void run() { | |
}; | ||
|
||
private final Channel channel; | ||
private final BlockingQueue<Runnable> queue; | ||
private final BlockingQueue<QueuedCommand> queue; | ||
private final AtomicBoolean scheduled = new AtomicBoolean(); | ||
|
||
/** | ||
* ArrayDeque to copy queue into when flushing in event loop. | ||
*/ | ||
private final ArrayDeque<Runnable> writeChunk = | ||
new ArrayDeque<Runnable>(DEQUE_CHUNK_SIZE); | ||
private final ArrayDeque<QueuedCommand> writeChunk = | ||
new ArrayDeque<QueuedCommand>(DEQUE_CHUNK_SIZE); | ||
|
||
public WriteQueue(Channel channel) { | ||
this.channel = Preconditions.checkNotNull(channel, "channel"); | ||
queue = new LinkedBlockingQueue<Runnable>(); | ||
queue = new LinkedBlockingQueue<QueuedCommand>(); | ||
} | ||
|
||
/** | ||
|
@@ -93,7 +97,7 @@ void scheduleFlush() { | |
* @param flush true if a flush of the write should be schedule, false if a later call to | ||
* enqueue will schedule the flush. | ||
*/ | ||
ChannelFuture enqueue(Object command, boolean flush) { | ||
ChannelFuture enqueue(QueuedCommand command, boolean flush) { | ||
return enqueue(command, channel.newPromise(), flush); | ||
} | ||
|
||
|
@@ -105,8 +109,12 @@ ChannelFuture enqueue(Object command, boolean flush) { | |
* @param flush true if a flush of the write should be schedule, false if a later call to | ||
* enqueue will schedule the flush. | ||
*/ | ||
ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) { | ||
queue.add(new QueuedCommand(command, promise)); | ||
ChannelFuture enqueue(QueuedCommand command, ChannelPromise promise, boolean flush) { | ||
// Detect errornous code that tries to reuse command objects. | ||
Preconditions.checkNotNull(command.promise() == null, "promise must not be set on command"); | ||
|
||
command.promise(promise); | ||
queue.add(command); | ||
if (flush) { | ||
scheduleFlush(); | ||
} | ||
|
@@ -118,20 +126,34 @@ ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) { | |
* called in the event loop | ||
*/ | ||
private void flush() { | ||
assert channel.eventLoop().inEventLoop(); | ||
|
||
try { | ||
boolean flushed = false; | ||
// We can't just call flush after having completely drained the queue, as new objects might | ||
// be added to the queue continuously and so it may never get empty. If we never flushed in | ||
// that case we would be guaranteed to OOM. | ||
// However, we also don't want to flush too often as flushing invokes expensive system | ||
// calls and we want to feed Netty with enough data so as to maximize the data written per | ||
// system call. | ||
int writesBeforeFlush = MAX_WRITES_BEFORE_FLUSH; | ||
// Always dequeue in chunks, in order to not acquire the queue's lock too often. | ||
while (queue.drainTo(writeChunk, DEQUE_CHUNK_SIZE) > 0) { | ||
while (writeChunk.size() > 0) { | ||
writeChunk.poll().run(); | ||
writesBeforeFlush -= writeChunk.size(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. would this be simpler as a for loop over queue? I see that there is a lock, but it seems like it would make more sense to atomic compare and swap the current queue with a shadow queue, and avoid the copy altogether. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @carl-mastrangelo well even when swapping the queues, each Nevertheless, an interesting idea, which we would have to experiment with. I would prefer to do that in a separate PR, as this one is really focused on the flushing. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Atomic compare and swap of the queue would be broken since producers first read the variable and then write to it. That is a recipe for races. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you are atomic swapping the queues, they don't need to be blocking right? They can be the regular kind You don't have to do it for this PR, but I think the idea is still worth exploring. There may not even need to be multiple Queues, just one that each thread steals when it needs to do work. |
||
while (!writeChunk.isEmpty()) { | ||
QueuedCommand cmd = writeChunk.poll(); | ||
channel.write(cmd, cmd.promise()); | ||
} | ||
flushed = false; | ||
if (writesBeforeFlush <= 0) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we still have to make sure that the last items that were written get flushed. The check below ( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @nmittler The condition There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh I forgot that it's a min ... I was thinking it was just |
||
writesBeforeFlush = MAX_WRITES_BEFORE_FLUSH; | ||
flushed = true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This looks very broken, since there is no There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ejona86 I ll have to rename the variable. It's just used as a flag to remember whether we entered the outer while loop or not (that is, whether there are items to write). If we didn't enter the while loop, we still want to call flush. That's what this variable is used. However, when we entered the while loop the condition |
||
channel.flush(); | ||
} | ||
// Flush each chunk so we are releasing buffers periodically. In theory this loop | ||
// might never end as new events are continuously added to the queue, if we never | ||
// flushed in that case we would be guaranteed to OOM. | ||
flushed = true; | ||
channel.flush(); | ||
} | ||
|
||
if (!flushed) { | ||
// Must flush at least once | ||
// In case there were no items in the queue, we must flush at least once | ||
channel.flush(); | ||
} | ||
} finally { | ||
|
@@ -143,22 +165,33 @@ private void flush() { | |
} | ||
} | ||
|
||
/** | ||
* Simple wrapper type around a command and its optional completion listener. | ||
*/ | ||
private final class QueuedCommand implements Runnable { | ||
private final Object command; | ||
private final ChannelPromise promise; | ||
abstract static class AbstractQueuedCommand implements QueuedCommand { | ||
|
||
private QueuedCommand(Object command, ChannelPromise promise) { | ||
this.command = command; | ||
private ChannelPromise promise; | ||
|
||
@Override | ||
public final void promise(ChannelPromise promise) { | ||
this.promise = promise; | ||
} | ||
|
||
@Override | ||
public void run() { | ||
channel.write(command, promise); | ||
public final ChannelPromise promise() { | ||
return promise; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Simple wrapper type around a command and its optional completion listener. | ||
*/ | ||
interface QueuedCommand { | ||
/** | ||
* Returns the promise beeing notified of the success/failure of the write. | ||
*/ | ||
ChannelPromise promise(); | ||
|
||
/** | ||
* Sets the promise. | ||
*/ | ||
void promise(ChannelPromise promise); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
erroneous?