Skip to content

Commit 2c5bade

Browse files
committed
Fix failed unit tests
Also clean up a bit.
1 parent 786f393 commit 2c5bade

File tree

3 files changed

+24
-13
lines changed

3 files changed

+24
-13
lines changed

core/src/main/java/org/apache/spark/network/netty/FileClient.java

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import java.util.concurrent.TimeUnit;
31+
3032
class FileClient {
3133

3234
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
33-
private FileClientHandler handler = null;
35+
private final FileClientHandler handler;
3436
private Channel channel = null;
3537
private Bootstrap bootstrap = null;
3638
private EventLoopGroup group = null;
37-
private int connectTimeout = 60*1000; // 1 min
39+
private final int connectTimeout;
40+
private final int sendTimeout = 60; // 1 min
3841

3942
public FileClient(FileClientHandler handler, int connectTimeout) {
4043
this.handler = handler;
@@ -43,7 +46,7 @@ public FileClient(FileClientHandler handler, int connectTimeout) {
4346

4447
public void init() {
4548
group = new OioEventLoopGroup();
46-
Bootstrap bootstrap = new Bootstrap();
49+
bootstrap = new Bootstrap();
4750
bootstrap.group(group)
4851
.channel(OioSocketChannel.class)
4952
.option(ChannelOption.SO_KEEPALIVE, true)
@@ -59,6 +62,7 @@ public void connect(String host, int port) {
5962
// ChannelFuture cf = channel.closeFuture();
6063
//cf.addListener(new ChannelCloseListener(this));
6164
} catch (InterruptedException e) {
65+
LOG.warn("FileClient interrupted while trying to connect", e);
6266
close();
6367
}
6468
}
@@ -74,15 +78,18 @@ public void waitForClose() {
7478
public void sendRequest(String file) {
7579
//assert(file == null);
7680
//assert(channel == null);
77-
channel.write(file + "\r\n");
81+
try {
82+
// Should be able to send the message to network link channel.
83+
boolean bSent = channel.writeAndFlush(file + "\r\n").await(sendTimeout, TimeUnit.SECONDS);
84+
if (!bSent) {
85+
throw new RuntimeException("Failed to send");
86+
}
87+
} catch (InterruptedException e) {
88+
LOG.error("Error", e);
89+
}
7890
}
7991

8092
public void close() {
81-
if(channel != null) {
82-
channel.close().awaitUninterruptibly();
83-
channel = null;
84-
}
85-
8693
if (group != null) {
8794
group.shutdownGracefully();
8895
group = null;

core/src/main/java/org/apache/spark/network/netty/FileServerChannelInitializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public FileServerChannelInitializer(PathResolver pResolver) {
3535
public void initChannel(SocketChannel channel) {
3636
channel.pipeline()
3737
.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()))
38-
.addLast("strDecoder", new StringDecoder())
38+
.addLast("stringDecoder", new StringDecoder())
3939
.addLast("handler", new FileServerHandler(pResolver));
4040
}
4141
}

core/src/main/java/org/apache/spark/network/netty/FileServerHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,14 @@
2626

2727
import org.apache.spark.storage.BlockId;
2828
import org.apache.spark.storage.FileSegment;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

3032
class FileServerHandler extends SimpleChannelInboundHandler<String> {
3133

32-
PathResolver pResolver;
34+
private Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
35+
36+
private final PathResolver pResolver;
3337

3438
public FileServerHandler(PathResolver pResolver){
3539
this.pResolver = pResolver;
@@ -63,7 +67,7 @@ public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
6367
ctx.write(new DefaultFileRegion(new FileInputStream(file)
6468
.getChannel(), fileSegment.offset(), fileSegment.length()));
6569
} catch (Exception e) {
66-
e.printStackTrace();
70+
LOG.error("Exception: ", e);
6771
}
6872
} else {
6973
ctx.write(new FileHeader(0, blockId).buffer());
@@ -73,7 +77,7 @@ public void channelRead0(ChannelHandlerContext ctx, String blockIdString) {
7377

7478
@Override
7579
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
76-
cause.printStackTrace();
80+
LOG.error("Exception: ", cause);
7781
ctx.close();
7882
}
7983
}

0 commit comments

Comments
 (0)