Skip to content

Commit

Permalink
Merge pull request #2 from yangzhenkun/zk
Browse files Browse the repository at this point in the history
增加注册中心zk
  • Loading branch information
yangzhenkun authored Nov 26, 2018
2 parents ad2f0d0 + a44ad50 commit 4c4cae3
Show file tree
Hide file tree
Showing 26 changed files with 766 additions and 389 deletions.
24 changes: 16 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@

### 如何使用

编译好的环境 [release](https://github.com/yangzhenkun/krpc/releases/tag/1.0)
编译好的服务端环境 [release](https://github.com/yangzhenkun/krpc/releases/tag/1.0)

#### 1.服务端
解压后server文件夹中就是服务端环境,如demo所示,server/service中有一个user文件,就是我们部署的user服务,下面有两个必须的文件夹conf(配置文件)

log4j.xml是该服务日志的标准的log4j配置文件,如果想修改日志路径
```java
```xml
<!-- 输出日志到文件 每天一个文件 -->
<appender name="dailyRollingFile"
class="org.apache.log4j.DailyRollingFileAppender">
Expand All @@ -26,14 +26,17 @@ log4j.xml是该服务日志的标准的log4j配置文件,如果想修改日志
</layout>
</appender>
```
修改<param name="File" value="D:/opt/krpc/log/user/krpc.log"></param>值即可
修改<param name="File" value="/opt/krpc/log/user/krpc.log"></param>值即可

server.xml文件为服务的配置文件

```xml
<configuration>


<!-- 配置注册中心 如果不配置,则不使用 -->
<zk sessionTimeOut="20000" connectionTimeOut="20000">
<addr>127.0.0.1:2181,127.0.0.1:3333</addr>
</zk>
<!-- 连接相关参数 -->
<property>
<!-- 该服务监听的本机IP和tcp端口 -->
Expand All @@ -60,8 +63,8 @@ server.xml文件为服务的配置文件

**启动**
启动在server/bin里面,执行
```java
java -jar com.krpc.server-0.0.1.jar 服务名
```
java -jar com.krpc.server-0.0.1.jar 服务名
```
命令,查看日志,如果看到 启动成功,监听端口*** 的日志,恭喜你,服务端启动成功。

Expand All @@ -77,8 +80,13 @@ krpc提供了服务端镜像,所以每个服务都可以在krpc提供的docker
使用需要先调用KRPC.init("client配置文件")进行初始化
配置在client/client.xml中

```java

```xml

<!-- 配置注册中心-->
<zk sessionTimeOut="2000" connectionTimeOut="2000">
<addr>127.0.0.1:2181,127.0.0.1:3333</addr>
</zk>

<!-- 所连接的服务配置文件 name的值可以任意指定,只要在ProxyFactory.create的第二个参数值相同即可 -->
<!--用户服务 -->
<Service name="user" id="1" maxThreadCount="50">
Expand Down
4 changes: 2 additions & 2 deletions com.krpc.client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.krpc</groupId>
<artifactId>com.krpc.client</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
<packaging>jar</packaging>

<name>com.krpc.client</name>
Expand All @@ -25,7 +25,7 @@
<dependency>
<groupId>com.krpc</groupId>
<artifactId>com.krpc.common</artifactId>
<version>0.0.1</version>
<version>0.0.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
155 changes: 81 additions & 74 deletions com.krpc.client/src/main/java/com/krpc/client/KRPC.java
Original file line number Diff line number Diff line change
@@ -1,87 +1,94 @@
package com.krpc.client;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.krpc.client.core.ZkRegisterCenter;
import com.krpc.client.entity.Address;
import com.krpc.client.entity.ServiceParams;
import com.krpc.common.entity.ZookeeperInfo;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.io.SAXReader;

import com.krpc.client.entity.Address;
import com.krpc.client.entity.ServiceParams;
import java.io.File;
import java.util.ArrayList;
import java.util.List;

/**
_ _______ _____ _____
| |/ | __ \| __ \ / ____|
| ' /| |__) | |__) | |
| < | _ /| ___/| |
| . \| | \ \| | | |____
|_|\_|_| \_|_| \_____|
@author yangzhenkun
* _ _______ _____ _____
* | |/ | __ \| __ \ / ____|
* | ' /| |__) | |__) | |
* | < | _ /| ___/| |
* | . \| | \ \| | | |____
* |_|\_|_| \_|_| \_____|
*
* @author yangzhenkun
*/



public class KRPC {

private static Map<String,ServiceParams> serviceCache = new HashMap<String,ServiceParams>();

/**
* 初始化客户端配置文件
*
* @param clientPath
* @throws Exception
*/
public static void init(String clientPath) throws Exception {

// 读取该服务的配置文件
SAXReader reader = new SAXReader();
Document document = reader.read(new File(clientPath));
Element root = document.getRootElement();

List<Element> serviceNodes = root.elements("Service");

for(Element serviceNode:serviceNodes){
ServiceParams serviceParams = new ServiceParams();

serviceParams.setServiceName(serviceNode.attributeValue("name"));

Element loadBalanceNode = serviceNode.element("Loadbalance");
Element serverNode = loadBalanceNode.element("Server");
serviceParams.setTimeout(Integer.parseInt(serverNode.attributeValue("timeout")));
List<Element> addrNodes = serverNode.elements("addr");

for(Element addrNode : addrNodes){
Address addr = new Address();
addr.setName(addrNode.attributeValue("name"));
addr.setHost(addrNode.attributeValue("host"));
addr.setPort(Integer.parseInt(addrNode.attributeValue("port")));

serviceParams.addAddress(addr);
}

serviceCache.put(serviceParams.getServiceName(), serviceParams);
}

}

/**
* 获取服务配置
* @param serviceName
* @return
*/
public static ServiceParams getService(String serviceName){

return serviceCache.get(serviceName);
}




}
/**
* 初始化客户端配置文件
*
* @param clientPath
* @throws Exception
*/
public static void init(String clientPath) throws Exception {

// 读取该服务的配置文件
SAXReader reader = new SAXReader();
Document document = reader.read(new File(clientPath));
Element root = document.getRootElement();

List<Element> serviceNodes = root.elements("Service");

ZookeeperInfo zookeeperInfo = ZookeeperInfo.createByElement(root);
if (zookeeperInfo != null) {
ZkRegisterCenter.init(zookeeperInfo);
}

/**
* 解析所有服务配置信息
*/
for (Element serviceNode : serviceNodes) {
String serverName = serviceNode.attributeValue("name");
ServiceParams serviceParams = new ServiceParams(serverName);
serviceParams.setServiceName(serverName);
serviceParams.setTimeout(Integer.valueOf(serviceNode.attributeValue("timeout")));

/**
* 如果配置了注册中心,直接获取
*/
if (zookeeperInfo != null) {
serviceParams.setAddresses(ZkRegisterCenter.getServerAddr(serverName));
}

/**
* 解析直连ip,如果使用注册中心会覆盖zk中的配置信息
*/
Element loadBalanceNode = serviceNode.element("Loadbalance");
if (loadBalanceNode != null) {

Element serverNode = loadBalanceNode.element("Server");
List<Element> addrNodes = serverNode.elements("addr");
List<Address> addresses = serviceParams.getAddresses();
if (addresses != null && addresses.size() > 0) {
addresses.clear();
} else {
addresses = new ArrayList<>();
}
for (Element addrNode : addrNodes) {
Address addr = new Address();
addr.setName(addrNode.attributeValue("name"));
addr.setHost(addrNode.attributeValue("host"));
addr.setPort(Integer.parseInt(addrNode.attributeValue("port")));
addresses.add(addr);
}

serviceParams.setAddresses(addresses);
}

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public class LoadBalance {
* @return
*/
public static Address loadbalanceRandom(String serviceName){
ServiceParams serviceParams = KRPC.getService(serviceName);
ServiceParams serviceParams = ServiceParams.getService(serviceName);
int total = serviceParams.getAddresses().size();
int index = (int) (System.currentTimeMillis()%total);

Expand All @@ -31,8 +31,9 @@ public static Address loadbalanceRandom(String serviceName){


public static Address loadbalanceUniformity(String serviceName) {
ServiceParams serviceParams = KRPC.getService(serviceName);
ServiceParams serviceParams = ServiceParams.getService(serviceName);
int total = serviceParams.getAddresses().size();
count.weakCompareAndSet(Integer.MAX_VALUE,0);
return serviceParams.getAddresses().get(count.getAndIncrement()%total);


Expand Down
Original file line number Diff line number Diff line change
@@ -1,67 +1,66 @@
package com.krpc.client.core;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.krpc.client.KRPC;
import com.krpc.client.entity.Address;
import com.krpc.client.entity.ServiceParams;
import com.krpc.client.net.TCPClient;
import com.krpc.common.entity.Request;
import com.krpc.common.serializer.HessianUtil;
import com.krpc.common.util.CompressUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

/**
* 选择服务,进行tcp请求
*
* @author yangzhenkun
*
* @author yangzhenkun
*/
public class RequestHandler {

private static Logger log = LoggerFactory.getLogger(RequestHandler.class);
private static Logger log = LoggerFactory.getLogger(RequestHandler.class);

private static Map<Address, TCPClient> tcpClientCache = new ConcurrentHashMap();

private static Object lockHelper = new Object();

private static Map<Address, TCPClient> tcpClientCache = new ConcurrentHashMap();
public static Object request(String serviceName, Request request, Class returnType) throws Exception {

private static Object lockHelper = new Object();
Address addr = LoadBalance.loadbalanceRandom(serviceName);

public static Object request(String serviceName, Request request, Class returnType) throws Exception {
byte[] requestBytes = CompressUtil.compress(HessianUtil.serialize(request));

Address addr = LoadBalance.loadbalanceRandom(serviceName);
byte[] requestBytes = CompressUtil.compress(HessianUtil.serialize(request));
TCPClient tcpClient = getTCPClient(addr, ServiceParams.getService(serviceName).getTimeout());

TCPClient tcpClient = getTCPClient(addr,KRPC.getService(serviceName).getTimeout());
log.debug("客户端发送数据:{}", requestBytes.length);
Integer sessionID = tcpClient.sendMsg(requestBytes);
if (Objects.isNull(sessionID)) {
throw new Exception("send data error!");
}

log.debug("客户端发送数据:{}" , requestBytes.length);
Integer sessionID = tcpClient.sendMsg(requestBytes);
if(Objects.isNull(sessionID)) {
throw new Exception("send data error!");
}

byte[] responseBytessrc = tcpClient.getData(sessionID);
return HessianUtil.deserialize( CompressUtil.uncompress(responseBytessrc), null);
}
byte[] responseBytessrc = tcpClient.getData(sessionID);
return HessianUtil.deserialize(CompressUtil.uncompress(responseBytessrc), null);
}

private static TCPClient getTCPClient(Address address,Integer timeout) throws IOException {
TCPClient tcpClient= tcpClientCache.get(address);
if (Objects.isNull(tcpClient)) {
private static TCPClient getTCPClient(Address address, Integer timeout) throws IOException {
TCPClient tcpClient = tcpClientCache.get(address);
if (Objects.isNull(tcpClient)) {

synchronized (lockHelper) {
tcpClient = tcpClientCache.get(address);
if (Objects.isNull(tcpClient)) {
tcpClient = new TCPClient(address.getHost(), address.getPort(),timeout);
tcpClientCache.put(address, tcpClient);
}
synchronized (lockHelper) {
tcpClient = tcpClientCache.get(address);
if (Objects.isNull(tcpClient)) {
tcpClient = new TCPClient(address.getHost(), address.getPort(), timeout);
tcpClientCache.put(address, tcpClient);
}

}
}

}
}

return tcpClient;
}
return tcpClient;
}

}
Loading

0 comments on commit 4c4cae3

Please sign in to comment.