Skip to content

Commit

Permalink
Allow to create Epoll*Channel from FileDescriptor
Browse files Browse the repository at this point in the history
Motivation:

Sometimes it's useful to be able to create a Epoll*Channel from an existing file descriptor. This is especially helpful if you integrade some c/jni code.

Modifications:

- Add extra constructor to Epoll*Channel implementations that take a FileDescriptor as an argument
- Make Rename EpollFileDescriptor to NativeFileDescriptor and make it public
- Also ensure we obtain the correct remote/local address when create a Channel from a FileDescriptor

Result:

It's now possible to create a FileDescriptor and instance a Epoll*Channel via it.
  • Loading branch information
normanmaurer committed Feb 9, 2015
1 parent 8f4ead0 commit 6b941e9
Show file tree
Hide file tree
Showing 14 changed files with 98 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1222,6 +1222,14 @@ JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoLinger(JNIEnv* en
}
}

JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd) {
int optval = 0;
if (getOption(env, fd, SOL_SOCKET, SO_ERROR, &optval, sizeof(optval)) == -1) {
return optval;
}
return 0;
}

JNIEXPORT jint JNICALL Java_io_netty_channel_epoll_Native_getTrafficClass(JNIEnv* env, jclass clazz, jint fd) {
int optval;
if (getOption(env, fd, IPPROTO_IP, IP_TOS, &optval, sizeof(optval)) == -1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ jint Java_io_netty_channel_epoll_Native_isBroadcast(JNIEnv* env, jclass clazz, j
jint Java_io_netty_channel_epoll_Native_getTcpKeepIdle(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTcpKeepIntvl(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getTcpKeepCnt(JNIEnv* env, jclass clazz, jint fd);
jint Java_io_netty_channel_epoll_Native_getSoError(JNIEnv* env, jclass clazz, jint fd);

jstring Java_io_netty_channel_epoll_Native_kernelVersion(JNIEnv* env, jclass clazz);
jint Java_io_netty_channel_epoll_Native_iovMax(JNIEnv* env, jclass clazz);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,18 @@ abstract class AbstractEpollChannel extends AbstractChannel {
}

AbstractEpollChannel(Channel parent, int fd, int flag, boolean active) {
this(parent, new NativeFileDescriptor(fd), flag, active);
}

AbstractEpollChannel(Channel parent, FileDescriptor fd, int flag, boolean active) {
super(parent);
if (fd == null) {
throw new NullPointerException("fd");
}
readFlag = flag;
flags |= flag;
this.active = active;
fileDescriptor = new EpollFileDescriptor(fd);
fileDescriptor = fd;
}

void setFlag(int flag) {
Expand Down Expand Up @@ -113,7 +120,7 @@ protected boolean isCompatible(EventLoop loop) {

@Override
public boolean isOpen() {
return fileDescriptor != EpollFileDescriptor.INVALID;
return fileDescriptor != FileDescriptor.INVALID;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileDescriptor;
import io.netty.channel.ServerChannel;

import java.net.InetSocketAddress;
Expand All @@ -32,6 +33,10 @@ protected AbstractEpollServerChannel(int fd) {
super(fd, Native.EPOLLIN);
}

protected AbstractEpollServerChannel(FileDescriptor fd) {
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.netty.channel.ConnectTimeoutException;
import io.netty.channel.DefaultFileRegion;
import io.netty.channel.EventLoop;
import io.netty.channel.FileDescriptor;
import io.netty.channel.RecvByteBufAllocator;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.util.internal.PlatformDependent;
Expand Down Expand Up @@ -61,6 +62,10 @@ protected AbstractEpollStreamChannel(int fd) {
flags |= Native.EPOLLRDHUP;
}

protected AbstractEpollStreamChannel(FileDescriptor fd) {
super(null, fd, Native.EPOLLIN, Native.getSoError(fd.intValue()) == 0);
}

@Override
protected AbstractEpollUnsafe newUnsafe() {
return new EpollStreamUnsafe();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,18 @@ public EpollDatagramChannel() {
config = new EpollDatagramChannelConfig(this);
}

/**
* Create a new {@link EpollDatagramChannel} from the given {@link FileDescriptor}.
*/
public EpollDatagramChannel(FileDescriptor fd) {
super(null, fd, Native.EPOLLIN, true);
config = new EpollDatagramChannelConfig(this);

// As we create an EpollDatagramChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = Native.localAddress(fd.intValue());
}

@Override
public InetSocketAddress remoteAddress() {
return (InetSocketAddress) super.remoteAddress();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public EpollDomainSocketChannel(Channel parent, FileDescriptor fd) {
super(parent, fd.intValue());
}

/**
* Creates a new {@link EpollDomainSocketChannel} from an existing {@link FileDescriptor}
*/
public EpollDomainSocketChannel(FileDescriptor fd) {
super(fd);
}

EpollDomainSocketChannel(Channel parent, int fd) {
super(parent, fd);
}
Expand Down Expand Up @@ -100,7 +107,7 @@ protected boolean doWriteSingle(ChannelOutboundBuffer in, int writeSpinCount) th

@Override
protected Object filterOutboundMessage(Object msg) {
if (msg instanceof EpollFileDescriptor) {
if (msg instanceof NativeFileDescriptor) {
return msg;
}
return super.filterOutboundMessage(msg);
Expand Down Expand Up @@ -143,7 +150,7 @@ private void epollInReadFd() {
readPending = false;

try {
pipeline.fireChannelRead(new EpollFileDescriptor(socketFd));
pipeline.fireChannelRead(new NativeFileDescriptor(socketFd));
} catch (Throwable t) {
// keep on reading as we use epoll ET and need to consume everything from the socket
pipeline.fireChannelReadComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ public EpollDomainSocketChannelConfig setEpollMode(EpollMode mode) {
* {@link DomainSocketReadMode#BYTES} which means bytes will be read from the
* {@link Channel} and passed through the pipeline. If
* {@link DomainSocketReadMode#FILE_DESCRIPTORS} is used
* {@link EpollFileDescriptor}s will be passed through the {@link ChannelPipeline}.
* {@link NativeFileDescriptor}s will be passed through the {@link ChannelPipeline}.
*
* This setting can be modified on the fly if needed.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.netty.channel.epoll;

import io.netty.channel.Channel;
import io.netty.channel.FileDescriptor;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand All @@ -34,6 +35,13 @@ public EpollServerDomainSocketChannel() {
super(Native.socketDomainFd());
}

/**
* Creates a new {@link EpollServerDomainSocketChannel} from an existing {@link FileDescriptor}.
*/
public EpollServerDomainSocketChannel(FileDescriptor fd) {
super(fd);
}

@Override
protected Channel newChildChannel(int fd) throws Exception {
return new EpollDomainSocketChannel(this, fd);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.channel.FileDescriptor;
import io.netty.channel.socket.ServerSocketChannel;

import java.net.InetSocketAddress;
Expand All @@ -36,6 +37,18 @@ public EpollServerSocketChannel() {
config = new EpollServerSocketChannelConfig(this);
}

/**
* Creates a new {@link EpollServerSocketChannel} from an existing {@link FileDescriptor}.
*/
public EpollServerSocketChannel(FileDescriptor fd) {
super(fd);
config = new EpollServerSocketChannelConfig(this);

// As we create an EpollServerSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound already.
local = Native.localAddress(fd.intValue());
}

@Override
protected boolean isCompatible(EventLoop loop) {
return loop instanceof EpollEventLoop;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import io.netty.channel.FileDescriptor;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GlobalEventExecutor;
Expand Down Expand Up @@ -53,6 +54,19 @@ public EpollSocketChannel() {
config = new EpollSocketChannelConfig(this);
}

/**
* Creates a new {@link EpollSocketChannel} from an existing {@link FileDescriptor}.
*/
public EpollSocketChannel(FileDescriptor fd) {
super(fd);
config = new EpollSocketChannelConfig(this);

// As we create an EpollSocketChannel from a FileDescriptor we should try to obtain the remote and local
// address from it. This is needed as the FileDescriptor may be bound/connected already.
remote = Native.remoteAddress(fd.intValue());
local = Native.localAddress(fd.intValue());
}

/**
* Returns the {@code TCP_INFO} for the current socket. See <a href="http://linux.die.net/man/7/tcp">man 7 tcp</a>.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,6 +574,7 @@ public static void shutdown(int fd, boolean read, boolean write) throws IOExcept
public static native int getTcpKeepIdle(int fd);
public static native int getTcpKeepIntvl(int fd);
public static native int getTcpKeepCnt(int fd);
public static native int getSoError(int fd);

public static native void setKeepAlive(int fd, int keepAlive);
public static native void setReceiveBufferSize(int fd, int receiveBufferSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@

import java.io.IOException;

final class EpollFileDescriptor implements FileDescriptor {
/**
* Native {@link FileDescriptor} implementation which allows to wrap an {@code int} and provide a
* {@link FileDescriptor} for it.
*/
public final class NativeFileDescriptor implements FileDescriptor {

private final int fd;

EpollFileDescriptor(int fd) {
public NativeFileDescriptor(int fd) {
if (fd < 0) {
throw new IllegalArgumentException("fd must be >= 0");
}
Expand Down Expand Up @@ -52,11 +56,11 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof EpollFileDescriptor)) {
if (!(o instanceof NativeFileDescriptor)) {
return false;
}

return fd == ((EpollFileDescriptor) o).fd;
return fd == ((NativeFileDescriptor) o).fd;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
cb.handler(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
EpollFileDescriptor fd = (EpollFileDescriptor) msg;
NativeFileDescriptor fd = (NativeFileDescriptor) msg;
queue.offer(fd);
}

Expand All @@ -90,9 +90,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
cc.close().sync();
sc.close().sync();

if (received instanceof EpollFileDescriptor) {
Assert.assertNotSame(EpollFileDescriptor.INVALID, received);
((EpollFileDescriptor) received).close();
if (received instanceof NativeFileDescriptor) {
Assert.assertNotSame(NativeFileDescriptor.INVALID, received);
((NativeFileDescriptor) received).close();
Assert.assertNull(queue.poll());
} else {
throw (Throwable) received;
Expand Down

0 comments on commit 6b941e9

Please sign in to comment.