From aef425daabdc9bd1bc0bb2f46ce7930cb594cb6b Mon Sep 17 00:00:00 2001 From: yangzhenkun <1334036616@qq.com> Date: Fri, 27 Jul 2018 20:39:23 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/krpc/client/net/TCPClient.java | 71 ++++++++++++++++--- .../com/krpc/client/net/TcpClientHandler.java | 27 +++---- demo/com.a123.call/.gitignore | 1 + 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/com.krpc.client/src/main/java/com/krpc/client/net/TCPClient.java b/com.krpc.client/src/main/java/com/krpc/client/net/TCPClient.java index 93770ca..e6f2113 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/net/TCPClient.java +++ b/com.krpc.client/src/main/java/com/krpc/client/net/TCPClient.java @@ -1,5 +1,15 @@ package com.krpc.client.net; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.krpc.client.core.RequestHandler; + import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; @@ -12,21 +22,27 @@ import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; - public class TCPClient { - public static String HOST = "127.0.0.1"; - public static int PORT = 9999; + private Logger log = LoggerFactory.getLogger(this.getClass()); + private AtomicInteger sessionId = new AtomicInteger(0); + + private Map receiverDataWindow = new ConcurrentHashMap(); + + private static Bootstrap bootstrap; - public static Bootstrap bootstrap = getBootstrap(); - public static Channel channel = getChannel(HOST, PORT); + static { + bootstrap = getBootstrap(); + } + + private Channel channel; /** * 初始化Bootstrap * * @return */ - public static final Bootstrap getBootstrap() { + public static Bootstrap getBootstrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); @@ -43,25 +59,58 @@ protected void initChannel(Channel ch) throws Exception { }); return b; } + + + public TCPClient(String host,Integer port){ + this.channel = getChannel(host, port); + } - public static final Channel getChannel(String host, int port) { - Channel channel = null; + private Channel getChannel(String host, int port) { try { channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { - System.out.println(String.format("连接Server(IP[%s],PORT[%s])失败", host, port)); + log.error("连接Server(IP{},PORT{})失败", host, port); return null; } return channel; } - public static void sendMsg(byte[] msg) throws Exception { + public void sendMsg(byte[] msg) throws Exception { if (channel != null) { channel.writeAndFlush(msg).sync(); } else { - System.out.println("消息发送失败,连接尚未建立!"); + log.error("消息发送失败,连接尚未建立!"); + } + } + + /** + * 获取返回数据接口 + * + * @return + */ + public byte[] getData(int sessionId, long timeout) throws Exception { + + ReceiverData receiverData = receiverDataWindow.get(sessionId); + if (Objects.isNull(receiverData)) { + throw new Exception("未从等待窗口中取到数据"); } + byte[] respData = receiverData.getData(timeout); + if (Objects.isNull(respData)) { + throw new Exception("获取数据超时..."); + } + receiverDataWindow.remove(sessionId); + + return respData; } + + + private Integer createSessionID() { + if (sessionId.get() == 1073741824) {// 1024^3 + sessionId.compareAndSet(1073741824, 0); + } + + return sessionId.getAndIncrement(); + } } diff --git a/com.krpc.client/src/main/java/com/krpc/client/net/TcpClientHandler.java b/com.krpc.client/src/main/java/com/krpc/client/net/TcpClientHandler.java index 8dc1650..ab502d7 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/net/TcpClientHandler.java +++ b/com.krpc.client/src/main/java/com/krpc/client/net/TcpClientHandler.java @@ -1,12 +1,23 @@ package com.krpc.client.net; -import com.krpc.common.serializer.HessianUtil; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.util.CharsetUtil; +import io.netty.channel.ChannelInitializer; public class TcpClientHandler extends ChannelInboundHandlerAdapter { + + private Logger log = LoggerFactory.getLogger(this.getClass()); + + + public TcpClientHandler() { } @@ -15,22 +26,12 @@ public TcpClientHandler() { * 链路链接成功 */ public void channelActive(ChannelHandlerContext ctx) throws Exception { - /* - * for (int i = 0; i < 1000; i++) { ctx.writeAndFlush("1ac"); } - */ - // 链接成功后发送 - System.out.println("连接成功!!"); + log.debug("连接成功!"); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { -// String msgStr = (String) msg; byte[] data = (byte[]) msg; -// byte[] data = msgStr.getBytes(CharsetUtil.UTF_8); -// System.out.println("client收到byte长度"+data.length); - // ctx.write("收到数据!"); - // ctx.write(msg); - // ctx.write("w2d"); } diff --git a/demo/com.a123.call/.gitignore b/demo/com.a123.call/.gitignore index b83d222..901019b 100644 --- a/demo/com.a123.call/.gitignore +++ b/demo/com.a123.call/.gitignore @@ -1 +1,2 @@ /target/ +/krpc.log