Skip to content

Commit 63b411d

Browse files
committed
Merge pull request apache#238 from ngbinh/upgradeNetty
upgrade Netty from 4.0.0.Beta2 to 4.0.13.Final the changes are listed at https://github.com/netty/netty/wiki/New-and-noteworthy
2 parents 55b7e2f + 2c5bade commit 63b411d

File tree

8 files changed

+60
-44
lines changed

8 files changed

+60
-44
lines changed

core/src/main/java/org/apache/spark/network/netty/FileClient.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,28 +20,34 @@
2020
import io.netty.bootstrap.Bootstrap;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelOption;
23+
import io.netty.channel.EventLoopGroup;
2324
import io.netty.channel.oio.OioEventLoopGroup;
2425
import io.netty.channel.socket.oio.OioSocketChannel;
2526

2627
import org.slf4j.Logger;
2728
import org.slf4j.LoggerFactory;
2829

30+
import java.util.concurrent.TimeUnit;
31+
2932
class FileClient {
3033

3134
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
32-
private FileClientHandler handler = null;
35+
private final FileClientHandler handler;
3336
private Channel channel = null;
3437
private Bootstrap bootstrap = null;
35-
private int connectTimeout = 60*1000; // 1 min
38+
private EventLoopGroup group = null;
39+
private final int connectTimeout;
40+
private final int sendTimeout = 60; // 1 min
3641

3742
public FileClient(FileClientHandler handler, int connectTimeout) {
3843
this.handler = handler;
3944
this.connectTimeout = connectTimeout;
4045
}
4146

4247
public void init() {
48+
group = new OioEventLoopGroup();
4349
bootstrap = new Bootstrap();
44-
bootstrap.group(new OioEventLoopGroup())
50+
bootstrap.group(group)
4551
.channel(OioSocketChannel.class)
4652
.option(ChannelOption.SO_KEEPALIVE, true)
4753
.option(ChannelOption.TCP_NODELAY, true)
@@ -56,6 +62,7 @@ public void connect(String host, int port) {
5662
// ChannelFuture cf = channel.closeFuture();
5763
//cf.addListener(new ChannelCloseListener(this));
5864
} catch (InterruptedException e) {
65+
LOG.warn("FileClient interrupted while trying to connect", e);
5966
close();
6067
}
6168
}
@@ -71,16 +78,21 @@ public void waitForClose() {
7178
public void sendRequest(String file) {
7279
//assert(file == null);
7380
//assert(channel == null);
74-
channel.write(file + "\r\n");
81+
try {
82+
// Should be able to send the message to network link channel.
83+
boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
84+
if (!bSent) {
85+
throw new RuntimeException("Failed to send");
86+
}
87+
} catch (InterruptedException e) {
88+
LOG.error("Error", e);
89+
}
7590
}
7691

7792
public void close() {
78-
if(channel != null) {
79-
channel.close();
80-
channel = null;
81-
}
82-
if ( bootstrap!=null) {
83-
bootstrap.shutdown();
93+
if (group != null) {
94+
group.shutdownGracefully();
95+
group = null;
8496
bootstrap = null;
8597
}
8698
}

core/src/main/java/org/apache/spark/network/netty/FileClientChannelInitializer.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,13 @@
1717

1818
package org.apache.spark.network.netty;
1919

20-
import io.netty.buffer.BufType;
2120
import io.netty.channel.ChannelInitializer;
2221
import io.netty.channel.socket.SocketChannel;
2322
import io.netty.handler.codec.string.StringEncoder;
2423

25-
2624
class FileClientChannelInitializer extends ChannelInitializer<SocketChannel> {
2725

28-
private FileClientHandler fhandler;
26+
private final FileClientHandler fhandler;
2927

3028
public FileClientChannelInitializer(FileClientHandler handler) {
3129
fhandler = handler;
@@ -35,7 +33,7 @@ public FileClientChannelInitializer(FileClientHandler handler) {
3533
public void initChannel(SocketChannel channel) {
3634
// file no more than 2G
3735
channel.pipeline()
38-
.addLast("encoder", new StringEncoder(BufType.BYTE))
36+
.addLast("encoder", new StringEncoder())
3937
.addLast("handler", fhandler);
4038
}
4139
}

core/src/main/java/org/apache/spark/network/netty/FileClientHandler.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919

2020
import io.netty.buffer.ByteBuf;
2121
import io.netty.channel.ChannelHandlerContext;
22-
import io.netty.channel.ChannelInboundByteHandlerAdapter;
22+
import io.netty.channel.SimpleChannelInboundHandler;
2323

2424
import org.apache.spark.storage.BlockId;
2525

26-
abstract class FileClientHandler extends ChannelInboundByteHandlerAdapter {
26+
abstract class FileClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
2727

2828
private FileHeader currentHeader = null;
2929

@@ -37,13 +37,7 @@ public boolean isComplete() {
3737
public abstract void handleError(BlockId blockId);
3838

3939
@Override
40-
public ByteBuf newInboundBuffer(ChannelHandlerContext ctx) {
41-
// Use direct buffer if possible.
42-
return ctx.alloc().ioBuffer();
43-
}
44-
45-
@Override
46-
public void inboundBufferUpdated(ChannelHandlerContext ctx, ByteBuf in) {
40+
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
4741
// get header
4842
if (currentHeader == null && in.readableBytes() >= FileHeader.HEADER_SIZE()) {
4943
currentHeader = FileHeader.create(in.readBytes(FileHeader.HEADER_SIZE()));

core/src/main/java/org/apache/spark/network/netty/FileServer.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,21 @@
2222
import io.netty.bootstrap.ServerBootstrap;
2323
import io.netty.channel.ChannelFuture;
2424
import io.netty.channel.ChannelOption;
25+
import io.netty.channel.EventLoopGroup;
2526
import io.netty.channel.oio.OioEventLoopGroup;
2627
import io.netty.channel.socket.oio.OioServerSocketChannel;
27-
2828
import org.slf4j.Logger;
2929
import org.slf4j.LoggerFactory;
3030

31-
3231
/**
3332
* Server that accept the path of a file an echo back its content.
3433
*/
3534
class FileServer {
3635

3736
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
3837

39-
private ServerBootstrap bootstrap = null;
38+
private EventLoopGroup bossGroup = null;
39+
private EventLoopGroup workerGroup = null;
4040
private ChannelFuture channelFuture = null;
4141
private int port = 0;
4242
private Thread blockingThread = null;
@@ -45,8 +45,11 @@ public FileServer(PathResolver pResolver, int port) {
4545
InetSocketAddress addr = new InetSocketAddress(port);
4646

4747
// Configure the server.
48-
bootstrap = new ServerBootstrap();
49-
bootstrap.group(new OioEventLoopGroup(), new OioEventLoopGroup())
48+
bossGroup = new OioEventLoopGroup();
49+
workerGroup = new OioEventLoopGroup();
50+
51+
ServerBootstrap bootstrap = new ServerBootstrap();
52+
bootstrap.group(bossGroup, workerGroup)
5053
.channel(OioServerSocketChannel.class)
5154
.option(ChannelOption.SO_BACKLOG, 100)
5255
.option(ChannelOption.SO_RCVBUF, 1500)
@@ -89,13 +92,19 @@ public int getPort() {
8992
public void stop() {
9093
// Close the bound channel.
9194
if (channelFuture != null) {
92-
channelFuture.channel().close();
95+
channelFuture.channel().close().awaitUninterruptibly();
9396
channelFuture = null;
9497
}
95-
// Shutdown bootstrap.
96-
if (bootstrap != null) {
97-
bootstrap.shutdown();
98-
bootstrap = null;
98+
99+
// Shutdown event groups
100+
if (bossGroup != null) {
101+
bossGroup.shutdownGracefully();
102+
bossGroup = null;
103+
}
104+
105+
if (workerGroup != null) {
106+
workerGroup.shutdownGracefully();
107+
workerGroup = null;
99108
}
100109
// TODO: Shutdown all accepted channels as well ?
101110
}

core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import io.netty.handler.codec.Delimiters;
2424
import io.netty.handler.codec.string.StringDecoder;
2525

26-
2726
class FileServerChannelInitializer extends ChannelInitializer<SocketChannel> {
2827

2928
PathResolver pResolver;
@@ -36,7 +35,7 @@ public FileServerChannelInitializer(PathResolver pResolver) {
3635
public void initChannel(SocketChannel channel) {
3736
channel.pipeline()
3837
.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
39-
.addLast("strDecoder", new StringDecoder())
38+
.addLast("stringDecoder", new StringDecoder())
4039
.addLast("handler", new FileServerHandler(pResolver));
4140
}
4241
}

core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,22 +21,26 @@
2121
import java.io.FileInputStream;
2222

2323
import io.netty.channel.ChannelHandlerContext;
24-
import io.netty.channel.ChannelInboundMessageHandlerAdapter;
24+
import io.netty.channel.SimpleChannelInboundHandler;
2525
import io.netty.channel.DefaultFileRegion;
2626

2727
import org.apache.spark.storage.BlockId;
2828
import org.apache.spark.storage.FileSegment;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

30-
class FileServerHandler extends ChannelInboundMessageHandlerAdapter<String> {
32+
class FileServerHandler extends SimpleChannelInboundHandler<String> {
3133

32-
PathResolver pResolver;
34+
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
35+
36+
private final PathResolver pResolver;
3337

3438
public FileServerHandler(PathResolver pResolver){
3539
this.pResolver = pResolver;
3640
}
3741

3842
@Override
39-
public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
43+
public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
4044
BlockId blockId = BlockId.apply(blockIdString);
4145
FileSegment fileSegment = pResolver.getBlockLocation(blockId);
4246
// if getBlockLocation returns null, close the channel
@@ -60,10 +64,10 @@ public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
6064
int len = new Long(length).intValue();
6165
ctx.write((new FileHeader(len, blockId)).buffer());
6266
try {
63-
ctx.sendFile(new DefaultFileRegion(new FileInputStream(file)
67+
ctx.write(new DefaultFileRegion(new FileInputStream(file)
6468
.getChannel(), fileSegment.offset(), fileSegment.length()));
6569
} catch (Exception e) {
66-
e.printStackTrace();
70+
LOG.error("Exception: ", e);
6771
}
6872
} else {
6973
ctx.write(new FileHeader(0, blockId).buffer());
@@ -73,7 +77,7 @@ public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
7377

7478
@Override
7579
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
76-
cause.printStackTrace();
80+
LOG.error("Exception: ", cause);
7781
ctx.close();
7882
}
7983
}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@
282282
<dependency>
283283
<groupId>io.netty</groupId>
284284
<artifactId>netty-all</artifactId>
285-
<version>4.0.0.CR1</version>
285+
<version>4.0.13.Final</version>
286286
</dependency>
287287
<dependency>
288288
<groupId>org.apache.derby</groupId>

project/SparkBuild.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ object SparkBuild extends Build {
178178

179179

180180
libraryDependencies ++= Seq(
181-
"io.netty" % "netty-all" % "4.0.0.CR1",
181+
"io.netty" % "netty-all" % "4.0.13.Final",
182182
"org.eclipse.jetty" % "jetty-server" % "7.6.8.v20121106",
183183
/** Workaround for SPARK-959. Dependency used by org.eclipse.jetty. Fixed in ivy 2.3.0. */
184184
"org.eclipse.jetty.orbit" % "javax.servlet" % "2.5.0.v201103041518" artifacts Artifact("javax.servlet", "jar", "jar"),

0 commit comments

Comments
 (0)