From 16a40883447f94ec6e56ef73cf374a9284b22ff7 Mon Sep 17 00:00:00 2001 From: norman Date: Tue, 31 Jul 2012 11:42:29 +0200 Subject: [PATCH] Address @trustin 's comments and also make sure the accept of AIO is only triggered from the event loop. See #71 --- .../io/netty/channel/ChannelStateHandler.java | 1 - .../channel/DefaultChannelHandlerContext.java | 2 +- .../netty/channel/DefaultChannelPipeline.java | 7 +--- .../socket/aio/AbstractAioChannel.java | 2 +- .../socket/aio/AioServerSocketChannel.java | 38 +++++++++++++------ .../channel/socket/aio/AioSocketChannel.java | 18 ++++----- .../socket/nio/AbstractNioByteChannel.java | 4 +- .../socket/nio/AbstractNioChannel.java | 1 - .../socket/nio/AbstractNioMessageChannel.java | 4 +- .../socket/nio/NioDatagramChannel.java | 12 ++---- .../socket/nio/NioServerSocketChannel.java | 4 +- .../channel/socket/nio/NioSocketChannel.java | 4 +- .../socket/oio/AbstractOioByteChannel.java | 4 +- .../socket/oio/AbstractOioChannel.java | 2 +- .../socket/oio/AbstractOioMessageChannel.java | 4 +- .../socket/oio/OioDatagramChannel.java | 14 +++---- .../socket/oio/OioServerSocketChannel.java | 14 +++---- .../channel/socket/oio/OioSocketChannel.java | 14 +++---- 18 files changed, 77 insertions(+), 72 deletions(-) diff --git a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java index 348d48d05f29..d9d1823e11ca 100755 --- a/transport/src/main/java/io/netty/channel/ChannelStateHandler.java +++ b/transport/src/main/java/io/netty/channel/ChannelStateHandler.java @@ -23,5 +23,4 @@ public interface ChannelStateHandler extends ChannelHandler { void channelInactive(ChannelHandlerContext ctx) throws Exception; void inboundBufferUpdated(ChannelHandlerContext ctx) throws Exception; - } diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java index 7fbaaf0785cf..176d1d0e4f4a 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelHandlerContext.java @@ -63,7 +63,7 @@ final class DefaultChannelHandlerContext extends DefaultAttributeMap implements final AtomicReference inByteBridge; final AtomicReference outByteBridge; - final AtomicBoolean suspendRead = new AtomicBoolean(false); + final AtomicBoolean suspendRead = new AtomicBoolean(); // Runnables that calls handlers final Runnable fireChannelRegisteredTask = new Runnable() { diff --git a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java index e10911a5de2b..bb54ebc45c10 100755 --- a/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java +++ b/transport/src/main/java/io/netty/channel/DefaultChannelPipeline.java @@ -467,7 +467,7 @@ private void remove0(DefaultChannelHandlerContext ctx) { callAfterRemove(ctx); - // make sure we clear the readable flag + // make sure the it's set back to readable ctx.readable(true); } @@ -529,11 +529,8 @@ private void removeLast0(DefaultChannelHandlerContext oldTail) { callBeforeRemove(oldTail); - // clear readable suspend if necessary + // make sure the it's set back to readable oldTail.readable(true); - - - } @Override diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java index 3185b7368889..ea2d7a35045d 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AbstractAioChannel.java @@ -86,7 +86,7 @@ protected boolean isCompatible(EventLoop loop) { return loop instanceof AioChildEventLoop; } - protected abstract class AsyncUnsafe extends AbstractUnsafe { + protected abstract class AbstractAioUnsafe extends AbstractUnsafe { @Override public void connect(final SocketAddress remoteAddress, diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java index 0a594f10ff75..7563057ca233 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioServerSocketChannel.java @@ -42,7 +42,15 @@ public class AioServerSocketChannel extends AbstractAioChannel implements Server private final AioServerSocketChannelConfig config; private boolean closed; - private AtomicBoolean readSuspend = new AtomicBoolean(); + private AtomicBoolean readSuspended = new AtomicBoolean(); + + private final Runnable acceptTask = new Runnable() { + + @Override + public void run() { + doAccept(); + } + }; private static AsynchronousServerSocketChannel newSocket(AsynchronousChannelGroup group) { try { @@ -94,7 +102,7 @@ protected void doBind(SocketAddress localAddress) throws Exception { } private void doAccept() { - if (readSuspend.get()) { + if (readSuspended.get()) { return; } javaChannel().accept(this, ACCEPT_HANDLER); @@ -140,7 +148,7 @@ protected void completed0(AsynchronousSocketChannel ch, AioServerSocketChannel c // create the socket add it to the buffer and fire the event channel.pipeline().inboundMessageBuffer().add( new AioSocketChannel(channel, null, channel.eventLoop, ch)); - if (!channel.readSuspend.get()) { + if (!channel.readSuspended.get()) { channel.pipeline().fireInboundBufferUpdated(); } } @@ -167,19 +175,25 @@ public ServerSocketChannelConfig config() { @Override protected Unsafe newUnsafe() { - return new AsyncUnsafe() { + return new AioServerSocketUnsafe(); + } - @Override - public void suspendRead() { - readSuspend.set(true); - } + private final class AioServerSocketUnsafe extends AbstractAioUnsafe { - @Override - public void resumeRead() { - if (readSuspend.compareAndSet(true, false)) { + @Override + public void suspendRead() { + readSuspended.set(true); + } + + @Override + public void resumeRead() { + if (readSuspended.compareAndSet(true, false)) { + if (eventLoop().inEventLoop()) { doAccept(); + } else { + eventLoop().execute(acceptTask); } } - }; + } } } diff --git a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java index c571a67127bd..817cbc0af71b 100755 --- a/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/aio/AioSocketChannel.java @@ -53,7 +53,7 @@ private static AsynchronousSocketChannel newSocket(AsynchronousChannelGroup grou private final AioSocketChannelConfig config; private boolean flushing; - private final AtomicBoolean readSuspend = new AtomicBoolean(false); + private final AtomicBoolean readSuspended = new AtomicBoolean(); private final Runnable readTask = new Runnable() { @Override @@ -187,7 +187,7 @@ protected void doFlushByteBuffer(ByteBuf buf) throws Exception { } private void beginRead() { - if (readSuspend.get()) { + if (readSuspended.get()) { return; } @@ -284,7 +284,7 @@ protected void completed0(Integer result, AioSocketChannel channel) { } catch (Throwable t) { if (read) { read = false; - if (!channel.readSuspend.get()) { + if (!channel.readSuspended.get()) { pipeline.fireInboundBufferUpdated(); } } @@ -298,7 +298,7 @@ protected void completed0(Integer result, AioSocketChannel channel) { } } finally { if (read) { - if (!channel.readSuspend.get()) { + if (!channel.readSuspended.get()) { pipeline.fireInboundBufferUpdated(); } } @@ -333,13 +333,13 @@ private static final class ConnectHandler extends AioCompletionHandler buf, boolean lastSpin) throws E } @Override - protected NioMessageUnsafe newUnsafe() { + protected AbstractNioMessageUnsafe newUnsafe() { return new NioServerSocketUnsafe(); } - private final class NioServerSocketUnsafe extends NioMessageUnsafe { + private final class NioServerSocketUnsafe extends AbstractNioMessageUnsafe { @Override public void suspendRead() { selectionKey().interestOps(selectionKey().interestOps() & ~ SelectionKey.OP_ACCEPT); diff --git a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java index fb4eacb99e20..406ceee4c7d2 100755 --- a/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/nio/NioSocketChannel.java @@ -193,11 +193,11 @@ protected int doWriteBytes(ByteBuf buf, boolean lastSpin) throws Exception { } @Override - protected NioByteUnsafe newUnsafe() { + protected AbstractNioByteUnsafe newUnsafe() { return new NioSocketChannelUnsafe(); } - private final class NioSocketChannelUnsafe extends NioByteUnsafe { + private final class NioSocketChannelUnsafe extends AbstractNioByteUnsafe { @Override public void suspendRead() { diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java index fe743f090953..69d9c759277f 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioByteChannel.java @@ -28,9 +28,9 @@ protected AbstractOioByteChannel(Channel parent, Integer id) { } @Override - protected abstract OioByteUnsafe newUnsafe(); + protected abstract AbstractOioByteUnsafe newUnsafe(); - abstract class OioByteUnsafe extends AbstractOioUnsafe { + abstract class AbstractOioByteUnsafe extends AbstractOioUnsafe { @Override public void read() { assert eventLoop().inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java index 702fd51d1950..7985d72bf9b6 100644 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioChannel.java @@ -50,7 +50,7 @@ public interface OioUnsafe extends Unsafe { void read(); } - protected abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe { + abstract class AbstractOioUnsafe extends AbstractUnsafe implements OioUnsafe { @Override public void connect( diff --git a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java index 51b8721fe638..8ef383d5c755 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/AbstractOioMessageChannel.java @@ -28,9 +28,9 @@ protected AbstractOioMessageChannel(Channel parent, Integer id) { } @Override - protected abstract OioMessageUnsafe newUnsafe(); + protected abstract AbstractOioMessageUnsafe newUnsafe(); - abstract class OioMessageUnsafe extends AbstractOioUnsafe { + abstract class AbstractOioMessageUnsafe extends AbstractOioUnsafe { @Override public void read() { assert eventLoop().inEventLoop(); diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java index 6cf917569f26..2586b0812fa6 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioDatagramChannel.java @@ -52,7 +52,7 @@ public class OioDatagramChannel extends AbstractOioMessageChannel private final DatagramChannelConfig config; private final java.net.DatagramPacket tmpPacket = new java.net.DatagramPacket(EMPTY_DATA, 0); - private volatile boolean readSuspend; + private volatile boolean readSuspended; private static MulticastSocket newSocket() { try { @@ -165,7 +165,7 @@ protected void doClose() throws Exception { @Override protected int doReadMessages(MessageBuf buf) throws Exception { - if (readSuspend) { + if (readSuspended) { try { Thread.sleep(SO_TIMEOUT); } catch (InterruptedException e) { @@ -186,7 +186,7 @@ protected int doReadMessages(MessageBuf buf) throws Exception { buf.add(new DatagramPacket(Unpooled.wrappedBuffer( data, tmpPacket.getOffset(), tmpPacket.getLength()), remoteAddr)); - if (readSuspend) { + if (readSuspended) { return 0; } else { return 1; @@ -354,20 +354,20 @@ public ChannelFuture block(InetAddress multicastAddress, } @Override - protected OioMessageUnsafe newUnsafe() { + protected AbstractOioMessageUnsafe newUnsafe() { return new OioDatagramChannelUnsafe(); } - private final class OioDatagramChannelUnsafe extends OioMessageUnsafe { + private final class OioDatagramChannelUnsafe extends AbstractOioMessageUnsafe { @Override public void suspendRead() { - readSuspend = true; + readSuspended = true; } @Override public void resumeRead() { - readSuspend = false; + readSuspended = false; } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java index 8b7470a399eb..ec60234f7124 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioServerSocketChannel.java @@ -54,7 +54,7 @@ private static ServerSocket newServerSocket() { final Lock shutdownLock = new ReentrantLock(); private final ServerSocketChannelConfig config; - private volatile boolean readSuspend; + private volatile boolean readSuspended; public OioServerSocketChannel() { this(newServerSocket()); @@ -140,7 +140,7 @@ protected int doReadMessages(MessageBuf buf) throws Exception { return -1; } - if (readSuspend) { + if (readSuspended) { try { Thread.sleep(SO_TIMEOUT); } catch (InterruptedException e) { @@ -154,7 +154,7 @@ protected int doReadMessages(MessageBuf buf) throws Exception { s = socket.accept(); if (s != null) { buf.add(new OioSocketChannel(this, null, s)); - if (readSuspend) { + if (readSuspended) { return 0; } return 1; @@ -197,20 +197,20 @@ protected void doWriteMessages(MessageBuf buf) throws Exception { } @Override - protected OioMessageUnsafe newUnsafe() { + protected AbstractOioMessageUnsafe newUnsafe() { return new OioServerSocketUnsafe(); } - private final class OioServerSocketUnsafe extends OioMessageUnsafe { + private final class OioServerSocketUnsafe extends AbstractOioMessageUnsafe { @Override public void suspendRead() { - readSuspend = true; + readSuspended = true; } @Override public void resumeRead() { - readSuspend = false; + readSuspended = false; } } } diff --git a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java index 6f16aa10ffb4..6310af890db2 100755 --- a/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java +++ b/transport/src/main/java/io/netty/channel/socket/oio/OioSocketChannel.java @@ -46,7 +46,7 @@ public class OioSocketChannel extends AbstractOioByteChannel private final SocketChannelConfig config; private InputStream is; private OutputStream os; - private volatile boolean suspendRead; + private volatile boolean readSuspended; public OioSocketChannel() { this(new Socket()); @@ -162,7 +162,7 @@ protected int doReadBytes(ByteBuf buf) throws Exception { return -1; } - if (suspendRead) { + if (readSuspended) { try { Thread.sleep(SO_TIMEOUT); } catch (InterruptedException e) { @@ -173,7 +173,7 @@ protected int doReadBytes(ByteBuf buf) throws Exception { try { int read = buf.writeBytes(is, buf.writableBytes()); - if (read > 0 && !suspendRead) { + if (read > 0 && !readSuspended) { return read; } else { // so the read bytes were 0 or the read was suspend @@ -195,20 +195,20 @@ protected void doWriteBytes(ByteBuf buf) throws Exception { @Override - protected OioByteUnsafe newUnsafe() { + protected AbstractOioByteUnsafe newUnsafe() { return new OioSocketChannelUnsafe(); } - private final class OioSocketChannelUnsafe extends OioByteUnsafe { + private final class OioSocketChannelUnsafe extends AbstractOioByteUnsafe { @Override public void suspendRead() { - suspendRead = true; + readSuspended = true; } @Override public void resumeRead() { - suspendRead = false; + readSuspended = false; } } }