Skip to content

Commit

Permalink
客户端心跳
Browse files Browse the repository at this point in the history
  • Loading branch information
brucelwl committed Dec 16, 2020
1 parent e78118d commit b727af9
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* 如果客户端接收到的是ping请求,则直接回复pong
*/
@ChannelHandler.Sharable
public class HeartBeatPongHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(HeartBeatPongHandler.class);
public class ClientHeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ClientHeartBeatHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Expand All @@ -39,6 +39,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (idleStateEvent.state() == IdleState.ALL_IDLE) {
//向服务端发送心跳检测
ctx.writeAndFlush("ping");
} else if (idleStateEvent.state() == IdleState.READER_IDLE) {
//超过指定时间没有读事件,关闭连接
log.info("超过心跳时间,关闭和服务端的连接:{}", ctx.channel().remoteAddress());
ctx.channel().close();
}
} else {
super.userEventTriggered(ctx, evt);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@ public class RpcClientHandler extends ChannelInboundHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.info("客户端接收:{}", msg);

DefaultResponseFuture.receive((RpcResponse)msg);
DefaultResponseFuture.receive((RpcResponse) msg);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("远程主机主动关闭了连接,{}", ctx.channel().remoteAddress());
ctx.close();
}

@Override
Expand All @@ -25,5 +31,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
} else {
cause.printStackTrace();
}
ctx.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.bruce.lightning.rpc.client.initial.json;

import com.bruce.lightning.rpc.client.handler.HeartBeatPongHandler;
import com.bruce.lightning.rpc.client.handler.ClientHeartBeatHandler;
import com.bruce.lightning.rpc.client.handler.RpcClientHandler;
import com.bruce.lightning.rpc.common.AppendDelimiterOutboundHandler;
import com.bruce.lightning.rpc.common.serial.JsonEncodeHandler;
Expand All @@ -21,7 +21,7 @@ public class ClientHandlerChannelInitializer extends ChannelInitializer<SocketCh
ClientJsonDecodeHandler clientJsonDecodeHandler = new ClientJsonDecodeHandler();
JsonEncodeHandler jsonEncodeHandler = new JsonEncodeHandler();

HeartBeatPongHandler heartBeatPongHandler = new HeartBeatPongHandler();
ClientHeartBeatHandler clientHeartBeatHandler = new ClientHeartBeatHandler();

@Override
protected void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -32,7 +32,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast(stringEncoder); //字符串编码器
pipeline.addLast(appendDelimiterOutboundHandler); //写出添加分隔符处理器

pipeline.addLast(heartBeatPongHandler);
pipeline.addLast(clientHeartBeatHandler);

pipeline.addLast(clientJsonDecodeHandler);
pipeline.addLast(jsonEncodeHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.bruce.lightning.rpc.client.initial.marshalling;

import com.bruce.lightning.rpc.client.handler.HeartBeatPongHandler;
import com.bruce.lightning.rpc.client.handler.ClientHeartBeatHandler;
import com.bruce.lightning.rpc.client.handler.RpcClientHandler;
import com.bruce.lightning.rpc.common.serial.MarshallingCodeFactory;
import io.netty.channel.ChannelInitializer;
Expand All @@ -10,7 +10,7 @@

public class MarshallingClientHandlerChannelInitializer extends ChannelInitializer<SocketChannel> {

HeartBeatPongHandler heartBeatPongHandler = new HeartBeatPongHandler();
ClientHeartBeatHandler clientHeartBeatHandler = new ClientHeartBeatHandler();

@Override
protected void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -19,8 +19,8 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());

pipeline.addLast(new IdleStateHandler(0, 0, 5));
pipeline.addLast(heartBeatPongHandler);
pipeline.addLast(new IdleStateHandler(15, 0, 5));
pipeline.addLast(clientHeartBeatHandler);

pipeline.addLast(new RpcClientHandler());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
ctx.writeAndFlush(response);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.warn("远程主机主动关闭了连接,{}", ctx.channel().remoteAddress());
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Expand All @@ -31,5 +36,6 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
} else {
cause.printStackTrace();
}
ctx.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
* </pre>
*/
@ChannelHandler.Sharable
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(HeartBeatHandler.class);
public class ServerHeartBeatHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(ServerHeartBeatHandler.class);

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.bruce.lightning.rpc.common.AppendDelimiterOutboundHandler;
import com.bruce.lightning.rpc.common.serial.JsonEncodeHandler;
import com.bruce.lightning.rpc.server.handler.HeartBeatHandler;
import com.bruce.lightning.rpc.server.handler.ServerHeartBeatHandler;
import com.bruce.lightning.rpc.server.handler.RpcServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
Expand All @@ -22,7 +22,7 @@ public class ServerHandlerChannelInitializer extends ChannelInitializer<SocketCh
JsonEncodeHandler jsonEncodeHandler = new JsonEncodeHandler();
ServerJsonDecodeHandler serverJsonDecodeHandler = new ServerJsonDecodeHandler();

HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
ServerHeartBeatHandler serverHeartBeatHandler = new ServerHeartBeatHandler();

@Override
protected void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -34,7 +34,7 @@ protected void initChannel(SocketChannel ch) throws Exception {

//心跳检测机制(这里采用服务端向客户端发送ping机制)
pipeline.addLast(new IdleStateHandler(25, 0, 10));
pipeline.addLast(heartBeatHandler);
pipeline.addLast(serverHeartBeatHandler);

pipeline.addLast(serverJsonDecodeHandler);
pipeline.addLast(jsonEncodeHandler);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.bruce.lightning.rpc.server.initial.marshalling;

import com.bruce.lightning.rpc.common.serial.MarshallingCodeFactory;
import com.bruce.lightning.rpc.server.handler.HeartBeatHandler;
import com.bruce.lightning.rpc.server.handler.ServerHeartBeatHandler;
import com.bruce.lightning.rpc.server.handler.RpcServerHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
Expand All @@ -10,7 +10,7 @@

public class MarshallingServerHandlerChannelInitializer extends ChannelInitializer<SocketChannel> {

HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
ServerHeartBeatHandler serverHeartBeatHandler = new ServerHeartBeatHandler();

@Override
protected void initChannel(SocketChannel ch) throws Exception {
Expand All @@ -20,7 +20,7 @@ protected void initChannel(SocketChannel ch) throws Exception {

//心跳检测机制(这里采用服务端向客户端发送ping机制)
pipeline.addLast(new IdleStateHandler(25, 0, 10));
pipeline.addLast(heartBeatHandler);
pipeline.addLast(serverHeartBeatHandler);

pipeline.addLast(new RpcServerHandler());
}
Expand Down

0 comments on commit b727af9

Please sign in to comment.