From a44ad50e7459fbe67b2873b2b779eccc33088d7d Mon Sep 17 00:00:00 2001 From: yangzhenkun <1334036616@qq.com> Date: Mon, 26 Nov 2018 16:44:28 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=B3=A8=E5=86=8C=E4=B8=AD?= =?UTF-8?q?=E5=BF=83zk?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 24 +- com.krpc.client/pom.xml | 4 +- .../src/main/java/com/krpc/client/KRPC.java | 155 ++++++------ .../com/krpc/client/core/LoadBalance.java | 5 +- .../com/krpc/client/core/RequestHandler.java | 79 +++--- .../krpc/client/core/ZkRegisterCenter.java | 105 ++++++++ .../java/com/krpc/client/entity/Address.java | 4 +- .../com/krpc/client/entity/ServiceParams.java | 82 ++++--- com.krpc.common/pom.xml | 8 +- .../com/krpc/common/entity/ZookeeperInfo.java | 72 ++++++ com.krpc.server/pom.xml | 10 +- .../main/java/com/krpc/server/BootStrap.java | 10 + .../com/krpc/server/core/LoadConfigure.java | 225 +++++++++--------- .../java/com/krpc/server/entity/Global.java | 16 ++ .../server/register/ZkRegisterCenter.java | 51 ++++ .../java/com/krpc/server/BootStrapTest.java | 22 +- .../java/com/krpc/server/core/ZKTest.java | 132 ++++++++++ .../java/com/krpc/server/util/Dom4JTest.java | 32 ++- demo/com.a123.call/com.a123.call.iml | 24 -- demo/com.a123.call/pom.xml | 2 +- .../main/java/com/a123/com/a123/call/App.java | 7 +- demo/com.a123.call/src/resources/client.xml | 27 ++- .../com.a123.service.user.impl.iml | 16 -- .../com.a123.service.user.iml | 15 -- demo/config_file_template/client/client.xml | 20 +- demo/config_file_template/server/service.xml | 8 +- 26 files changed, 766 insertions(+), 389 deletions(-) create mode 100644 com.krpc.client/src/main/java/com/krpc/client/core/ZkRegisterCenter.java create mode 100644 com.krpc.common/src/main/java/com/krpc/common/entity/ZookeeperInfo.java create mode 100644 com.krpc.server/src/main/java/com/krpc/server/register/ZkRegisterCenter.java create mode 100644 com.krpc.server/src/test/java/com/krpc/server/core/ZKTest.java delete mode 100644 demo/com.a123.call/com.a123.call.iml delete mode 100644 demo/com.a123.service.user.impl/com.a123.service.user.impl.iml delete mode 100644 demo/com.a123.service.user/com.a123.service.user.iml diff --git a/README.md b/README.md index 7b7ba49..8a799ae 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,13 @@ ### 如何使用 -编译好的环境 [release](https://github.com/yangzhenkun/krpc/releases/tag/1.0) +编译好的服务端环境 [release](https://github.com/yangzhenkun/krpc/releases/tag/1.0) #### 1.服务端 解压后server文件夹中就是服务端环境,如demo所示,server/service中有一个user文件,就是我们部署的user服务,下面有两个必须的文件夹conf(配置文件) log4j.xml是该服务日志的标准的log4j配置文件,如果想修改日志路径 -```java +```xml @@ -26,14 +26,17 @@ log4j.xml是该服务日志的标准的log4j配置文件,如果想修改日志 ``` -修改值即可 +修改值即可 server.xml文件为服务的配置文件 ```xml - + + + 127.0.0.1:2181,127.0.0.1:3333 + @@ -60,8 +63,8 @@ server.xml文件为服务的配置文件 **启动** 启动在server/bin里面,执行 -```java -java -jar com.krpc.server-0.0.1.jar 服务名 +``` + java -jar com.krpc.server-0.0.1.jar 服务名 ``` 命令,查看日志,如果看到 启动成功,监听端口*** 的日志,恭喜你,服务端启动成功。 @@ -77,8 +80,13 @@ krpc提供了服务端镜像,所以每个服务都可以在krpc提供的docker 使用需要先调用KRPC.init("client配置文件")进行初始化 配置在client/client.xml中 -```java - +```xml + + + + 127.0.0.1:2181,127.0.0.1:3333 + + diff --git a/com.krpc.client/pom.xml b/com.krpc.client/pom.xml index 22ecda8..0bbca4e 100644 --- a/com.krpc.client/pom.xml +++ b/com.krpc.client/pom.xml @@ -4,7 +4,7 @@ com.krpc com.krpc.client - 0.0.1 + 0.0.2 jar com.krpc.client @@ -25,7 +25,7 @@ com.krpc com.krpc.common - 0.0.1 + 0.0.2 junit diff --git a/com.krpc.client/src/main/java/com/krpc/client/KRPC.java b/com.krpc.client/src/main/java/com/krpc/client/KRPC.java index 213873c..1454dd3 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/KRPC.java +++ b/com.krpc.client/src/main/java/com/krpc/client/KRPC.java @@ -1,87 +1,94 @@ package com.krpc.client; -import java.io.File; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - +import com.krpc.client.core.ZkRegisterCenter; +import com.krpc.client.entity.Address; +import com.krpc.client.entity.ServiceParams; +import com.krpc.common.entity.ZookeeperInfo; import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.io.SAXReader; -import com.krpc.client.entity.Address; -import com.krpc.client.entity.ServiceParams; +import java.io.File; +import java.util.ArrayList; +import java.util.List; /** - - _ _______ _____ _____ - | |/ | __ \| __ \ / ____| - | ' /| |__) | |__) | | - | < | _ /| ___/| | - | . \| | \ \| | | |____ - |_|\_|_| \_|_| \_____| - - - @author yangzhenkun - + * _ _______ _____ _____ + * | |/ | __ \| __ \ / ____| + * | ' /| |__) | |__) | | + * | < | _ /| ___/| | + * | . \| | \ \| | | |____ + * |_|\_|_| \_|_| \_____| + * + * @author yangzhenkun */ - public class KRPC { - private static Map serviceCache = new HashMap(); - - /** - * 初始化客户端配置文件 - * - * @param clientPath - * @throws Exception - */ - public static void init(String clientPath) throws Exception { - - // 读取该服务的配置文件 - SAXReader reader = new SAXReader(); - Document document = reader.read(new File(clientPath)); - Element root = document.getRootElement(); - - List serviceNodes = root.elements("Service"); - - for(Element serviceNode:serviceNodes){ - ServiceParams serviceParams = new ServiceParams(); - - serviceParams.setServiceName(serviceNode.attributeValue("name")); - - Element loadBalanceNode = serviceNode.element("Loadbalance"); - Element serverNode = loadBalanceNode.element("Server"); - serviceParams.setTimeout(Integer.parseInt(serverNode.attributeValue("timeout"))); - List addrNodes = serverNode.elements("addr"); - - for(Element addrNode : addrNodes){ - Address addr = new Address(); - addr.setName(addrNode.attributeValue("name")); - addr.setHost(addrNode.attributeValue("host")); - addr.setPort(Integer.parseInt(addrNode.attributeValue("port"))); - - serviceParams.addAddress(addr); - } - - serviceCache.put(serviceParams.getServiceName(), serviceParams); - } - - } - - /** - * 获取服务配置 - * @param serviceName - * @return - */ - public static ServiceParams getService(String serviceName){ - - return serviceCache.get(serviceName); - } - - - - -} + /** + * 初始化客户端配置文件 + * + * @param clientPath + * @throws Exception + */ + public static void init(String clientPath) throws Exception { + + // 读取该服务的配置文件 + SAXReader reader = new SAXReader(); + Document document = reader.read(new File(clientPath)); + Element root = document.getRootElement(); + + List serviceNodes = root.elements("Service"); + + ZookeeperInfo zookeeperInfo = ZookeeperInfo.createByElement(root); + if (zookeeperInfo != null) { + ZkRegisterCenter.init(zookeeperInfo); + } + + /** + * 解析所有服务配置信息 + */ + for (Element serviceNode : serviceNodes) { + String serverName = serviceNode.attributeValue("name"); + ServiceParams serviceParams = new ServiceParams(serverName); + serviceParams.setServiceName(serverName); + serviceParams.setTimeout(Integer.valueOf(serviceNode.attributeValue("timeout"))); + + /** + * 如果配置了注册中心,直接获取 + */ + if (zookeeperInfo != null) { + serviceParams.setAddresses(ZkRegisterCenter.getServerAddr(serverName)); + } + + /** + * 解析直连ip,如果使用注册中心会覆盖zk中的配置信息 + */ + Element loadBalanceNode = serviceNode.element("Loadbalance"); + if (loadBalanceNode != null) { + + Element serverNode = loadBalanceNode.element("Server"); + List addrNodes = serverNode.elements("addr"); + List
addresses = serviceParams.getAddresses(); + if (addresses != null && addresses.size() > 0) { + addresses.clear(); + } else { + addresses = new ArrayList<>(); + } + for (Element addrNode : addrNodes) { + Address addr = new Address(); + addr.setName(addrNode.attributeValue("name")); + addr.setHost(addrNode.attributeValue("host")); + addr.setPort(Integer.parseInt(addrNode.attributeValue("port"))); + addresses.add(addr); + } + + serviceParams.setAddresses(addresses); + } + + } + + } + +} \ No newline at end of file diff --git a/com.krpc.client/src/main/java/com/krpc/client/core/LoadBalance.java b/com.krpc.client/src/main/java/com/krpc/client/core/LoadBalance.java index be68077..58c07e2 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/core/LoadBalance.java +++ b/com.krpc.client/src/main/java/com/krpc/client/core/LoadBalance.java @@ -20,7 +20,7 @@ public class LoadBalance { * @return */ public static Address loadbalanceRandom(String serviceName){ - ServiceParams serviceParams = KRPC.getService(serviceName); + ServiceParams serviceParams = ServiceParams.getService(serviceName); int total = serviceParams.getAddresses().size(); int index = (int) (System.currentTimeMillis()%total); @@ -31,8 +31,9 @@ public static Address loadbalanceRandom(String serviceName){ public static Address loadbalanceUniformity(String serviceName) { - ServiceParams serviceParams = KRPC.getService(serviceName); + ServiceParams serviceParams = ServiceParams.getService(serviceName); int total = serviceParams.getAddresses().size(); + count.weakCompareAndSet(Integer.MAX_VALUE,0); return serviceParams.getAddresses().get(count.getAndIncrement()%total); diff --git a/com.krpc.client/src/main/java/com/krpc/client/core/RequestHandler.java b/com.krpc.client/src/main/java/com/krpc/client/core/RequestHandler.java index 541a5dd..b1bb598 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/core/RequestHandler.java +++ b/com.krpc.client/src/main/java/com/krpc/client/core/RequestHandler.java @@ -1,67 +1,66 @@ package com.krpc.client.core; -import java.io.IOException; -import java.util.Map; -import java.util.Objects; -import java.util.concurrent.ConcurrentHashMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.krpc.client.KRPC; import com.krpc.client.entity.Address; +import com.krpc.client.entity.ServiceParams; import com.krpc.client.net.TCPClient; import com.krpc.common.entity.Request; import com.krpc.common.serializer.HessianUtil; import com.krpc.common.util.CompressUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; /** * 选择服务,进行tcp请求 - * - * @author yangzhenkun * + * @author yangzhenkun */ public class RequestHandler { - private static Logger log = LoggerFactory.getLogger(RequestHandler.class); + private static Logger log = LoggerFactory.getLogger(RequestHandler.class); + + private static Map tcpClientCache = new ConcurrentHashMap(); + + private static Object lockHelper = new Object(); - private static Map tcpClientCache = new ConcurrentHashMap(); + public static Object request(String serviceName, Request request, Class returnType) throws Exception { - private static Object lockHelper = new Object(); + Address addr = LoadBalance.loadbalanceRandom(serviceName); - public static Object request(String serviceName, Request request, Class returnType) throws Exception { + byte[] requestBytes = CompressUtil.compress(HessianUtil.serialize(request)); - Address addr = LoadBalance.loadbalanceRandom(serviceName); - byte[] requestBytes = CompressUtil.compress(HessianUtil.serialize(request)); + TCPClient tcpClient = getTCPClient(addr, ServiceParams.getService(serviceName).getTimeout()); - TCPClient tcpClient = getTCPClient(addr,KRPC.getService(serviceName).getTimeout()); + log.debug("客户端发送数据:{}", requestBytes.length); + Integer sessionID = tcpClient.sendMsg(requestBytes); + if (Objects.isNull(sessionID)) { + throw new Exception("send data error!"); + } - log.debug("客户端发送数据:{}" , requestBytes.length); - Integer sessionID = tcpClient.sendMsg(requestBytes); - if(Objects.isNull(sessionID)) { - throw new Exception("send data error!"); - } - - byte[] responseBytessrc = tcpClient.getData(sessionID); - return HessianUtil.deserialize( CompressUtil.uncompress(responseBytessrc), null); - } + byte[] responseBytessrc = tcpClient.getData(sessionID); + return HessianUtil.deserialize(CompressUtil.uncompress(responseBytessrc), null); + } - private static TCPClient getTCPClient(Address address,Integer timeout) throws IOException { - TCPClient tcpClient= tcpClientCache.get(address); - if (Objects.isNull(tcpClient)) { + private static TCPClient getTCPClient(Address address, Integer timeout) throws IOException { + TCPClient tcpClient = tcpClientCache.get(address); + if (Objects.isNull(tcpClient)) { - synchronized (lockHelper) { - tcpClient = tcpClientCache.get(address); - if (Objects.isNull(tcpClient)) { - tcpClient = new TCPClient(address.getHost(), address.getPort(),timeout); - tcpClientCache.put(address, tcpClient); - } + synchronized (lockHelper) { + tcpClient = tcpClientCache.get(address); + if (Objects.isNull(tcpClient)) { + tcpClient = new TCPClient(address.getHost(), address.getPort(), timeout); + tcpClientCache.put(address, tcpClient); + } - } + } - } + } - return tcpClient; - } + return tcpClient; + } } diff --git a/com.krpc.client/src/main/java/com/krpc/client/core/ZkRegisterCenter.java b/com.krpc.client/src/main/java/com/krpc/client/core/ZkRegisterCenter.java new file mode 100644 index 0000000..747d5fa --- /dev/null +++ b/com.krpc.client/src/main/java/com/krpc/client/core/ZkRegisterCenter.java @@ -0,0 +1,105 @@ +package com.krpc.client.core; + +import com.krpc.client.entity.Address; +import com.krpc.client.entity.ServiceParams; +import com.krpc.common.entity.ZookeeperInfo; +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.ZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; + +/** + * @author yangzhenkun + * @create 2018-11-26 14:12 + */ +public class ZkRegisterCenter { + + private static Logger log = LoggerFactory.getLogger(ZkRegisterCenter.class); + private static ZkClient zc = null; + + public static void init(ZookeeperInfo zookeeperInfo) { + if (zc != null) { + return; + } + + try { + zc = new ZkClient(zookeeperInfo.getAddr(), zookeeperInfo.getSessionTimeOut(), zookeeperInfo.getConnectionTimeOut()); + } catch (Exception e) { + e.printStackTrace(); + log.error("zk init error!", e); + } + } + + public static List
getServerAddr(String serverName) { + + + List
addresses = new ArrayList<>(); + try { + StringBuffer path = new StringBuffer("/krpc/"); + path.append(serverName); + + List serverlist = zc.getChildren(path.toString()); + + if (serverlist != null && serverlist.size() > 0) { + for (String ipport : serverlist) { + String[] content = ipport.split(":"); + Address address = new Address(); + address.setHost(content[0]); + address.setPort(Integer.valueOf(content[1])); + addresses.add(address); + } + + } + + subscribe(serverName); + } catch (Exception e) { + log.error("get server config from zk error!", e); + } finally { + + return addresses; + } + } + + /** + * 订阅 + * + * @param serverName + */ + public static void subscribe(String serverName) { + + ServiceParams serviceParams = ServiceParams.getService(serverName); + StringBuffer path = new StringBuffer("/krpc/"); + path.append(serverName); + zc.subscribeChildChanges(path.toString(), new IZkChildListener() { + @Override + public void handleChildChange(String s, List list) throws Exception { + + System.out.println("server change===" + list); + if (list != null && list.size() > 0) { + List
newAddr = new ArrayList<>(list.size()); + for (String ipport : list) { + String[] content = ipport.split(":"); + Address address = new Address(); + address.setHost(content[0]); + address.setPort(Integer.valueOf(content[1])); + + newAddr.add(address); + } + + serviceParams.setAddresses(newAddr); + log.info("{} server change,content={}", serverName, list.toString()); + } else { + serviceParams.getAddresses().clear(); + log.info("{} server change,no able server!", serverName); + } + + } + }); + + } + + +} diff --git a/com.krpc.client/src/main/java/com/krpc/client/entity/Address.java b/com.krpc.client/src/main/java/com/krpc/client/entity/Address.java index d2ebf47..6a678a8 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/entity/Address.java +++ b/com.krpc.client/src/main/java/com/krpc/client/entity/Address.java @@ -30,6 +30,6 @@ public void setHost(String host) { public void setPort(Integer port) { this.port = port; } - - + + } diff --git a/com.krpc.client/src/main/java/com/krpc/client/entity/ServiceParams.java b/com.krpc.client/src/main/java/com/krpc/client/entity/ServiceParams.java index 92e7b65..4939f94 100644 --- a/com.krpc.client/src/main/java/com/krpc/client/entity/ServiceParams.java +++ b/com.krpc.client/src/main/java/com/krpc/client/entity/ServiceParams.java @@ -1,50 +1,56 @@ package com.krpc.client.entity; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * 服务参数 - * @author yangzhenkun * + * @author yangzhenkun */ public class ServiceParams { - private int timeout; - private List
addresses; - private String serviceName; - - public List
getAddresses() { - return addresses; - } - public String getServiceName() { - return serviceName; - } - public void setAddresses(List
addresses) { - this.addresses = addresses; - } - public void setServiceName(String serviceName) { - this.serviceName = serviceName; - } - - public void addAddress(Address address){ - if(addresses==null){ - addresses = new ArrayList
(); - } - addresses.add(address); - } - - public void removeAddress(int index){ - if(addresses!=null){ - addresses.remove(index); - } - - } - public int getTimeout() { - return timeout; - } - public void setTimeout(int timeout) { - this.timeout = timeout; - } - + private static Map serviceCache = new HashMap(); + + private int timeout; + private List
addresses; + private String serviceName; + + public ServiceParams(String serverName){ + this.serviceName=serverName; + } + + public List
getAddresses() { + return addresses; + } + + public String getServiceName() { + return serviceName; + } + + public void setAddresses(List
addresses) { + this.addresses = addresses; + serviceCache.put(serviceName,this); + } + + public void setServiceName(String serviceName) { + this.serviceName = serviceName; + serviceCache.put(serviceName,this); + } + + + public int getTimeout() { + return timeout; + } + + public void setTimeout(int timeout) { + this.timeout = timeout; + serviceCache.put(serviceName,this); + } + + public static ServiceParams getService(String serviceName){ + return serviceCache.get(serviceName); + } } diff --git a/com.krpc.common/pom.xml b/com.krpc.common/pom.xml index daa44b1..7f0a898 100644 --- a/com.krpc.common/pom.xml +++ b/com.krpc.common/pom.xml @@ -4,7 +4,7 @@ com.krpc com.krpc.common - 0.0.1 + 0.0.2 jar com.krpc.common @@ -51,6 +51,12 @@ 3.8.1 test + + com.101tec + zkclient + 0.11 + + diff --git a/com.krpc.common/src/main/java/com/krpc/common/entity/ZookeeperInfo.java b/com.krpc.common/src/main/java/com/krpc/common/entity/ZookeeperInfo.java new file mode 100644 index 0000000..6ba5e35 --- /dev/null +++ b/com.krpc.common/src/main/java/com/krpc/common/entity/ZookeeperInfo.java @@ -0,0 +1,72 @@ +package com.krpc.common.entity; + +import org.dom4j.Element; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author yangzhenkun + * @create 2018-11-26 11:37 + */ +public class ZookeeperInfo { + + private static Logger log = LoggerFactory.getLogger(ZookeeperInfo.class); + + public static ZookeeperInfo createByElement(Element root) { + /** + * 初始化注册中心数据 + */ + ZookeeperInfo zookeeperInfo = null; + try { + Element zkNode = root.element("zk"); + if (zkNode != null) { + String sessionTimeStr = zkNode.attributeValue("sessionTimeOut"); + String connectionTimeOutStr = zkNode.attributeValue("connectionTimeOut"); + Element addrNode = zkNode.element("addr"); + String addr = addrNode.getData().toString(); + + zookeeperInfo = new ZookeeperInfo(addr, Integer.valueOf(sessionTimeStr), Integer.valueOf(connectionTimeOutStr)); + } + } catch (Exception e) { + log.error("get zk info from server.xml error!",e); + } finally { + return zookeeperInfo; + } + } + + public ZookeeperInfo(String addr, int sessionTimeOut, int connectionTimeOut) { + this.addr = addr; + this.sessionTimeOut = sessionTimeOut; + this.connectionTimeOut = connectionTimeOut; + } + + private String addr; + + private int sessionTimeOut; + + private int connectionTimeOut; + + public String getAddr() { + return addr; + } + + public void setAddr(String addr) { + this.addr = addr; + } + + public int getSessionTimeOut() { + return sessionTimeOut; + } + + public void setSessionTimeOut(int sessionTimeOut) { + this.sessionTimeOut = sessionTimeOut; + } + + public int getConnectionTimeOut() { + return connectionTimeOut; + } + + public void setConnectionTimeOut(int connectionTimeOut) { + this.connectionTimeOut = connectionTimeOut; + } +} diff --git a/com.krpc.server/pom.xml b/com.krpc.server/pom.xml index 4f8001e..e78a525 100644 --- a/com.krpc.server/pom.xml +++ b/com.krpc.server/pom.xml @@ -4,7 +4,7 @@ com.krpc com.krpc.server - 0.0.1 + 0.0.2 jar com.krpc.server @@ -19,7 +19,7 @@ com.krpc com.krpc.common - 0.0.1 + 0.0.2 @@ -35,14 +35,14 @@ slf4j-log4j12 1.7.25 - - junit junit - 3.8.1 + 4.12 test + + diff --git a/com.krpc.server/src/main/java/com/krpc/server/BootStrap.java b/com.krpc.server/src/main/java/com/krpc/server/BootStrap.java index 04ff0a6..b9e72ec 100644 --- a/com.krpc.server/src/main/java/com/krpc/server/BootStrap.java +++ b/com.krpc.server/src/main/java/com/krpc/server/BootStrap.java @@ -4,6 +4,7 @@ import com.krpc.server.core.LoadConfigure; import com.krpc.server.entity.Global; import com.krpc.server.netty.ServerHandler; +import com.krpc.server.register.ZkRegisterCenter; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; @@ -73,9 +74,18 @@ protected void initChannel(SocketChannel ch) throws Exception { ChannelFuture f = bootstrap.bind(Global.getInstance().getPort()).sync(); + /** + * 使用注册中心 + */ + if (Global.getInstance().getZookeeperInfo() != null) { + ZkRegisterCenter.register(); + } + + log.info("启动成功,监听端口:" + Global.getInstance().getPort()); f.channel().closeFuture().sync(); + } else { System.out.println("请输入启动的服务名字"); } diff --git a/com.krpc.server/src/main/java/com/krpc/server/core/LoadConfigure.java b/com.krpc.server/src/main/java/com/krpc/server/core/LoadConfigure.java index 2c44015..f4ad23b 100644 --- a/com.krpc.server/src/main/java/com/krpc/server/core/LoadConfigure.java +++ b/com.krpc.server/src/main/java/com/krpc/server/core/LoadConfigure.java @@ -1,5 +1,15 @@ package com.krpc.server.core; +import com.krpc.server.entity.Global; +import com.krpc.common.entity.ZookeeperInfo; +import org.dom4j.Document; +import org.dom4j.Element; +import org.dom4j.io.OutputFormat; +import org.dom4j.io.SAXReader; +import org.dom4j.io.XMLWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.FileOutputStream; import java.io.FilenameFilter; @@ -13,120 +23,115 @@ import java.util.Map; import java.util.Map.Entry; -import org.dom4j.Document; -import org.dom4j.Element; -import org.dom4j.io.OutputFormat; -import org.dom4j.io.SAXReader; -import org.dom4j.io.XMLWriter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.krpc.server.entity.Global; - /** * 加载配置文件,初始化相关配置 - * - * @author yangzhenkun * + * @author yangzhenkun */ public class LoadConfigure { - - static Logger log = LoggerFactory.getLogger(LoadConfigure.class); - - /** - * 加载该服务下的配置文件,并初始化相关内容 - * @param serviceRootPath - * @throws Exception - */ - public static void load(String serviceRootPath) throws Exception { - - String serviceLib = serviceRootPath + File.separator + "lib"; - String serviceConf = serviceRootPath + File.separator + "conf"; - - // 读取该服务的配置文件 - SAXReader reader = new SAXReader(); - Document document = reader.read(new File(serviceConf + File.separator + "service.xml")); - document.setXMLEncoding("UTF-8"); - Element node = document.getRootElement(); - - Element proNode = node.element("property"); - - Element connectionNode = proNode.element("connection"); - Element nettyNode = proNode.element("netty"); - - Global.getInstance().setMaxBuf(Integer.parseInt(nettyNode.attributeValue("maxBuf"))); - - Global.getInstance().setIp(connectionNode.attributeValue("ip")); - - if(Global.getInstance().getPort()==null) { - Global.getInstance().setPort(Integer.parseInt(connectionNode.attributeValue("port"))); - }else { - connectionNode.setAttributeValue("port", String.valueOf(Global.getInstance().getPort())); - FileOutputStream fos = new FileOutputStream(serviceConf + File.separator + "service.xml"); - OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8"); - OutputFormat of = new OutputFormat(); - of.setEncoding("UTF-8"); - XMLWriter write =new XMLWriter(osw,of); - write.write(document); - write.close(); - } - - - Global.getInstance().setTimeout(Integer.parseInt(connectionNode.attributeValue("timeout"))); - - Map serviceMap = new HashMap(); - Element servicesNode = node.element("services"); - - List serviceList = servicesNode.elements("service"); - for (Element e : serviceList) { - serviceMap.put(e.attributeValue("name"), e.attributeValue("impl")); - } - - initService(serviceMap, serviceLib); - - } - - /** - * 加载该服务所有的jar,并实例化jar中所有服务的实现 - * - * @param services - * @throws MalformedURLException - */ - private static void initService(Map services, String serviceLibPath) throws Exception { - - File serviceLibDir = new File(serviceLibPath); - - File[] jarFiles = serviceLibDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File dir, String name) { - return name.endsWith(".jar"); - } - }); - - URL[] jarURLS = new URL[jarFiles.length]; - for (int i = 0; i < jarFiles.length; i++) { - log.info("加载的类有:"+jarFiles[i].getName()); - jarURLS[i] = jarFiles[i].toURI().toURL(); - } - URLClassLoader classLoader = new URLClassLoader(jarURLS, ClassLoader.getSystemClassLoader()); - - - /** - * 懒加载模式,在启动服务时,初始化所有实现类 - */ - Map instances = new HashMap(); - Map types = new HashMap(); - Iterator> it = services.entrySet().iterator(); - while(it.hasNext()){ - Entry e = it.next(); - Class clazz = classLoader.loadClass(e.getValue()); - instances.put(e.getKey(), clazz.newInstance()); - types.put(e.getKey(), clazz); - } - - Global.getInstance().setClassLoader(classLoader); - Global.getInstance().setServiceImpl(instances); - Global.getInstance().setServiceClass(types); - } + + static Logger log = LoggerFactory.getLogger(LoadConfigure.class); + + /** + * 加载该服务下的配置文件,并初始化相关内容 + * + * @param serviceRootPath + * @throws Exception + */ + public static void load(String serviceRootPath) throws Exception { + + String serviceLib = serviceRootPath + File.separator + "lib"; + String serviceConf = serviceRootPath + File.separator + "conf"; + + // 读取该服务的配置文件 + SAXReader reader = new SAXReader(); + Document document = reader.read(new File(serviceConf + File.separator + "service.xml")); + document.setXMLEncoding("UTF-8"); + Element node = document.getRootElement(); + + Element proNode = node.element("property"); + + Element connectionNode = proNode.element("connection"); + Element nettyNode = proNode.element("netty"); + + Global.getInstance().setMaxBuf(Integer.parseInt(nettyNode.attributeValue("maxBuf"))); + + Global.getInstance().setIp(connectionNode.attributeValue("ip")); + + if (Global.getInstance().getPort() == null) { + Global.getInstance().setPort(Integer.parseInt(connectionNode.attributeValue("port"))); + } else { + connectionNode.setAttributeValue("port", String.valueOf(Global.getInstance().getPort())); + FileOutputStream fos = new FileOutputStream(serviceConf + File.separator + "service.xml"); + OutputStreamWriter osw = new OutputStreamWriter(fos, "UTF-8"); + OutputFormat of = new OutputFormat(); + of.setEncoding("UTF-8"); + XMLWriter write = new XMLWriter(osw, of); + write.write(document); + write.close(); + } + + + Global.getInstance().setTimeout(Integer.parseInt(connectionNode.attributeValue("timeout"))); + + Map serviceMap = new HashMap(); + Element servicesNode = node.element("services"); + + List serviceList = servicesNode.elements("service"); + for (Element e : serviceList) { + serviceMap.put(e.attributeValue("name"), e.attributeValue("impl")); + } + + initService(serviceMap, serviceLib); + + /** + * 从配置文件中获取注册中心zk的信息 + */ + Global.getInstance().setZookeeperInfo(ZookeeperInfo.createByElement(node)); + + } + + /** + * 加载该服务所有的jar,并实例化jar中所有服务的实现 + * + * @param services + * @throws MalformedURLException + */ + private static void initService(Map services, String serviceLibPath) throws Exception { + + File serviceLibDir = new File(serviceLibPath); + + File[] jarFiles = serviceLibDir.listFiles(new FilenameFilter() { + @Override + public boolean accept(File dir, String name) { + return name.endsWith(".jar"); + } + }); + + URL[] jarURLS = new URL[jarFiles.length]; + for (int i = 0; i < jarFiles.length; i++) { + log.info("加载的类有:" + jarFiles[i].getName()); + jarURLS[i] = jarFiles[i].toURI().toURL(); + } + URLClassLoader classLoader = new URLClassLoader(jarURLS, ClassLoader.getSystemClassLoader()); + + + /** + * 懒加载模式,在启动服务时,初始化所有实现类 + */ + Map instances = new HashMap(); + Map types = new HashMap(); + Iterator> it = services.entrySet().iterator(); + while (it.hasNext()) { + Entry e = it.next(); + Class clazz = classLoader.loadClass(e.getValue()); + instances.put(e.getKey(), clazz.newInstance()); + types.put(e.getKey(), clazz); + } + + Global.getInstance().setClassLoader(classLoader); + Global.getInstance().setServiceImpl(instances); + Global.getInstance().setServiceClass(types); + } } diff --git a/com.krpc.server/src/main/java/com/krpc/server/entity/Global.java b/com.krpc.server/src/main/java/com/krpc/server/entity/Global.java index 0a91167..aee593a 100644 --- a/com.krpc.server/src/main/java/com/krpc/server/entity/Global.java +++ b/com.krpc.server/src/main/java/com/krpc/server/entity/Global.java @@ -1,5 +1,7 @@ package com.krpc.server.entity; +import com.krpc.common.entity.ZookeeperInfo; + import java.lang.reflect.Method; import java.util.List; import java.util.Map; @@ -48,6 +50,12 @@ public static Global getInstance(){ private Map methodCache; private ClassLoader classLoader; + + /** + * 用于注册中心的zookeeper的信息 + */ + private ZookeeperInfo zookeeperInfo; + public String getServiceName() { return serviceName; @@ -139,4 +147,12 @@ private String buildKey(String serviceName,String methodName,List params return methodKey.toString(); } + + public ZookeeperInfo getZookeeperInfo() { + return zookeeperInfo; + } + + public void setZookeeperInfo(ZookeeperInfo zookeeperInfo) { + this.zookeeperInfo = zookeeperInfo; + } } diff --git a/com.krpc.server/src/main/java/com/krpc/server/register/ZkRegisterCenter.java b/com.krpc.server/src/main/java/com/krpc/server/register/ZkRegisterCenter.java new file mode 100644 index 0000000..20473a3 --- /dev/null +++ b/com.krpc.server/src/main/java/com/krpc/server/register/ZkRegisterCenter.java @@ -0,0 +1,51 @@ +package com.krpc.server.register; + +import com.krpc.server.entity.Global; +import com.krpc.common.entity.ZookeeperInfo; +import org.I0Itec.zkclient.ZkClient; +import org.apache.zookeeper.CreateMode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * 注册中心处理 + * + * @author yangzhenkun + * @create 2018-11-24 17:53 + */ +public class ZkRegisterCenter { + + private static Logger log = LoggerFactory.getLogger(ZkRegisterCenter.class); + + private static ZkClient zc; + + public static void register() { + try { + + ZookeeperInfo zookeeperInfo = Global.getInstance().getZookeeperInfo(); + zc = new ZkClient(zookeeperInfo.getAddr(), zookeeperInfo.getSessionTimeOut(), zookeeperInfo.getConnectionTimeOut()); + + StringBuffer stringBuffer = new StringBuffer("/krpc"); + + if (!zc.exists(stringBuffer.toString())) { + zc.create(stringBuffer.toString(), "", CreateMode.PERSISTENT); + + log.info("创建根节点krpc"); + } + stringBuffer.append("/").append(Global.getInstance().getServiceName()); + + if (!zc.exists(stringBuffer.toString())) { + zc.create(stringBuffer.toString(), "", CreateMode.PERSISTENT); + log.info("创建{}服务节点", Global.getInstance().getServiceName()); + } + + stringBuffer.append("/").append(Global.getInstance().getIp()).append(":").append(Global.getInstance().getPort()); + + zc.create(stringBuffer.toString(), Global.getInstance().getServiceName(), CreateMode.EPHEMERAL); + } catch (Exception e) { + log.error("register error!", e); + } + } + + +} diff --git a/com.krpc.server/src/test/java/com/krpc/server/BootStrapTest.java b/com.krpc.server/src/test/java/com/krpc/server/BootStrapTest.java index a471d6e..9cb2ad3 100644 --- a/com.krpc.server/src/test/java/com/krpc/server/BootStrapTest.java +++ b/com.krpc.server/src/test/java/com/krpc/server/BootStrapTest.java @@ -1,18 +1,18 @@ package com.krpc.server; -import com.krpc.common.serializer.SerializeUtil; -import com.krpc.server.entity.User1; +import java.util.ArrayList; +import java.util.List; public class BootStrapTest { - public static void main(String[] args){ - - - byte[] bytes = SerializeUtil.serialize(new User1(1,"yasin","18888888888")); - - - - } - + public static void main(String[] args) { + + List list = new ArrayList<>(); + list.add("1"); + + System.out.println(list.toString()); + + } + } diff --git a/com.krpc.server/src/test/java/com/krpc/server/core/ZKTest.java b/com.krpc.server/src/test/java/com/krpc/server/core/ZKTest.java new file mode 100644 index 0000000..38c500e --- /dev/null +++ b/com.krpc.server/src/test/java/com/krpc/server/core/ZKTest.java @@ -0,0 +1,132 @@ +package com.krpc.server.core; + +import org.I0Itec.zkclient.IZkChildListener; +import org.I0Itec.zkclient.ZkClient; +import org.I0Itec.zkclient.exception.ZkMarshallingError; +import org.I0Itec.zkclient.serialize.ZkSerializer; +import org.apache.zookeeper.CreateMode; +import org.junit.Before; +import org.junit.Test; + +import java.io.UnsupportedEncodingException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * @author yangzhenkun + * @create 2018-11-24 11:30 + */ +public class ZKTest { + + private String addr = ""; + + private int outTime = 20000; + + private ZkClient zc = null; + + @Before + public void init() { + try { + zc = new ZkClient(addr, outTime, outTime); + } catch (Exception e) { + e.printStackTrace(); + } + + } + + + @Test + public void run() { + + zc.subscribeChildChanges("/krpc/user", new IZkChildListener() { + @Override + public void handleChildChange(String s, List list) throws Exception { + + System.out.println("s====" + s); + if (list != null && list.size() > 0) { + list.forEach(l -> { + System.out.println("list====" + l); + }); + } else { + + System.out.println("list====null"); + } + + } + }); + + System.out.println("监听打开"); + + try { + TimeUnit.HOURS.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + + } + + @Test + public void get() { + + String path = "/krpc/user"; + List serverList = zc.getChildren(path); + + + System.out.println(serverList); + + } + + + @Test + public void add() { + +// zc.writeData("/FirstZnode","1"); + if (!zc.exists("/krpc")) { + zc.create("/krpc", "", CreateMode.PERSISTENT); + + System.out.println("创建基点krpc"); + } + if (!zc.exists("/krpc/user")) { + zc.create("/krpc/user", "", CreateMode.PERSISTENT); + System.out.println("创建user服务节点"); + } + + + System.out.println(zc.create("/krpc/user/127.0.0.2:8080", "user", CreateMode.EPHEMERAL)); + + + try { + TimeUnit.MINUTES.sleep(1); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + + } + + + public class StringZkSerialize implements ZkSerializer { + @Override + public byte[] serialize(Object o) throws ZkMarshallingError { + try { + return o.toString().getBytes("utf-8"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + + return null; + } + + @Override + public Object deserialize(byte[] bytes) throws ZkMarshallingError { + try { + return new String(bytes, "utf-8"); + } catch (UnsupportedEncodingException e) { + e.printStackTrace(); + } + return null; + } + } + +} diff --git a/com.krpc.server/src/test/java/com/krpc/server/util/Dom4JTest.java b/com.krpc.server/src/test/java/com/krpc/server/util/Dom4JTest.java index b049d63..cb566d6 100644 --- a/com.krpc.server/src/test/java/com/krpc/server/util/Dom4JTest.java +++ b/com.krpc.server/src/test/java/com/krpc/server/util/Dom4JTest.java @@ -6,34 +6,30 @@ import org.dom4j.Document; import org.dom4j.Element; import org.dom4j.io.SAXReader; +import org.junit.Test; public class Dom4JTest { + + @Test public void test() throws Exception{ // 创建saxReader对象 SAXReader reader = new SAXReader(); // 通过read方法读取一个文件 转换成Document对象 - Document document = reader.read(new File("D:/krpc/service/demo/conf/service.xml")); + Document document = reader.read(new File("/opt/krpc-dev/krpc/demo/config_file_template/server/service.xml")); //获取根节点元素对象 Element node = document.getRootElement(); - Element proNode = node.element("property"); - Element connectionNode = proNode.element("connection"); - Element nettyNode = proNode.element("netty"); - System.out.println(connectionNode.attributeValue("ip")); - System.out.println(connectionNode.attributeValue("port")); - System.out.println(connectionNode.attributeValue("timeout")); - - System.out.println(nettyNode.attributeValue("workerCount")); - - Element servicesNode = node.element("services"); - - List serviceList = servicesNode.elements("service"); - for(Element e:serviceList){ - System.out.println(e.attributeValue("name")); - System.out.println(e.attributeValue("impl")); - } - + Element zkNode = node.element("zk1"); + + + System.out.println(zkNode.attribute("sessionTimeOut")); + + + Element addrNode = zkNode.element("addr"); + + + System.out.println(addrNode.getData().toString()); } } diff --git a/demo/com.a123.call/com.a123.call.iml b/demo/com.a123.call/com.a123.call.iml deleted file mode 100644 index a5668c3..0000000 --- a/demo/com.a123.call/com.a123.call.iml +++ /dev/null @@ -1,24 +0,0 @@ - - - - - - - - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/demo/com.a123.call/pom.xml b/demo/com.a123.call/pom.xml index 221cbd9..205b0ab 100644 --- a/demo/com.a123.call/pom.xml +++ b/demo/com.a123.call/pom.xml @@ -45,7 +45,7 @@ com.krpc com.krpc.client - 0.0.1 + 0.0.2 diff --git a/demo/com.a123.call/src/main/java/com/a123/com/a123/call/App.java b/demo/com.a123.call/src/main/java/com/a123/com/a123/call/App.java index 3a263f8..4b390e2 100644 --- a/demo/com.a123.call/src/main/java/com/a123/com/a123/call/App.java +++ b/demo/com.a123.call/src/main/java/com/a123/com/a123/call/App.java @@ -7,6 +7,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.log4j.xml.DOMConfigurator; import org.slf4j.Logger; @@ -33,7 +34,7 @@ public static void main(String[] args) { final Logger log = LoggerFactory.getLogger(App.class); // 初始KRPC服务 - KRPC.init("src/resources/client.xml"); + KRPC.init("/opt/krpc-dev/krpc/demo/com.a123.call/src/resources/client.xml"); // 通过代理获取接口类,第二个参数为client.xml文件中服务的名字,第三个参数为该接口具体实现的名字,需要跟该服务的配置文件的name值一样 UserService service = ProxyFactory.create(UserService.class, "user", "userService"); @@ -49,10 +50,10 @@ public static void main(String[] args) { Executor pool = Executors.newFixedThreadPool(200); final CountDownLatch count = new CountDownLatch(100000); start = System.currentTimeMillis(); - for (int i = 0; i < 100000; i++) { + for (int i = 0; i < 30; i++) { pool.execute(new Task(service, i, log,count)); - + TimeUnit.SECONDS.sleep(5); } count.await(); diff --git a/demo/com.a123.call/src/resources/client.xml b/demo/com.a123.call/src/resources/client.xml index 3aaef85..de6750d 100644 --- a/demo/com.a123.call/src/resources/client.xml +++ b/demo/com.a123.call/src/resources/client.xml @@ -1,19 +1,24 @@ - - + + + + 127.0.0.1:2181,127.0.0.1:3333 + + + - - - - - + + + - - - + + diff --git a/demo/com.a123.service.user.impl/com.a123.service.user.impl.iml b/demo/com.a123.service.user.impl/com.a123.service.user.impl.iml deleted file mode 100644 index 0102045..0000000 --- a/demo/com.a123.service.user.impl/com.a123.service.user.impl.iml +++ /dev/null @@ -1,16 +0,0 @@ - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/demo/com.a123.service.user/com.a123.service.user.iml b/demo/com.a123.service.user/com.a123.service.user.iml deleted file mode 100644 index 2eb246a..0000000 --- a/demo/com.a123.service.user/com.a123.service.user.iml +++ /dev/null @@ -1,15 +0,0 @@ - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/demo/config_file_template/client/client.xml b/demo/config_file_template/client/client.xml index 200f133..132e2a4 100644 --- a/demo/config_file_template/client/client.xml +++ b/demo/config_file_template/client/client.xml @@ -1,16 +1,26 @@ - + + + + 127.0.0.1:2181,127.0.0.1:3333 + + + + + + - + - - + - + + + diff --git a/demo/config_file_template/server/service.xml b/demo/config_file_template/server/service.xml index eff8136..0f2b437 100644 --- a/demo/config_file_template/server/service.xml +++ b/demo/config_file_template/server/service.xml @@ -1,11 +1,10 @@ - - + @@ -17,6 +16,9 @@ - + + + 127.0.0.1:2181,127.0.0.1:3333 + \ No newline at end of file