Skip to content

Commit fb8454b

Browse files
committed
address comments and add tests
1 parent f776068 commit fb8454b

15 files changed

+120
-86
lines changed

netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.google.common.base.Preconditions;
3535

3636
import io.grpc.Status;
37-
import io.netty.channel.ChannelPromise;
3837

3938
/**
4039
* Command sent from a Netty client stream to the handler to cancel the stream.
@@ -43,8 +42,6 @@ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand {
4342
private final NettyClientStream stream;
4443
private final Status reason;
4544

46-
private ChannelPromise promise;
47-
4845
CancelClientStreamCommand(NettyClientStream stream, Status reason) {
4946
this.stream = Preconditions.checkNotNull(stream, "stream");
5047
Preconditions.checkNotNull(reason);

netty/src/main/java/io/grpc/netty/CancelServerStreamCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.google.common.base.Preconditions;
3737

3838
import io.grpc.Status;
39-
import io.netty.channel.ChannelPromise;
4039

4140
/**
4241
* Command sent from a Netty server stream to the handler to cancel the stream.
@@ -45,8 +44,6 @@ class CancelServerStreamCommand extends WriteQueue.AbstractQueuedCommand {
4544
private final NettyServerStream.TransportState stream;
4645
private final Status reason;
4746

48-
private ChannelPromise promise;
49-
5047
CancelServerStreamCommand(NettyServerStream.TransportState stream, Status reason) {
5148
this.stream = Preconditions.checkNotNull(stream);
5249
this.reason = Preconditions.checkNotNull(reason);

netty/src/main/java/io/grpc/netty/CreateStreamCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
import com.google.common.base.Preconditions;
3535

36-
import io.netty.channel.ChannelPromise;
3736
import io.netty.handler.codec.http2.Http2Headers;
3837

3938
/**
@@ -44,8 +43,6 @@ class CreateStreamCommand extends WriteQueue.AbstractQueuedCommand {
4443
private final Http2Headers headers;
4544
private final NettyClientStream stream;
4645

47-
private ChannelPromise promise;
48-
4946
CreateStreamCommand(Http2Headers headers,
5047
NettyClientStream stream) {
5148
this.stream = Preconditions.checkNotNull(stream, "stream");

netty/src/main/java/io/grpc/netty/ForcefulCloseCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
package io.grpc.netty;
3333

3434
import io.grpc.Status;
35-
import io.netty.channel.ChannelPromise;
3635

3736
/**
3837
* A command to trigger close and close all streams. It is buffered differently than normal close
@@ -41,8 +40,6 @@
4140
class ForcefulCloseCommand extends WriteQueue.AbstractQueuedCommand {
4241
private final Status status;
4342

44-
private ChannelPromise promise;
45-
4643
public ForcefulCloseCommand(Status status) {
4744
this.status = status;
4845
}

netty/src/main/java/io/grpc/netty/GracefulCloseCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
package io.grpc.netty;
3333

3434
import io.grpc.Status;
35-
import io.netty.channel.ChannelPromise;
3635

3736
/**
3837
* A command to trigger close. It is buffered differently than normal close and also includes
@@ -41,8 +40,6 @@
4140
class GracefulCloseCommand extends WriteQueue.AbstractQueuedCommand {
4241
private final Status status;
4342

44-
private ChannelPromise promise;
45-
4643
public GracefulCloseCommand(Status status) {
4744
this.status = status;
4845
}

netty/src/main/java/io/grpc/netty/RequestMessagesCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
import io.grpc.internal.AbstractStream2;
3535
import io.grpc.internal.Stream;
36-
import io.netty.channel.ChannelPromise;
3736

3837
/**
3938
* Command which requests messages from the deframer.
@@ -44,8 +43,6 @@ class RequestMessagesCommand extends WriteQueue.AbstractQueuedCommand {
4443
private final Stream stream;
4544
private final AbstractStream2.TransportState state;
4645

47-
private ChannelPromise promise;
48-
4946
public RequestMessagesCommand(Stream stream, int numMessages) {
5047
this.state = null;
5148
this.numMessages = numMessages;

netty/src/main/java/io/grpc/netty/SendGrpcFrameCommand.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,11 +120,6 @@ public int hashCode() {
120120
return hash;
121121
}
122122

123-
@Override
124-
public Object command() {
125-
return this;
126-
}
127-
128123
@Override
129124
public ChannelPromise promise() {
130125
return promise;

netty/src/main/java/io/grpc/netty/SendPingCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
package io.grpc.netty;
3333

3434
import io.grpc.internal.ClientTransport.PingCallback;
35-
import io.netty.channel.ChannelPromise;
3635

3736
import java.util.concurrent.Executor;
3837

@@ -43,8 +42,6 @@ class SendPingCommand extends WriteQueue.AbstractQueuedCommand {
4342
private final PingCallback callback;
4443
private final Executor executor;
4544

46-
private ChannelPromise promise;
47-
4845
SendPingCommand(PingCallback callback, Executor executor) {
4946
this.callback = callback;
5047
this.executor = executor;

netty/src/main/java/io/grpc/netty/SendResponseHeadersCommand.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
import com.google.common.base.Preconditions;
3535

36-
import io.netty.channel.ChannelPromise;
3736
import io.netty.handler.codec.http2.Http2Headers;
3837

3938
/**
@@ -44,8 +43,6 @@ class SendResponseHeadersCommand extends WriteQueue.AbstractQueuedCommand {
4443
private final Http2Headers headers;
4544
private final boolean endOfStream;
4645

47-
private ChannelPromise promise;
48-
4946
SendResponseHeadersCommand(StreamIdHolder stream, Http2Headers headers, boolean endOfStream) {
5047
this.stream = Preconditions.checkNotNull(stream);
5148
this.headers = Preconditions.checkNotNull(headers);

netty/src/main/java/io/grpc/netty/WriteQueue.java

Lines changed: 13 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ void scheduleFlush() {
9999
* @param flush true if a flush of the write should be schedule, false if a later call to
100100
* enqueue will schedule the flush.
101101
*/
102-
ChannelFuture enqueue(Object command, boolean flush) {
102+
ChannelFuture enqueue(QueuedCommand command, boolean flush) {
103103
return enqueue(command, channel.newPromise(), flush);
104104
}
105105

@@ -111,15 +111,12 @@ ChannelFuture enqueue(Object command, boolean flush) {
111111
* @param flush true if a flush of the write should be schedule, false if a later call to
112112
* enqueue will schedule the flush.
113113
*/
114-
ChannelFuture enqueue(Object command, ChannelPromise promise, boolean flush) {
115-
final QueuedCommand queuedCommand;
116-
if (command instanceof QueuedCommand) {
117-
queuedCommand = (QueuedCommand) command;
118-
queuedCommand.promise(promise);
119-
} else {
120-
queuedCommand = new InternalQueuedCommand(command, promise);
121-
}
122-
queue.add(queuedCommand);
114+
ChannelFuture enqueue(QueuedCommand command, ChannelPromise promise, boolean flush) {
115+
// Detect errornous code that tries to reuse command objects.
116+
Preconditions.checkNotNull(command.promise() == null, "promise must not be set on command");
117+
118+
command.promise(promise);
119+
queue.add(command);
123120
if (flush) {
124121
scheduleFlush();
125122
}
@@ -145,16 +142,19 @@ private void flush() {
145142
writesBeforeFlush -= writeChunk.size();
146143
while (!writeChunk.isEmpty()) {
147144
QueuedCommand cmd = writeChunk.poll();
148-
channel.write(cmd.command(), cmd.promise());
145+
channel.write(cmd, cmd.promise());
149146
}
150147
if (writesBeforeFlush <= 0) {
151148
writesBeforeFlush = min(queue.size(), MAX_WRITES_BEFORE_FLUSH);
152149
flushed = true;
153150
channel.flush();
154151
}
155152
}
153+
154+
assert writesBeforeFlush == 0;
155+
156156
if (!flushed) {
157-
// Must flush at least once
157+
// In case there were no items in the queue, we must flush at least once
158158
channel.flush();
159159
}
160160
} finally {
@@ -166,15 +166,10 @@ private void flush() {
166166
}
167167
}
168168

169-
static class AbstractQueuedCommand implements QueuedCommand {
169+
abstract static class AbstractQueuedCommand implements QueuedCommand {
170170

171171
private ChannelPromise promise;
172172

173-
@Override
174-
public Object command() {
175-
return this;
176-
}
177-
178173
@Override
179174
public final void promise(ChannelPromise promise) {
180175
this.promise = promise;
@@ -190,11 +185,6 @@ public final ChannelPromise promise() {
190185
* Simple wrapper type around a command and its optional completion listener.
191186
*/
192187
interface QueuedCommand {
193-
/**
194-
* Returns the object to write to the channel.
195-
*/
196-
Object command();
197-
198188
/**
199189
* Returns the promise beeing notified of the success/failure of the write.
200190
*/
@@ -205,18 +195,4 @@ interface QueuedCommand {
205195
*/
206196
void promise(ChannelPromise promise);
207197
}
208-
209-
private static final class InternalQueuedCommand extends AbstractQueuedCommand {
210-
private final Object command;
211-
212-
private InternalQueuedCommand(Object command, ChannelPromise promise) {
213-
this.command = command;
214-
promise(promise);
215-
}
216-
217-
@Override
218-
public Object command() {
219-
return command;
220-
}
221-
}
222198
}

0 commit comments

Comments
 (0)