Skip to content

Commit

Permalink
公众号:bugstack虫洞栈 | Netty心跳服务与断线重连
Browse files Browse the repository at this point in the history
  • Loading branch information
fuzhengwei committed Apr 15, 2020
1 parent c4d4a28 commit 4d1c0ba
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/itstack-demo-netty/itstack-demo-netty-2-08/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>itstack-demo-netty</artifactId>
<groupId>org.itatack.demo</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>itstack-demo-netty-2-08</artifactId>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.itstack.demo.netty.client;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.EventLoop;

import java.util.concurrent.TimeUnit;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class MyChannelFutureListener implements ChannelFutureListener {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
return;
}
final EventLoop loop = channelFuture.channel().eventLoop();
loop.schedule(new Runnable() {
@Override
public void run() {
try {
new NettyClient().connect("127.0.0.1", 7397);
System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
Thread.sleep(500);
} catch (Exception e){
System.out.println("itstack-demo-netty client start error go reconnect ... {关注公众号:bugstack虫洞栈,获取源码}");
}
}
}, 1L, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package org.itstack.demo.netty.client;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.nio.charset.Charset;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel channel) throws Exception {
// 基于换行符号
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyClientHandler());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package org.itstack.demo.netty.client;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class MyClientHandler extends ChannelInboundHandlerAdapter {

/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:本客户端链接到服务端。channelId:" + channel.id());
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
}

/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("断开链接重连" + ctx.channel().localAddress().toString());
//使用过程中断线重连
new Thread(new Runnable() {
@Override
public void run() {
try {
new NettyClient().connect("127.0.0.1", 7397);
System.out.println("itstack-demo-netty client start done. {关注公众号:bugstack虫洞栈,获取源码}");
Thread.sleep(500);
} catch (Exception e){
System.out.println("itstack-demo-netty client start error go reconnect ... {关注公众号:bugstack虫洞栈,获取源码}");
}
}
}).start();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
}

/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常信息,断开重连:\r\n" + cause.getMessage());
ctx.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.itstack.demo.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.util.concurrent.TimeUnit;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class NettyClient {

public static void main(String[] args) {
new NettyClient().connect("127.0.0.1", 7397);
}

public void connect(String inetHost, int inetPort) {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.AUTO_READ, true);
b.handler(new MyChannelInitializer());
ChannelFuture f = b.connect(inetHost, inetPort).sync();
f.addListener(new MyChannelFutureListener()); //添加监听,处理重连
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package org.itstack.demo.netty.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;

import java.nio.charset.Charset;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel channel) {
/**
* 心跳监测
* 1、readerIdleTimeSeconds 读超时时间
* 2、writerIdleTimeSeconds 写超时时间
* 3、allIdleTimeSeconds 读写超时时间
* 4、TimeUnit.SECONDS 秒[默认为秒,可以指定]
*/
channel.pipeline().addLast(new IdleStateHandler(2, 2, 2));
// 基于换行符号
channel.pipeline().addLast(new LineBasedFrameDecoder(1024));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringDecoder(Charset.forName("GBK")));
// 解码转String,注意调整自己的编码格式GBK、UTF-8
channel.pipeline().addLast(new StringEncoder(Charset.forName("GBK")));
// 在管道中添加我们自己的接收数据实现方法
channel.pipeline().addLast(new MyServerHandler());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package org.itstack.demo.netty.server;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class MyServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
System.out.println("bugstack虫洞栈提醒=> Reader Idle");
ctx.writeAndFlush("读取等待:公众号bugstack虫洞栈,客户端你在吗[ctx.close()]{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
System.out.println("bugstack虫洞栈提醒=> Write Idle");
ctx.writeAndFlush("写入等待:公众号bugstack虫洞栈,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
} else if (e.state() == IdleState.ALL_IDLE) {
System.out.println("bugstack虫洞栈提醒=> All_IDLE");
ctx.writeAndFlush("全部时间:公众号bugstack虫洞栈,客户端你在吗{我结尾是一个换行符用于处理半包粘包}... ...\r\n");
}
}
ctx.flush();
}

/**
* 当客户端主动链接服务端的链接后,这个通道就是活跃的了。也就是客户端与服务端建立了通信通道并且可以传输数据
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
SocketChannel channel = (SocketChannel) ctx.channel();
System.out.println("链接报告开始");
System.out.println("链接报告信息:有一客户端链接到本服务端");
System.out.println("链接报告IP:" + channel.localAddress().getHostString());
System.out.println("链接报告Port:" + channel.localAddress().getPort());
System.out.println("链接报告完毕");
//通知客户端链接建立成功
String str = "通知客户端链接建立成功" + " " + new Date() + " " + channel.localAddress().getHostString() + "\r\n";
ctx.writeAndFlush(str);
}

/**
* 当客户端主动断开服务端的链接后,这个通道就是不活跃的。也就是说客户端与服务端的关闭了通信通道并且不可以传输数据
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("客户端断开链接" + ctx.channel().localAddress().toString());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//接收msg消息{与上一章节相比,此处已经不需要自己进行解码}
System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 接收到消息:" + msg);
//通知客户端链消息发送成功
String str = "服务端收到:" + new Date() + " " + msg + "\r\n";
ctx.writeAndFlush(str);
}

/**
* 抓住异常,当发生异常的时候,可以做一些相应的处理,比如打印日志、关闭链接
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
System.out.println("异常信息:\r\n" + cause.getMessage());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.itstack.demo.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class NettyServer {

public static void main(String[] args) {
new NettyServer().bing(7397);
}

private void bing(int port) {
//配置服务端NIO线程组
EventLoopGroup parentGroup = new NioEventLoopGroup(); //NioEventLoopGroup extends MultithreadEventLoopGroup Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
EventLoopGroup childGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(parentGroup, childGroup)
.channel(NioServerSocketChannel.class) //非阻塞模式
.option(ChannelOption.SO_BACKLOG, 128)
.childHandler(new MyChannelInitializer());
ChannelFuture f = b.bind(port).sync();
System.out.println("itstack-demo-netty server start done. {关注公众号:bugstack虫洞栈,获取源码}");
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
childGroup.shutdownGracefully();
parentGroup.shutdownGracefully();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.itstack.demo.test;

/**
* 虫洞栈:https://bugstack.cn
* 公众号:bugstack虫洞栈 {获取学习源码}
* Create by fuzhengwei on 2019
*/
public class ApiTest {

public static void main(String[] args) {
System.out.println("公众号:bugstack虫洞栈 | 获取学习源码");
}

}

0 comments on commit 4d1c0ba

Please sign in to comment.