Skip to content

Commit

Permalink
PendingWriteQueue to handle write operations with void future
Browse files Browse the repository at this point in the history
Motivation:

Right now PendingWriteQueue.removeAndWriteAll collects all promises to
PromiseCombiner instance which sets listener to each given promise throwing
IllegalStateException on VoidChannelPromise which breaks while loop
and "reports" operation as failed (when in fact part of writes might be
actually written).

Modifications:

Check if the promise is not void before adding it to the PromiseCombiner
instance.

Result:

PendingWriteQueue.removeAndWriteAll succesfully writes all pendings
even in case void promise was used.
  • Loading branch information
kachayev authored and normanmaurer committed Mar 16, 2018
1 parent f6251c8 commit b082376
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,9 @@ public ChannelFuture removeAndWriteAll() {
Object msg = write.msg;
ChannelPromise promise = write.promise;
recycle(write, false);
combiner.add(promise);
if (!(promise instanceof VoidChannelPromise)) {
combiner.add(promise);
}
ctx.write(msg, promise);
write = next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,30 @@ public void operationComplete(ChannelFuture future) {
assertEquals(3L, channel.readOutbound());
}

@Test
public void testRemoveAndWriteAllWithVoidPromise() {
EmbeddedChannel channel = new EmbeddedChannel(new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
// Convert to writeAndFlush(...) so the promise will be notified by the transport.
ctx.writeAndFlush(msg, promise);
}
}, new ChannelOutboundHandlerAdapter());

final PendingWriteQueue queue = new PendingWriteQueue(channel.pipeline().lastContext());

ChannelPromise promise = channel.newPromise();
queue.add(1L, promise);
queue.add(2L, channel.voidPromise());
queue.removeAndWriteAll();

assertTrue(channel.finish());
assertTrue(promise.isDone());
assertTrue(promise.isSuccess());
assertEquals(1L, channel.readOutbound());
assertEquals(2L, channel.readOutbound());
}

@Test
public void testRemoveAndFailAllReentrantWrite() {
final List<Integer> failOrder = Collections.synchronizedList(new ArrayList<Integer>());
Expand Down

0 comments on commit b082376

Please sign in to comment.