Skip to content

Commit

Permalink
修改
Browse files Browse the repository at this point in the history
  • Loading branch information
yangzhenkun committed Jul 27, 2018
1 parent 2cf0fe8 commit aef425d
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 24 deletions.
71 changes: 60 additions & 11 deletions com.krpc.client/src/main/java/com/krpc/client/net/TCPClient.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
package com.krpc.client.net;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.krpc.client.core.RequestHandler;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
Expand All @@ -12,21 +22,27 @@
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;


public class TCPClient {

public static String HOST = "127.0.0.1";
public static int PORT = 9999;
private Logger log = LoggerFactory.getLogger(this.getClass());
private AtomicInteger sessionId = new AtomicInteger(0);

private Map<Integer, ReceiverData> receiverDataWindow = new ConcurrentHashMap<Integer, ReceiverData>();

private static Bootstrap bootstrap;

public static Bootstrap bootstrap = getBootstrap();
public static Channel channel = getChannel(HOST, PORT);
static {
bootstrap = getBootstrap();
}

private Channel channel;

/**
* 初始化Bootstrap
*
* @return
*/
public static final Bootstrap getBootstrap() {
public static Bootstrap getBootstrap() {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class);
Expand All @@ -43,25 +59,58 @@ protected void initChannel(Channel ch) throws Exception {
});
return b;
}


public TCPClient(String host,Integer port){
this.channel = getChannel(host, port);
}

public static final Channel getChannel(String host, int port) {
Channel channel = null;
private Channel getChannel(String host, int port) {
try {
channel = bootstrap.connect(host, port).sync().channel();
} catch (Exception e) {
System.out.println(String.format("连接Server(IP[%s],PORT[%s])失败", host, port));
log.error("连接Server(IP{},PORT{})失败", host, port);
return null;
}
return channel;
}

public static void sendMsg(byte[] msg) throws Exception {
public void sendMsg(byte[] msg) throws Exception {
if (channel != null) {
channel.writeAndFlush(msg).sync();
} else {
System.out.println("消息发送失败,连接尚未建立!");
log.error("消息发送失败,连接尚未建立!");
}
}

/**
* 获取返回数据接口
*
* @return
*/
public byte[] getData(int sessionId, long timeout) throws Exception {

ReceiverData receiverData = receiverDataWindow.get(sessionId);
if (Objects.isNull(receiverData)) {
throw new Exception("未从等待窗口中取到数据");
}
byte[] respData = receiverData.getData(timeout);
if (Objects.isNull(respData)) {
throw new Exception("获取数据超时...");
}
receiverDataWindow.remove(sessionId);

return respData;
}


private Integer createSessionID() {

if (sessionId.get() == 1073741824) {// 1024^3
sessionId.compareAndSet(1073741824, 0);
}

return sessionId.getAndIncrement();
}

}
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
package com.krpc.client.net;

import com.krpc.common.serializer.HessianUtil;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.channel.ChannelInitializer;

public class TcpClientHandler extends ChannelInboundHandlerAdapter {

private Logger log = LoggerFactory.getLogger(this.getClass());



public TcpClientHandler() {
}

Expand All @@ -15,22 +26,12 @@ public TcpClientHandler() {
* 链路链接成功
*/
public void channelActive(ChannelHandlerContext ctx) throws Exception {
/*
* for (int i = 0; i < 1000; i++) { ctx.writeAndFlush("1ac"); }
*/
// 链接成功后发送
System.out.println("连接成功!!");
log.debug("连接成功!");
}

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

// String msgStr = (String) msg;
byte[] data = (byte[]) msg;
// byte[] data = msgStr.getBytes(CharsetUtil.UTF_8);
// System.out.println("client收到byte长度"+data.length);
// ctx.write("收到数据!");
// ctx.write(msg);
// ctx.write("w2d");

}

Expand Down
1 change: 1 addition & 0 deletions demo/com.a123.call/.gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/target/
/krpc.log

0 comments on commit aef425d

Please sign in to comment.