diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/HeartBeatPongHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/ClientHeartBeatHandler.java similarity index 76% rename from lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/HeartBeatPongHandler.java rename to lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/ClientHeartBeatHandler.java index f774fd3..eb5fe06 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/HeartBeatPongHandler.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/ClientHeartBeatHandler.java @@ -14,8 +14,8 @@ * 如果客户端接收到的是ping请求,则直接回复pong */ @ChannelHandler.Sharable -public class HeartBeatPongHandler extends ChannelInboundHandlerAdapter { - private static final Logger log = LoggerFactory.getLogger(HeartBeatPongHandler.class); +public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter { + private static final Logger log = LoggerFactory.getLogger(ClientHeartBeatHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -39,6 +39,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc if (idleStateEvent.state() == IdleState.ALL_IDLE) { //向服务端发送心跳检测 ctx.writeAndFlush("ping"); + } else if (idleStateEvent.state() == IdleState.READER_IDLE) { + //超过指定时间没有读事件,关闭连接 + log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress()); + ctx.channel().close(); } } else { super.userEventTriggered(ctx, evt); diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/RpcClientHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/RpcClientHandler.java index 4f04e0a..d91c678 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/RpcClientHandler.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/handler/RpcClientHandler.java @@ -15,7 +15,13 @@ public class RpcClientHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.info("客户端接收:{}", msg); - DefaultResponseFuture.receive((RpcResponse)msg); + DefaultResponseFuture.receive((RpcResponse) msg); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.warn("远程主机主动关闭了连接,{}", ctx.channel().remoteAddress()); + ctx.close(); } @Override @@ -25,5 +31,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } else { cause.printStackTrace(); } + ctx.close(); } } diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/initial/json/ClientHandlerChannelInitializer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/initial/json/ClientHandlerChannelInitializer.java index 104b427..466db2a 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/initial/json/ClientHandlerChannelInitializer.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/client/initial/json/ClientHandlerChannelInitializer.java @@ -1,6 +1,6 @@ package com.bruce.lightning.rpc.client.initial.json; -import com.bruce.lightning.rpc.client.handler.HeartBeatPongHandler; +import com.bruce.lightning.rpc.client.handler.ClientHeartBeatHandler; import com.bruce.lightning.rpc.client.handler.RpcClientHandler; import com.bruce.lightning.rpc.common.AppendDelimiterOutboundHandler; import com.bruce.lightning.rpc.common.serial.JsonEncodeHandler; @@ -21,7 +21,7 @@ public class ClientHandlerChannelInitializer extends ChannelInitializer { - HeartBeatPongHandler heartBeatPongHandler = new HeartBeatPongHandler(); + ClientHeartBeatHandler clientHeartBeatHandler = new ClientHeartBeatHandler(); @Override protected void initChannel(SocketChannel ch) throws Exception { @@ -19,8 +19,8 @@ protected void initChannel(SocketChannel ch) throws Exception { pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder()); pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder()); - pipeline.addLast(new IdleStateHandler(0, 0, 5)); - pipeline.addLast(heartBeatPongHandler); + pipeline.addLast(new IdleStateHandler(15, 0, 5)); + pipeline.addLast(clientHeartBeatHandler); pipeline.addLast(new RpcClientHandler()); } diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java index 5774d37..8cbc0c5 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java @@ -23,6 +23,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ctx.writeAndFlush(response); } + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + log.warn("远程主机主动关闭了连接,{}", ctx.channel().remoteAddress()); + ctx.close(); + } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { @@ -31,5 +36,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } else { cause.printStackTrace(); } + ctx.close(); } } diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/ServerHeartBeatHandler.java similarity index 93% rename from lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java rename to lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/ServerHeartBeatHandler.java index 16f4f7c..b7e07b2 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/ServerHeartBeatHandler.java @@ -27,8 +27,8 @@ * */ @ChannelHandler.Sharable -public class HeartBeatHandler extends ChannelInboundHandlerAdapter { - private static final Logger log = LoggerFactory.getLogger(HeartBeatHandler.class); +public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter { + private static final Logger log = LoggerFactory.getLogger(ServerHeartBeatHandler.class); @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java index e9f62b2..3433db7 100644 --- a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java +++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java @@ -2,7 +2,7 @@ import com.bruce.lightning.rpc.common.AppendDelimiterOutboundHandler; import com.bruce.lightning.rpc.common.serial.JsonEncodeHandler; -import com.bruce.lightning.rpc.server.handler.HeartBeatHandler; +import com.bruce.lightning.rpc.server.handler.ServerHeartBeatHandler; import com.bruce.lightning.rpc.server.handler.RpcServerHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; @@ -22,7 +22,7 @@ public class ServerHandlerChannelInitializer extends ChannelInitializer { - HeartBeatHandler heartBeatHandler = new HeartBeatHandler(); + ServerHeartBeatHandler serverHeartBeatHandler = new ServerHeartBeatHandler(); @Override protected void initChannel(SocketChannel ch) throws Exception { @@ -20,7 +20,7 @@ protected void initChannel(SocketChannel ch) throws Exception { //心跳检测机制(这里采用服务端向客户端发送ping机制) pipeline.addLast(new IdleStateHandler(25, 0, 10)); - pipeline.addLast(heartBeatHandler); + pipeline.addLast(serverHeartBeatHandler); pipeline.addLast(new RpcServerHandler()); }