Skip to content

Commit

Permalink
Address @trustin 's comments and also make sure the accept of AIO is …
Browse files Browse the repository at this point in the history
…only triggered from the event loop. See netty#71
  • Loading branch information
normanmaurer committed Jul 31, 2012
1 parent 8b473dc commit 16a4088
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,4 @@ public interface ChannelStateHandler extends ChannelHandler {
void channelInactive(ChannelHandlerContext ctx) throws Exception;

void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements
final AtomicReference<ByteBridge> inByteBridge;
final AtomicReference<ByteBridge> outByteBridge;

final AtomicBoolean suspendRead = new AtomicBoolean(false);
final AtomicBoolean suspendRead = new AtomicBoolean();

// Runnables that calls handlers
final Runnable fireChannelRegisteredTask = new Runnable() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@ private void remove0(DefaultChannelHandlerContext ctx) {

callAfterRemove(ctx);

// make sure we clear the readable flag
// make sure the it's set back to readable
ctx.readable(true);
}

Expand Down Expand Up @@ -529,11 +529,8 @@ private void removeLast0(DefaultChannelHandlerContext oldTail) {

callBeforeRemove(oldTail);

// clear readable suspend if necessary
// make sure the it's set back to readable
oldTail.readable(true);



}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected boolean isCompatible(EventLoop loop) {
return loop instanceof AioChildEventLoop;
}

protected abstract class AsyncUnsafe extends AbstractUnsafe {
protected abstract class AbstractAioUnsafe extends AbstractUnsafe {

@Override
public void connect(final SocketAddress remoteAddress,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,15 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server

private final AioServerSocketChannelConfig config;
private boolean closed;
private AtomicBoolean readSuspend = new AtomicBoolean();
private AtomicBoolean readSuspended = new AtomicBoolean();

private final Runnable acceptTask = new Runnable() {

@Override
public void run() {
doAccept();
}
};

private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) {
try {
Expand Down Expand Up @@ -94,7 +102,7 @@ protected void doBind(SocketAddress localAddress) throws Exception {
}

private void doAccept() {
if (readSuspend.get()) {
if (readSuspended.get()) {
return;
}
javaChannel().accept(this, ACCEPT_HANDLER);
Expand Down Expand Up @@ -140,7 +148,7 @@ protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel c
// create the socket add it to the buffer and fire the event
channel.pipeline().inboundMessageBuffer().add(
new AioSocketChannel(channel, null, channel.eventLoop, ch));
if (!channel.readSuspend.get()) {
if (!channel.readSuspended.get()) {
channel.pipeline().fireInboundBufferUpdated();
}
}
Expand All @@ -167,19 +175,25 @@ public ServerSocketChannelConfig config() {

@Override
protected Unsafe newUnsafe() {
return new AsyncUnsafe() {
return new AioServerSocketUnsafe();
}

@Override
public void suspendRead() {
readSuspend.set(true);
}
private final class AioServerSocketUnsafe extends AbstractAioUnsafe {

@Override
public void resumeRead() {
if (readSuspend.compareAndSet(true, false)) {
@Override
public void suspendRead() {
readSuspended.set(true);
}

@Override
public void resumeRead() {
if (readSuspended.compareAndSet(true, false)) {
if (eventLoop().inEventLoop()) {
doAccept();
} else {
eventLoop().execute(acceptTask);
}
}
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup grou
private final AioSocketChannelConfig config;
private boolean flushing;

private final AtomicBoolean readSuspend = new AtomicBoolean(false);
private final AtomicBoolean readSuspended = new AtomicBoolean();

private final Runnable readTask = new Runnable() {
@Override
Expand Down Expand Up @@ -187,7 +187,7 @@ protected void doFlushByteBuffer(ByteBuf buf) throws Exception {
}

private void beginRead() {
if (readSuspend.get()) {
if (readSuspended.get()) {
return;
}

Expand Down Expand Up @@ -284,7 +284,7 @@ protected void completed0(Integer result, AioSocketChannel channel) {
} catch (Throwable t) {
if (read) {
read = false;
if (!channel.readSuspend.get()) {
if (!channel.readSuspended.get()) {
pipeline.fireInboundBufferUpdated();
}
}
Expand All @@ -298,7 +298,7 @@ protected void completed0(Integer result, AioSocketChannel channel) {
}
} finally {
if (read) {
if (!channel.readSuspend.get()) {
if (!channel.readSuspended.get()) {
pipeline.fireInboundBufferUpdated();
}
}
Expand Down Expand Up @@ -333,13 +333,13 @@ private static final class ConnectHandler extends AioCompletionHandler<Void, Aio
@Override
protected void completed0(Void result, AioSocketChannel channel) {
channel.beginRead();
((AsyncUnsafe) channel.unsafe()).connectSuccess();
((AbstractAioUnsafe) channel.unsafe()).connectSuccess();
channel.pipeline().fireChannelActive();
}

@Override
protected void failed0(Throwable exc, AioSocketChannel channel) {
((AsyncUnsafe) channel.unsafe()).connectFailed(exc);
((AbstractAioUnsafe) channel.unsafe()).connectFailed(exc);
}
}

Expand All @@ -353,16 +353,16 @@ protected Unsafe newUnsafe() {
return new AioSocketChannelAsyncUnsafe();
}

private final class AioSocketChannelAsyncUnsafe extends AsyncUnsafe {
private final class AioSocketChannelAsyncUnsafe extends AbstractAioUnsafe {

@Override
public void suspendRead() {
readSuspend.set(true);
readSuspended.set(true);
}

@Override
public void resumeRead() {
if (readSuspend.compareAndSet(true, false)) {
if (readSuspended.compareAndSet(true, false)) {
if (eventLoop().inEventLoop()) {
beginRead();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ protected AbstractNioByteChannel(
}

@Override
protected abstract NioByteUnsafe newUnsafe();
protected abstract AbstractNioByteUnsafe newUnsafe();

protected abstract class NioByteUnsafe extends AbstractNioUnsafe {
abstract class AbstractNioByteUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ public void finishConnect() {
connectFuture = null;
}
}

}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ protected AbstractNioMessageChannel(
}

@Override
protected abstract NioMessageUnsafe newUnsafe();
protected abstract AbstractNioMessageUnsafe newUnsafe();

abstract class NioMessageUnsafe extends AbstractNioUnsafe {
abstract class AbstractNioMessageUnsafe extends AbstractNioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -462,24 +462,20 @@ public ChannelFuture block(
}

@Override
protected NioMessageUnsafe newUnsafe() {
protected AbstractNioMessageUnsafe newUnsafe() {
return new NioDatagramChannelUnsafe();
}

private final class NioDatagramChannelUnsafe extends NioMessageUnsafe {
private final class NioDatagramChannelUnsafe extends AbstractNioMessageUnsafe {

@Override
public void suspendRead() {
if (!connected) {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}

@Override
public void resumeRead() {
if (!connected) {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_READ);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ protected int doWriteMessages(MessageBuf<Object> buf, boolean lastSpin) throws E
}

@Override
protected NioMessageUnsafe newUnsafe() {
protected AbstractNioMessageUnsafe newUnsafe() {
return new NioServerSocketUnsafe();
}

private final class NioServerSocketUnsafe extends NioMessageUnsafe {
private final class NioServerSocketUnsafe extends AbstractNioMessageUnsafe {
@Override
public void suspendRead() {
selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_ACCEPT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,11 @@ protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception {
}

@Override
protected NioByteUnsafe newUnsafe() {
protected AbstractNioByteUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}

private final class NioSocketChannelUnsafe extends NioByteUnsafe {
private final class NioSocketChannelUnsafe extends AbstractNioByteUnsafe {

@Override
public void suspendRead() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ protected AbstractOioByteChannel(Channel parent, Integer id) {
}

@Override
protected abstract OioByteUnsafe newUnsafe();
protected abstract AbstractOioByteUnsafe newUnsafe();

abstract class OioByteUnsafe extends AbstractOioUnsafe {
abstract class AbstractOioByteUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public interface OioUnsafe extends Unsafe {
void read();
}

protected abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe {
abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe {

@Override
public void connect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ protected AbstractOioMessageChannel(Channel parent, Integer id) {
}

@Override
protected abstract OioMessageUnsafe newUnsafe();
protected abstract AbstractOioMessageUnsafe newUnsafe();

abstract class OioMessageUnsafe extends AbstractOioUnsafe {
abstract class AbstractOioMessageUnsafe extends AbstractOioUnsafe {
@Override
public void read() {
assert eventLoop().inEventLoop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel
private final DatagramChannelConfig config;
private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EMPTY_DATA, 0);

private volatile boolean readSuspend;
private volatile boolean readSuspended;

private static MulticastSocket newSocket() {
try {
Expand Down Expand Up @@ -165,7 +165,7 @@ protected void doClose() throws Exception {

@Override
protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
if (readSuspend) {
if (readSuspended) {
try {
Thread.sleep(SO_TIMEOUT);
} catch (InterruptedException e) {
Expand All @@ -186,7 +186,7 @@ protected int doReadMessages(MessageBuf<Object> buf) throws Exception {
buf.add(new DatagramPacket(Unpooled.wrappedBuffer(
data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr));

if (readSuspend) {
if (readSuspended) {
return 0;
} else {
return 1;
Expand Down Expand Up @@ -354,20 +354,20 @@ public ChannelFuture block(InetAddress multicastAddress,
}

@Override
protected OioMessageUnsafe newUnsafe() {
protected AbstractOioMessageUnsafe newUnsafe() {
return new OioDatagramChannelUnsafe();
}

private final class OioDatagramChannelUnsafe extends OioMessageUnsafe {
private final class OioDatagramChannelUnsafe extends AbstractOioMessageUnsafe {

@Override
public void suspendRead() {
readSuspend = true;
readSuspended = true;
}

@Override
public void resumeRead() {
readSuspend = false;
readSuspended = false;
}
}
}
Loading

0 comments on commit 16a4088

Please sign in to comment.