Skip to content

Commit

Permalink
Ensure that a user implements flush() or inboundBufferUpdated()
Browse files Browse the repository at this point in the history
- Also prohibited a user from overriding
  ChannelInbound(Byte|Message)HandlerAdapter.  If a user wants to do
  that, he or she should extend ChannelInboundHandlerAdapter instead.
  • Loading branch information
trustin committed Jun 10, 2012
1 parent b6d5593 commit 87f52aa
Show file tree
Hide file tree
Showing 20 changed files with 99 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,18 @@
public abstract class ByteToByteDecoder extends ChannelInboundByteHandlerAdapter {

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
callDecode(ctx);
public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
callDecode(ctx, in, ctx.nextOutboundByteBuffer());
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer in = ctx.inboundByteBuffer();
ChannelBuffer out = ctx.nextInboundByteBuffer();
if (!in.readable()) {
callDecode(ctx);
callDecode(ctx, in, out);
}

ChannelBuffer out = ctx.nextInboundByteBuffer();
int oldOutSize = out.readableBytes();
try {
decodeLast(ctx, in, out);
Expand All @@ -53,10 +53,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelInactive();
}

private void callDecode(ChannelHandlerContext ctx) {
ChannelBuffer in = ctx.inboundByteBuffer();
ChannelBuffer out = ctx.nextInboundByteBuffer();

private void callDecode(ChannelHandlerContext ctx, ChannelBuffer in, ChannelBuffer out) {
int oldOutSize = out.readableBytes();
while (in.readable()) {
int oldInSize = in.readableBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package io.netty.handler.codec;

import io.netty.buffer.ChannelBuffer;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandler;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;

public abstract class ByteToMessageDecoder<O> extends ChannelInboundByteHandlerAdapter {
public abstract class ByteToMessageDecoder<O> extends ChannelInboundHandlerAdapter<Byte> {

private ChannelHandlerContext ctx;

Expand All @@ -31,6 +33,11 @@ public void beforeAdd(ChannelHandlerContext ctx) throws Exception {
super.beforeAdd(ctx);
}

@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
callDecode(ctx);
Expand Down
32 changes: 16 additions & 16 deletions codec/src/main/java/io/netty/handler/codec/CodecUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,28 @@ static boolean unfoldAndAdd(
}

if (inbound) {
try {
if (ctx.hasNextInboundMessageBuffer()) {
ctx.nextInboundMessageBuffer().add(msg);
return true;
} catch (NoSuchBufferException e) {
if (msg instanceof ChannelBuffer) {
ChannelBuffer altDst = ctx.nextInboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}

if (msg instanceof ChannelBuffer && ctx.hasNextInboundByteBuffer()) {
ChannelBuffer altDst = ctx.nextInboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
} else {
try {
if (ctx.hasNextOutboundMessageBuffer()) {
ctx.nextOutboundMessageBuffer().add(msg);
return true;
} catch (NoSuchBufferException e) {
if (msg instanceof ChannelBuffer) {
ChannelBuffer altDst = ctx.nextOutboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}

if (msg instanceof ChannelBuffer && ctx.hasNextOutboundByteBuffer()) {
ChannelBuffer altDst = ctx.nextOutboundByteBuffer();
ChannelBuffer src = (ChannelBuffer) msg;
altDst.writeBytes(src, src.readerIndex(), src.readableBytes());
return true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,19 @@
*/
package io.netty.handler.codec;

import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.Queue;

public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundMessageHandlerAdapter<I> {
public abstract class MessageToMessageDecoder<I, O> extends ChannelInboundHandlerAdapter<I> {

@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,14 @@ private PortUnificationServerHandler(boolean detectSsl, boolean detectGzip) {
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer buffer = ctx.inboundByteBuffer();

public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
// Will use the first two bytes to detect a protocol.
if (buffer.readableBytes() < 2) {
if (in.readableBytes() < 2) {
return;
}

final int magic1 = buffer.getUnsignedByte(buffer.readerIndex());
final int magic2 = buffer.getUnsignedByte(buffer.readerIndex() + 1);
final int magic1 = in.getUnsignedByte(in.readerIndex());
final int magic2 = in.getUnsignedByte(in.readerIndex() + 1);

if (isSsl(magic1)) {
enableSsl(ctx);
Expand All @@ -74,13 +72,13 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
switchToFactorial(ctx);
} else {
// Unknown protocol; discard everything and close the connection.
buffer.clear();
in.clear();
ctx.close();
return;
}

// Forward the current read buffer as is to the new handlers.
ctx.nextInboundByteBuffer().writeBytes(buffer);
ctx.nextInboundByteBuffer().writeBytes(in);
ctx.fireInboundBufferUpdated();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer in = ctx.inboundByteBuffer();
public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
ChannelBuffer out = inboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer in = ctx.inboundByteBuffer();
public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
ChannelBuffer out = outboundChannel.outboundByteBuffer();
out.discardReadBytes();
out.writeBytes(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,4 +218,10 @@ private void addEvent(Object e) {
queue.add(e);
}
}

@Override
public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
// TODO Auto-generated method stub

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,8 @@ public ChannelBufferHolder<ChannelBuffer> newOutboundBuffer(ChannelHandlerContex

@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
super.flush(ctx, future);

future.setSuccess();
}

};

EmbeddedMessageChannel ch = new EmbeddedMessageChannel(new ChunkedWriteHandler(), testHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInboundByteHandlerAdapter;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.spdy.SpdyConstants;
Expand Down Expand Up @@ -255,10 +255,9 @@ private class SpdyEchoTestClientHandler extends ChannelInboundByteHandlerAdapter
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer m = ctx.inboundByteBuffer();
byte[] actual = new byte[m.readableBytes()];
m.readBytes(actual);
public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
byte[] actual = new byte[in.readableBytes()];
in.readBytes(actual);

int lastIdx = counter;
for (int i = 0; i < actual.length; i ++) {
Expand Down
10 changes: 8 additions & 2 deletions transport/src/main/java/io/netty/bootstrap/ServerBootstrap.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package io.netty.bootstrap;

import io.netty.channel.Channel;
import io.netty.channel.ChannelBufferHolder;
import io.netty.channel.ChannelBufferHolders;
import io.netty.channel.ChannelException;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
Expand Down Expand Up @@ -228,7 +230,11 @@ private void validate(ChannelFuture future) {
validate();
}

private class Acceptor extends ChannelInboundMessageHandlerAdapter<Channel> {
private class Acceptor extends ChannelInboundHandlerAdapter<Channel> {
@Override
public ChannelBufferHolder<Channel> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
*/
package io.netty.channel;

import io.netty.buffer.ChannelBuffer;

import java.net.SocketAddress;
import java.util.Queue;

public class ChannelHandlerAdapter extends ChannelStateHandlerAdapter implements ChannelOperationHandler {

Expand Down Expand Up @@ -51,27 +48,11 @@ public void deregister(ChannelHandlerContext ctx, ChannelFuture future) throws E

@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future) throws Exception {
flush0(ctx, future);
}

static <O> void flush0(ChannelHandlerContext ctx, ChannelFuture future) {
if (ctx.hasOutboundMessageBuffer()) {
Queue<O> out = ctx.outboundMessageBuffer();
Queue<Object> nextOut = ctx.nextOutboundMessageBuffer();
for (;;) {
O msg = out.poll();
if (msg == null) {
break;
}
nextOut.add(msg);
}
} else if (ctx.hasOutboundByteBuffer()) {
ChannelBuffer out = ctx.outboundByteBuffer();
ChannelBuffer nextOut = ctx.nextOutboundByteBuffer();
nextOut.writeBytes(out);
out.discardReadBytes();
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}

ctx.flush(future);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,24 @@
import io.netty.buffer.ChannelBuffer;


public class ChannelInboundByteHandlerAdapter extends ChannelInboundHandlerAdapter<Byte> {
public abstract class ChannelInboundByteHandlerAdapter extends ChannelInboundHandlerAdapter<Byte> {

@Override
public ChannelBufferHolder<Byte> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.byteBuffer();
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
inboundBufferUpdated(ctx, ctx.inboundByteBuffer());
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
ChannelBuffer in = ctx.inboundByteBuffer();
try {
inboundBufferUpdated(ctx, in);
} finally {
if (!in.readable()) {
in.discardReadBytes();
}
}
}

public void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception {
ctx.nextInboundByteBuffer().writeBytes(in);
in.discardReadBytes();
}
public abstract void inboundBufferUpdated(ChannelHandlerContext ctx, ChannelBuffer in) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@

public abstract class ChannelInboundHandlerAdapter<I> extends ChannelStateHandlerAdapter
implements ChannelInboundHandler<I> {
// Useful when you have to create an anonymous class
@Override
public abstract void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@

import java.util.Queue;

public class ChannelInboundMessageHandlerAdapter<I> extends ChannelInboundHandlerAdapter<I> {
public abstract class ChannelInboundMessageHandlerAdapter<I> extends ChannelInboundHandlerAdapter<I> {

@Override
public ChannelBufferHolder<I> newInboundBuffer(ChannelHandlerContext ctx) throws Exception {
return ChannelBufferHolders.messageBuffer();
}

@Override
public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
public final void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
Queue<I> in = ctx.inboundMessageBuffer();
for (;;) {
I msg = in.poll();
Expand All @@ -34,14 +34,11 @@ public void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception {
}
try {
messageReceived(ctx, msg);
ctx.fireInboundBufferUpdated();
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
}
}

public void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception {
ctx.nextInboundMessageBuffer().add(msg);
}
public abstract void messageReceived(ChannelHandlerContext ctx, I msg) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ public void deregister(ChannelHandlerContext ctx, ChannelFuture future)
@Override
public void flush(ChannelHandlerContext ctx, ChannelFuture future)
throws Exception {
ChannelHandlerAdapter.flush0(ctx, future);
if (this instanceof ChannelOutboundHandler) {
throw new IllegalStateException(
"flush(...) must be overridden by " + getClass().getName() +
", which implements " + ChannelOutboundHandler.class.getSimpleName());
}
ctx.flush(future);
}
}
Loading

0 comments on commit 87f52aa

Please sign in to comment.