serverPaths = curatorFramework.getChildren().forPath(path);
+//
+// for (String serverPath : serverPaths) {
+//
+// log.info(serverPath);
+//
+// }
+//
+// }
+// }
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/registry/zookeeper/ZookeeperRegistry.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/registry/zookeeper/ZookeeperRegistry.java
new file mode 100644
index 0000000..aa0d820
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/registry/zookeeper/ZookeeperRegistry.java
@@ -0,0 +1,26 @@
+package com.bruce.lightning.rpc.registry.zookeeper;
+
+import com.bruce.lightning.rpc.registry.RegistryApi;
+
+public class ZookeeperRegistry implements RegistryApi {
+
+ @Override
+ public void register() {
+
+ }
+
+ @Override
+ public void unregister() {
+
+ }
+
+ @Override
+ public void subscribe() {
+
+ }
+
+ @Override
+ public void unsubscribe() {
+
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/LightningServer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/LightningServer.java
new file mode 100644
index 0000000..55c303b
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/LightningServer.java
@@ -0,0 +1,114 @@
+package com.bruce.lightning.rpc.server;
+
+import com.bruce.lightning.rpc.server.initial.marshalling.MarshallingServerHandlerChannelInitializer;
+import com.bruce.lightning.rpc.util.PlatformUtil;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.epoll.EpollEventLoopGroup;
+import io.netty.channel.epoll.EpollServerSocketChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.ServerSocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * ChannelOption.SO_BACKLOG:指定了内核为此套接口排队的最大连接数,对于给定的监听套接口
+ * 内核维护两个队列:未连接队列和已连接队列,三次握手完成后,将会从未完成队列移动到已完成
+ * 队列的尾部,当进程调用accept时,从已完成队列的头部取出一个给进程.
+ * ChannelOption.SO_BACKLOG,被规定为两个队列总和的最大值,大多数实现
+ * 默认值为5,在高并发的情况下明显不够,netty,默认设置为windows200,其他为128
+ *
+ */
+public class LightningServer {
+ private static final Logger log = LoggerFactory.getLogger(LightningServer.class);
+
+ private EventLoopGroup acceptGroup;
+ private EventLoopGroup workerGroup;
+
+ private int port = 8088;
+
+ public LightningServer(int port) {
+ this.port = port;
+ }
+
+ /**
+ * 同步连接
+ */
+ public void startSync() {
+ acceptGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup(2) : new NioEventLoopGroup(1);
+ workerGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
+ Class extends ServerSocketChannel> serverSocketChannelClass = PlatformUtil.isLinux() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;
+
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(acceptGroup, workerGroup)
+ .channel(serverSocketChannelClass)
+ .option(ChannelOption.SO_BACKLOG, 128)
+ .childOption(ChannelOption.SO_KEEPALIVE, false) //默认为false
+ .childHandler(new MarshallingServerHandlerChannelInitializer());
+
+ try {
+ //异步建立连接
+ bootstrap.bind(port).sync();
+
+ //netty启动后创建zk临时节点,创建之前先检测是否存在节点,存在先删除再创建
+ // if (existZkPath()) {
+ // deleteZkPath();
+ // }
+ // createZkPath();
+
+ log.info("Lightning rpc server started !!! port:{}", port);
+
+ } catch (Exception e) {
+ log.error("Lightning rpc server 动异常:", e);
+ this.close();
+ }
+
+
+ }
+
+ // private boolean existZkPath() throws Exception {
+ // InetAddress localHost = InetAddress.getLocalHost();
+ // Stat stat = ZKCuratorFactory.create().checkExists()
+ // .forPath(ZKConstants.SERVER_PATH + "/" + localHost.getHostAddress() + ":" + port);
+ // return stat != null;
+ // }
+ //
+ // private void createZkPath() throws Exception {
+ // InetAddress localHost = InetAddress.getLocalHost();
+ // ZKCuratorFactory.create().create()
+ // .creatingParentsIfNeeded()
+ // .withMode(CreateMode.EPHEMERAL) //创建ZK临时节点,连接断开后删除
+ // .forPath(ZKConstants.SERVER_PATH + "/" + localHost.getHostAddress() + ":" + port);
+ // log.info("netty 服务端注册到Zookeeper服务器");
+ // }
+ //
+ // private void deleteZkPath() throws Exception {
+ // InetAddress localHost = InetAddress.getLocalHost();
+ // ZKCuratorFactory.create().delete()
+ // .guaranteed()
+ // .deletingChildrenIfNeeded()
+ // .forPath(ZKConstants.SERVER_PATH + "/" + localHost.getHostAddress() + ":" + port);
+ // log.info("删除注册的节点");
+ // }
+
+ public void close() {
+ try {
+ // deleteZkPath();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ if (acceptGroup != null) {
+ acceptGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ log.info("destroy Lightning rpc server");
+ }
+
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/annotation/ExcludeRpc.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/annotation/ExcludeRpc.java
new file mode 100644
index 0000000..18edf56
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/annotation/ExcludeRpc.java
@@ -0,0 +1,23 @@
+package com.bruce.lightning.rpc.server.annotation;
+
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Created by bruce on 2019/1/11 19:44
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface ExcludeRpc {
+
+ /**
+ * 排除不需要暴漏的接口
+ */
+ Class>[] exclude() default {};
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java
new file mode 100644
index 0000000..16f4f7c
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/HeartBeatHandler.java
@@ -0,0 +1,65 @@
+package com.bruce.lightning.rpc.server.handler;
+
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ * 心跳检查,如果指定时间内都没有读写请求,会执行userEventTriggered方法,触发IdleStateEvent事件
+ *
+ * 服务端"读"事件,代表客户端向服务端发送数据
+ * 服务端"写"事件,代表服务端向客户端发送数据
+ * 服务端"读写"事件,双向发送数据
+ *
+ * 如果服务端在10s内没有收到"读写"事件,则向客户端发送心跳
+ * 如果服务端在20s内没有收到"读"事件,则认为客户端断开连接,关闭与客户端的连接
+ *
+ * 所以注意: "读写"检测的时间需要小于"读"检测时间,否则在没有发送心跳之前channel可能已经被关闭
+ *
+ * 另一种服务端心跳机制: 在指定时间内如果没有"读写"事件,则发送心跳检测,
+ * 通过计数器统计发送的心跳的次数,超过指定次数则断开与客户端的连接
+ *
+ *
+ */
+@ChannelHandler.Sharable
+public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger log = LoggerFactory.getLogger(HeartBeatHandler.class);
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if ("pong".equals(msg)) {
+ log.info("接收到客户端{}的pong", ctx.channel().remoteAddress());
+ } else if ("ping".equals(msg)) {
+ ctx.writeAndFlush("pong");
+ log.info("接收到客户端{}的ping", ctx.channel().remoteAddress());
+ } else {
+ super.channelRead(ctx, msg);
+ }
+ }
+
+ //事件处理.
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ //该事件需要配合 io.netty.handler.timeout.IdleStateHandler使用
+ IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
+ if (idleStateEvent.state() == IdleState.ALL_IDLE) {
+ //向客户端发送心跳检测
+ ctx.writeAndFlush("ping");
+ } else if (idleStateEvent.state() == IdleState.READER_IDLE) {
+ //超过指定时间没有读事件,关闭连接
+ ctx.channel().close();
+ log.info("READER_IDLE 关闭远程客户端{}", ctx.channel().remoteAddress());
+ }
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+
+ }
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java
new file mode 100644
index 0000000..5774d37
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/handler/RpcServerHandler.java
@@ -0,0 +1,35 @@
+package com.bruce.lightning.rpc.server.handler;
+
+import com.bruce.lightning.rpc.common.RpcRequest;
+import com.bruce.lightning.rpc.common.RpcResponse;
+import com.bruce.lightning.rpc.server.mapping.BeanMethodContext;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RpcServerHandler extends ChannelInboundHandlerAdapter {
+ private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+ log.info("服务端接收:{}", msg);
+
+ RpcRequest request = (RpcRequest) msg;
+ //TODO 应该在异步线程中执行业务逻辑处理
+ RpcResponse response = BeanMethodContext.process(request);
+
+ ctx.writeAndFlush(response);
+ }
+
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ if (cause instanceof java.io.IOException) {
+ log.warn("远程主机强迫关闭了一个现有的连接,{}", ctx.channel().remoteAddress());
+ } else {
+ cause.printStackTrace();
+ }
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java
new file mode 100644
index 0000000..e9f62b2
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerHandlerChannelInitializer.java
@@ -0,0 +1,44 @@
+package com.bruce.lightning.rpc.server.initial.json;
+
+import com.bruce.lightning.rpc.common.AppendDelimiterOutboundHandler;
+import com.bruce.lightning.rpc.common.serial.JsonEncodeHandler;
+import com.bruce.lightning.rpc.server.handler.HeartBeatHandler;
+import com.bruce.lightning.rpc.server.handler.RpcServerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.DelimiterBasedFrameDecoder;
+import io.netty.handler.codec.Delimiters;
+import io.netty.handler.codec.string.StringDecoder;
+import io.netty.handler.codec.string.StringEncoder;
+import io.netty.handler.timeout.IdleStateHandler;
+
+public class ServerHandlerChannelInitializer extends ChannelInitializer {
+
+ AppendDelimiterOutboundHandler appendDelimiterOutboundHandler = new AppendDelimiterOutboundHandler();
+ StringDecoder stringDecoder = new StringDecoder();
+ StringEncoder stringEncoder = new StringEncoder();
+
+ JsonEncodeHandler jsonEncodeHandler = new JsonEncodeHandler();
+ ServerJsonDecodeHandler serverJsonDecodeHandler = new ServerJsonDecodeHandler();
+
+ HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new DelimiterBasedFrameDecoder(65535, Delimiters.lineDelimiter()));
+ pipeline.addLast(stringDecoder); //字符串解码
+ pipeline.addLast(stringEncoder); //字符串编码
+ pipeline.addLast(appendDelimiterOutboundHandler); //写出添加分隔符处理器
+
+ //心跳检测机制(这里采用服务端向客户端发送ping机制)
+ pipeline.addLast(new IdleStateHandler(25, 0, 10));
+ pipeline.addLast(heartBeatHandler);
+
+ pipeline.addLast(serverJsonDecodeHandler);
+ pipeline.addLast(jsonEncodeHandler);
+
+ pipeline.addLast(new RpcServerHandler());
+ }
+}
\ No newline at end of file
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerJsonDecodeHandler.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerJsonDecodeHandler.java
new file mode 100644
index 0000000..c131491
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/json/ServerJsonDecodeHandler.java
@@ -0,0 +1,37 @@
+package com.bruce.lightning.rpc.server.initial.json;
+
+import com.bruce.lightning.rpc.common.RpcRequest;
+import com.bruce.lightning.rpc.server.mapping.BeanMethod;
+import com.bruce.lightning.rpc.server.mapping.BeanMethodContext;
+import com.bruce.lightning.rpc.util.JsonUtils;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Type;
+
+public class ServerJsonDecodeHandler extends ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+
+ RpcRequest request = JsonUtils.jsonToObj(msg.toString(), RpcRequest.class);
+
+ BeanMethod beanMethod = BeanMethodContext.getRpcBeanMap().get(request.getFullMethodName());
+ Method method = beanMethod.getMethod();
+ Object[] args = request.getArgs();
+
+ //TODO 将json默认的对象转为具体的实体类
+ //消费端User到服务提供端后可能是LinkedHashMap, 需要再转一下转成User
+ Type[] genericParameterTypes = method.getGenericParameterTypes();
+ Object[] objects = new Object[genericParameterTypes.length];
+ for (int i = 0; i < genericParameterTypes.length; i++) {
+ Object arg = JsonUtils.parse(JsonUtils.toJson(args[i]), genericParameterTypes[i]);
+ objects[i] =arg;
+ }
+ //替换为真实的args
+ request.setArgs(objects);
+
+ super.channelRead(ctx, request);
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/marshalling/MarshallingServerHandlerChannelInitializer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/marshalling/MarshallingServerHandlerChannelInitializer.java
new file mode 100644
index 0000000..ee2cba6
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/initial/marshalling/MarshallingServerHandlerChannelInitializer.java
@@ -0,0 +1,27 @@
+package com.bruce.lightning.rpc.server.initial.marshalling;
+
+import com.bruce.lightning.rpc.common.serial.MarshallingCodeFactory;
+import com.bruce.lightning.rpc.server.handler.HeartBeatHandler;
+import com.bruce.lightning.rpc.server.handler.RpcServerHandler;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.timeout.IdleStateHandler;
+
+public class MarshallingServerHandlerChannelInitializer extends ChannelInitializer {
+
+ HeartBeatHandler heartBeatHandler = new HeartBeatHandler();
+
+ @Override
+ protected void initChannel(SocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
+ pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());
+
+ //心跳检测机制(这里采用服务端向客户端发送ping机制)
+ pipeline.addLast(new IdleStateHandler(25, 0, 10));
+ pipeline.addLast(heartBeatHandler);
+
+ pipeline.addLast(new RpcServerHandler());
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethod.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethod.java
new file mode 100644
index 0000000..ba0fe3a
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethod.java
@@ -0,0 +1,25 @@
+package com.bruce.lightning.rpc.server.mapping;
+
+
+import java.lang.reflect.Method;
+
+public class BeanMethod {
+
+ public BeanMethod(Object bean, Method method) {
+ this.bean = bean;
+ this.method = method;
+ }
+
+ private Object bean;
+
+ private Method method;
+
+ public Object getBean() {
+ return bean;
+ }
+
+ public Method getMethod() {
+ return method;
+ }
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethodContext.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethodContext.java
new file mode 100644
index 0000000..08184b8
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/server/mapping/BeanMethodContext.java
@@ -0,0 +1,78 @@
+package com.bruce.lightning.rpc.server.mapping;
+
+import com.bruce.lightning.rpc.common.RpcMethodUtil;
+import com.bruce.lightning.rpc.common.RpcRequest;
+import com.bruce.lightning.rpc.common.RpcResponse;
+import com.bruce.lightning.rpc.server.annotation.ExcludeRpc;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class BeanMethodContext {
+ private static final Logger logger = LoggerFactory.getLogger(BeanMethodContext.class);
+
+ private static final ConcurrentHashMap rpcBeanMap = new ConcurrentHashMap<>();
+
+
+ public static void addServiceBean(Object serviceBean) {
+ Class>[] interfaces = serviceBean.getClass().getInterfaces();
+ if (interfaces.length == 0) {
+ throw new IllegalStateException(serviceBean.getClass().getName() + " is not an interface implementation class");
+ }
+
+ ExcludeRpc excludeRpcInterface = serviceBean.getClass().getAnnotation(ExcludeRpc.class);
+
+ for (Class> rpcInterface : interfaces) {
+ if (excludeRpcInterface != null && Arrays.asList(excludeRpcInterface.exclude()).contains(rpcInterface)) {
+ continue;
+ }
+ Method[] declaredMethods = rpcInterface.getDeclaredMethods();
+ for (Method declaredMethod : declaredMethods) {
+ //只有public方法才能发布为远程服务
+ int modifiers = declaredMethod.getModifiers();
+ if ((modifiers == (Modifier.PUBLIC | Modifier.ABSTRACT)) || modifiers == Modifier.PUBLIC) {
+ String fullMethodName = RpcMethodUtil.getFullMethodName(declaredMethod);
+ BeanMethod beanMethod = new BeanMethod(serviceBean, declaredMethod);
+ rpcBeanMap.put(fullMethodName, beanMethod);
+ logger.info("rpc service:" + fullMethodName);
+ }
+ }
+ }
+ }
+
+ public static ConcurrentHashMap getRpcBeanMap() {
+ return rpcBeanMap;
+ }
+
+ //中介者模式
+ public static RpcResponse process(RpcRequest request) {
+ String fullMethodName = request.getFullMethodName();
+ BeanMethod beanMethod = rpcBeanMap.get(fullMethodName);
+
+ RpcResponse response = new RpcResponse();
+ response.setRequestId(request.getId());
+
+ if (beanMethod == null) {
+ logger.warn("未找到服务方法{}", fullMethodName);
+ response.setCode("not found");
+ response.setErrorMesg("未找到服务方法:" + fullMethodName);
+ return response;
+ }
+ Object bean = beanMethod.getBean();
+ Method method = beanMethod.getMethod();
+ Object[] args = request.getArgs();
+ try {
+ Object result = method.invoke(bean, args);
+ response.setResult(result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ response.setErrorMesg(e.getMessage());
+ }
+ return response;
+ }
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/EnableNettyRpcClient.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/EnableNettyRpcClient.java
new file mode 100644
index 0000000..196e9d0
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/EnableNettyRpcClient.java
@@ -0,0 +1,28 @@
+package com.bruce.lightning.rpc.spring.client;
+
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+/**
+ * Created by bruce on 2019/1/11 21:32
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Import({NettyRpcClientAutoConfigRegistrar.class})
+public @interface EnableNettyRpcClient {
+
+ /** 服务提供者地址 */
+ String host();
+
+ /** 服务提供者端口 */
+ int port() default 8088;
+
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfig.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfig.java
new file mode 100644
index 0000000..1293538
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfig.java
@@ -0,0 +1,101 @@
+package com.bruce.lightning.rpc.spring.client;
+
+import com.bruce.lightning.rpc.client.LightningClient;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.boot.context.event.ApplicationReadyEvent;
+import org.springframework.context.ApplicationListener;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.core.annotation.Order;
+
+import java.lang.reflect.Field;
+
+/**
+ * Created by bruce on 2019/1/11 20:52
+ */
+@Order(1000)
+public class NettyRpcClientAutoConfig implements BeanPostProcessor, ApplicationListener, InitializingBean, SmartLifecycle, DisposableBean {
+
+ private int port;
+ private String host;
+
+ private LightningClient client;
+
+ /** 优先使用注册中心上的服务地址,否则使用默认的 */
+ public NettyRpcClientAutoConfig(String host, int port) {
+ this.port = port;
+ this.host = host;
+ }
+
+ @Override
+ public void afterPropertiesSet() throws Exception {
+ client = new LightningClient();
+ }
+
+ @Override
+ public void onApplicationEvent(ApplicationReadyEvent event) {
+ // CuratorFramework curatorFramework = ZKCuratorFactory.create();
+ // try {
+ // //客户端监听zk变化
+ // CuratorWatcher zkServerWatcher = new ZkServerWatcher();
+ // curatorFramework.getChildren().usingWatcher(zkServerWatcher).forPath(ZKConstants.SERVER_PATH);
+ // //获取服务地址
+ // List servers = curatorFramework.getChildren().forPath(ZKConstants.SERVER_PATH);
+ // if (!CollectionUtils.isEmpty(servers)) {
+ // serverAddress = servers.stream().distinct().findAny().orElseGet(() -> serverAddress);
+ // }
+ // } catch (Exception e) {
+ // e.printStackTrace();
+ // }
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ @Override
+ public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
+
+ Field[] declaredFields = bean.getClass().getDeclaredFields();
+ for (Field declaredField : declaredFields) {
+ RpcReference rpcReference = declaredField.getAnnotation(RpcReference.class);
+ //rpc 仅支持接口类型
+ if (rpcReference != null && declaredField.getType().isInterface()) {
+ Class> rpcInterface = declaredField.getType();
+ Object jdkProxy = client.createProxy(rpcInterface, rpcReference.timeout());
+ try {
+ declaredField.setAccessible(true);
+ declaredField.set(bean, jdkProxy);
+ } catch (IllegalAccessException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ return bean;
+ }
+
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE - 4000;
+ }
+
+ @Override
+ public void start() {
+ client.startSync(host, port);
+ }
+
+ @Override
+ public void stop() {
+
+ }
+
+ @Override
+ public boolean isRunning() {
+ return false;
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfigRegistrar.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfigRegistrar.java
new file mode 100644
index 0000000..5c14c8d
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/NettyRpcClientAutoConfigRegistrar.java
@@ -0,0 +1,32 @@
+package com.bruce.lightning.rpc.spring.client;
+
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
+import org.springframework.beans.factory.support.BeanDefinitionRegistry;
+import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
+import org.springframework.core.annotation.AnnotationAttributes;
+import org.springframework.core.type.AnnotationMetadata;
+
+/**
+ * Created by bruce on 2019/1/11 21:32
+ */
+public class NettyRpcClientAutoConfigRegistrar implements ImportBeanDefinitionRegistrar {
+
+ @Override
+ public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
+ AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(EnableNettyRpcClient.class.getName()));
+
+ String host = attributes.getString("host");
+ int port = (int) attributes.get("port");
+
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(NettyRpcClientAutoConfig.class);
+ builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
+ builder.addConstructorArgValue(host);
+ builder.addConstructorArgValue(port);
+
+ AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
+ BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/RpcReference.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/RpcReference.java
new file mode 100644
index 0000000..412b5c1
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/client/RpcReference.java
@@ -0,0 +1,22 @@
+package com.bruce.lightning.rpc.spring.client;
+
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Created by bruce on 2019/1/11 21:03
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+public @interface RpcReference {
+
+ int timeout() default 300000; //毫秒
+
+
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/EnableNettyRpcServer.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/EnableNettyRpcServer.java
new file mode 100644
index 0000000..19186eb
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/EnableNettyRpcServer.java
@@ -0,0 +1,22 @@
+package com.bruce.lightning.rpc.spring.server;
+
+import org.springframework.context.annotation.Import;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Import({NettyRpcServerAutoConfigRegistrar.class})
+public @interface EnableNettyRpcServer {
+
+
+ int port() default 8088;
+
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfig.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfig.java
new file mode 100644
index 0000000..0eaf035
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfig.java
@@ -0,0 +1,77 @@
+package com.bruce.lightning.rpc.spring.server;
+
+import com.bruce.lightning.rpc.server.LightningServer;
+import com.bruce.lightning.rpc.server.mapping.BeanMethodContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.DisposableBean;
+import org.springframework.beans.factory.config.BeanPostProcessor;
+import org.springframework.context.SmartLifecycle;
+import org.springframework.core.annotation.Order;
+
+
+@Order(200)
+public class NettyRpcServerAutoConfig implements BeanPostProcessor, SmartLifecycle, DisposableBean {
+
+ private static final Logger logger = LoggerFactory.getLogger(NettyRpcServerAutoConfig.class);
+
+ private LightningServer lightningServer;
+
+ private int port;
+
+ public NettyRpcServerAutoConfig(int port) {
+ this.port = port;
+ }
+
+ @Override
+ public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
+ if (bean.getClass().isAnnotationPresent(RpcService.class)) {
+ BeanMethodContext.addServiceBean(bean);
+ }
+ return bean;
+ }
+
+ @Override
+ public void destroy() throws Exception {
+ if (lightningServer != null) {
+ lightningServer.close();
+ }
+ }
+
+ @Override
+ public int getPhase() {
+ return Integer.MAX_VALUE - 50000;
+ }
+
+ private boolean isRunning = false;
+
+ @Override
+ public void start() {
+ logger.info("SmartLifecycle start");
+ isRunning = true;
+ try {
+ lightningServer = new LightningServer(port);
+ lightningServer.startSync();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ @Override
+ public void stop() {
+ logger.info("SmartLifecycle stop");
+ }
+
+ /**
+ *
+ * 返回false时执行, org.springframework.context.Lifecycle#start()
+ * 返回true时执行, org.springframework.context.Lifecycle#stop()
+ *
+ */
+ @Override
+ public boolean isRunning() {
+ logger.info("SmartLifecycle isRunning:{}", isRunning);
+ return isRunning;
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfigRegistrar.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfigRegistrar.java
new file mode 100644
index 0000000..a3fa782
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/NettyRpcServerAutoConfigRegistrar.java
@@ -0,0 +1,27 @@
+package com.bruce.lightning.rpc.spring.server;
+
+import org.springframework.beans.factory.config.BeanDefinition;
+import org.springframework.beans.factory.support.AbstractBeanDefinition;
+import org.springframework.beans.factory.support.BeanDefinitionBuilder;
+import org.springframework.beans.factory.support.BeanDefinitionReaderUtils;
+import org.springframework.beans.factory.support.BeanDefinitionRegistry;
+import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
+import org.springframework.core.annotation.AnnotationAttributes;
+import org.springframework.core.type.AnnotationMetadata;
+
+public class NettyRpcServerAutoConfigRegistrar implements ImportBeanDefinitionRegistrar {
+
+ @Override
+ public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
+
+ AnnotationAttributes attributes = AnnotationAttributes.fromMap(metadata.getAnnotationAttributes(EnableNettyRpcServer.class.getName()));
+ int port = (int) attributes.get("port");
+
+ BeanDefinitionBuilder builder = BeanDefinitionBuilder.genericBeanDefinition(NettyRpcServerAutoConfig.class);
+ builder.addConstructorArgValue(port);
+ builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
+ AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
+
+ BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
+ }
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/RpcService.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/RpcService.java
new file mode 100644
index 0000000..94878f6
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/spring/server/RpcService.java
@@ -0,0 +1,20 @@
+package com.bruce.lightning.rpc.spring.server;
+
+import org.springframework.core.annotation.AliasFor;
+import org.springframework.stereotype.Component;
+
+import java.lang.annotation.*;
+
+/**
+ * Created by bruce on 2019/1/11 19:20
+ */
+@Target(ElementType.TYPE)
+@Retention(RetentionPolicy.RUNTIME)
+@Documented
+@Component
+public @interface RpcService {
+
+ @AliasFor(annotation = Component.class)
+ String value() default "";
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/JsonUtils.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/JsonUtils.java
new file mode 100644
index 0000000..9589bef
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/JsonUtils.java
@@ -0,0 +1,74 @@
+package com.bruce.lightning.rpc.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.lang.reflect.Type;
+import java.util.List;
+
+/**
+ * 自定义响应结构, 转换类
+ */
+public class JsonUtils {
+
+ // 定义jackson对象
+ private static final ObjectMapper objectMapper = new ObjectMapper();
+
+ static {
+ objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ }
+
+ /**
+ * 将对象转换成json字符串。
+ * Title: pojoToJson
+ * Description:
+ */
+ public static String toJson(Object data) {
+ try {
+ return objectMapper.writeValueAsString(data);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * 将json结果集转化为对象
+ *
+ * @param jsonStr json数据
+ * @param beanType 对象中的object类型
+ */
+ public static T jsonToObj(String jsonStr, Class beanType) {
+ try {
+ return objectMapper.readValue(jsonStr, beanType);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public static Object parse(String json, Type type) {
+ JavaType javaType = objectMapper.getTypeFactory().constructType(type);
+ try {
+ return objectMapper.readerFor(javaType).readValue(json);
+ } catch (JsonProcessingException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * 将json数据转换成pojo对象list
+ * Title: jsonToList
+ * Description:
+ */
+ public static List jsonToList(String jsonStr, Class beanType) {
+ JavaType javaType = objectMapper.getTypeFactory().constructParametricType(List.class, beanType);
+ try {
+ return objectMapper.readValue(jsonStr, javaType);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+
+}
diff --git a/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/PlatformUtil.java b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/PlatformUtil.java
new file mode 100644
index 0000000..f319b60
--- /dev/null
+++ b/lightning-rpc/src/main/java/com/bruce/lightning/rpc/util/PlatformUtil.java
@@ -0,0 +1,193 @@
+package com.bruce.lightning.rpc.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Properties;
+
+/**
+ * Created by bruce on 2018/10/29 17:07
+ */
+public class PlatformUtil {
+
+ // NOTE: since this class can be initialized by application code in some
+ // cases, we must encapsulate all calls to System.getProperty("...") in
+ // a doPrivileged block except for standard JVM properties such as
+ // os.name, os.version, os.arch, java.vm.name, etc.
+
+ private static final String os = System.getProperty("os.name");
+ private static final String version = System.getProperty("os.version");
+ private static final boolean embedded;
+ private static final String embeddedType;
+ private static final boolean useEGL;
+ private static final boolean doEGLCompositing;
+ // a property used to denote a non-default impl for this host
+ private static String javafxPlatform;
+
+ static {
+ javafxPlatform = AccessController.doPrivileged((PrivilegedAction) () -> System.getProperty("javafx.platform"));
+ embedded = AccessController.doPrivileged((PrivilegedAction) () -> Boolean.getBoolean("com.sun.javafx.isEmbedded"));
+ embeddedType = AccessController.doPrivileged((PrivilegedAction) () -> System.getProperty("embedded"));
+ useEGL = AccessController.doPrivileged((PrivilegedAction) () -> Boolean.getBoolean("use.egl"));
+ if (useEGL) {
+ doEGLCompositing = AccessController.doPrivileged((PrivilegedAction) () -> Boolean.getBoolean("doNativeComposite"));
+ } else
+ doEGLCompositing = false;
+ }
+
+ private static final boolean ANDROID = "android".equals(javafxPlatform) || "Dalvik".equals(System.getProperty("java.vm.name"));
+ private static final boolean WINDOWS = os.startsWith("Windows");
+ private static final boolean WINDOWS_VISTA_OR_LATER = WINDOWS && versionNumberGreaterThanOrEqualTo(6.0f);
+ private static final boolean WINDOWS_7_OR_LATER = WINDOWS && versionNumberGreaterThanOrEqualTo(6.1f);
+ private static final boolean MAC = os.startsWith("Mac");
+ private static final boolean LINUX = os.startsWith("Linux") && !ANDROID;
+ private static final boolean SOLARIS = os.startsWith("SunOS");
+ private static final boolean IOS = os.startsWith("iOS");
+
+ /**
+ * Utility method used to determine whether the version number as
+ * reported by system properties is greater than or equal to a given
+ * value.
+ *
+ * @param value The value to test against.
+ * @return false if the version number cannot be parsed as a float,
+ * otherwise the comparison against value.
+ */
+ private static boolean versionNumberGreaterThanOrEqualTo(float value) {
+ try {
+ return Float.parseFloat(version) >= value;
+ } catch (Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * Returns true if the operating system is a form of Windows.
+ */
+ public static boolean isWindows(){
+ return WINDOWS;
+ }
+
+ /**
+ * Returns true if the operating system is at least Windows Vista(v6.0).
+ */
+ public static boolean isWinVistaOrLater(){
+ return WINDOWS_VISTA_OR_LATER;
+ }
+
+ /**
+ * Returns true if the operating system is at least Windows 7(v6.1).
+ */
+ public static boolean isWin7OrLater(){
+ return WINDOWS_7_OR_LATER;
+ }
+
+ /**
+ * Returns true if the operating system is a form of Mac OS.
+ */
+ public static boolean isMac(){
+ return MAC;
+ }
+
+ /**
+ * Returns true if the operating system is a form of Linux.
+ */
+ public static boolean isLinux(){
+ return LINUX;
+ }
+
+ public static boolean useEGL() {
+ return useEGL;
+ }
+
+ public static boolean useEGLWindowComposition() {
+ return doEGLCompositing;
+ }
+
+ public static boolean useGLES2() {
+ String useGles2 = "false";
+ useGles2 =
+ AccessController.doPrivileged((PrivilegedAction) () -> System.getProperty("use.gles2"));
+ if ("true".equals(useGles2))
+ return true;
+ else
+ return false;
+ }
+
+ /**
+ * Returns true if the operating system is a form of Unix, including Linux.
+ */
+ public static boolean isSolaris(){
+ return SOLARIS;
+ }
+
+ /**
+ * Returns true if the operating system is a form of Linux or Solaris
+ */
+ public static boolean isUnix(){
+ return LINUX || SOLARIS;
+ }
+
+ /**
+ * Returns true if the platform is embedded.
+ */
+ public static boolean isEmbedded() {
+ return embedded;
+ }
+
+ /**
+ * Returns a string with the embedded type - ie eglx11, eglfb, dfb or null.
+ */
+ public static String getEmbeddedType() {
+ return embeddedType;
+ }
+
+ /**
+ * Returns true if the operating system is iOS
+ */
+ public static boolean isIOS(){
+ return IOS;
+ }
+
+ private static void loadPropertiesFromFile(final File file) {
+ Properties p = new Properties();
+ try {
+ InputStream in = new FileInputStream(file);
+ p.load(in);
+ in.close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ if (javafxPlatform == null) {
+ javafxPlatform = p.getProperty("javafx.platform");
+ }
+ String prefix = javafxPlatform + ".";
+ int prefixLength = prefix.length();
+ boolean foundPlatform = false;
+ for (Object o : p.keySet()) {
+ String key = (String) o;
+ if (key.startsWith(prefix)) {
+ foundPlatform = true;
+ String systemKey = key.substring(prefixLength);
+ if (System.getProperty(systemKey) == null) {
+ String value = p.getProperty(key);
+ System.setProperty(systemKey, value);
+ }
+ }
+ }
+ if (!foundPlatform) {
+ System.err.println(
+ "Warning: No settings found for javafx.platform='"
+ + javafxPlatform + "'");
+ }
+ }
+
+ public static boolean isAndroid() {
+ return ANDROID;
+ }
+
+
+}
diff --git a/lightning-rpc/src/main/proto/Data.proto b/lightning-rpc/src/main/proto/Data.proto
new file mode 100644
index 0000000..50323e9
--- /dev/null
+++ b/lightning-rpc/src/main/proto/Data.proto
@@ -0,0 +1,19 @@
+
+
+syntax = "proto3";
+
+option java_multiple_files = true;
+option java_package = "com.bruce.grpc.demo.entity";
+
+message HelloRequest {
+ string greeting = 1;
+}
+
+message HelloResponse {
+ string reply = 1;
+}
+
+service HelloService {
+ rpc SayHello(HelloRequest) returns (HelloResponse);
+}
+
diff --git a/lightning-rpc/src/test/java/test/FutureTest.java b/lightning-rpc/src/test/java/test/FutureTest.java
new file mode 100644
index 0000000..6874adc
--- /dev/null
+++ b/lightning-rpc/src/test/java/test/FutureTest.java
@@ -0,0 +1,30 @@
+package test;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class FutureTest {
+
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+
+ Map map = new ConcurrentHashMap<>();
+
+ //10 接口, 每个接口 5个方法
+
+ for (int i = 0; i < 1000; i++) {
+ map.put("com.bruce.fastrpc.spring.server.InitRpcBeanMediator.method" + 1, "哈哈哈" + 1);
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 500000; i++) {
+ int i1 = ThreadLocalRandom.current().nextInt(50);
+ Object o = map.get("com.bruce.fastrpc.spring.server.InitRpcBeanMediator.method" + i1);
+ }
+ System.out.println(System.currentTimeMillis() - start);
+
+ }
+}
diff --git a/lightning-rpc/src/test/java/test/FutureTest2.java b/lightning-rpc/src/test/java/test/FutureTest2.java
new file mode 100644
index 0000000..7d99cc6
--- /dev/null
+++ b/lightning-rpc/src/test/java/test/FutureTest2.java
@@ -0,0 +1,36 @@
+package test;
+
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class FutureTest2 {
+
+ public static void main(String[] args) throws ExecutionException, InterruptedException {
+
+
+ Map> map = new ConcurrentHashMap<>();
+
+ //10 接口, 每个接口 5个方法
+
+ for (int i = 0; i < 20; i++) {
+ HashMap methods = new HashMap<>();
+ for (int j = 0; j < 50; j++) {
+ methods.put("method" + j, "哈哈哈" + 1);
+ }
+ map.put("com.bruce.fastrpc.spring.server.InitRpcBeanMediator" + i, methods);
+ }
+
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < 500000; i++) {
+ int i1 = ThreadLocalRandom.current().nextInt(10);
+ Map stringObjectMap = map.get("com.bruce.fastrpc.spring.server.InitRpcBeanMediator" + i1);
+ Object o = stringObjectMap.get("method" + i1);
+ }
+ System.out.println(System.currentTimeMillis() - start);
+
+ }
+}
diff --git a/lightning-rpc/src/test/java/test/ZkTest.java b/lightning-rpc/src/test/java/test/ZkTest.java
new file mode 100644
index 0000000..af65fdb
--- /dev/null
+++ b/lightning-rpc/src/test/java/test/ZkTest.java
@@ -0,0 +1,51 @@
+// package test;
+//
+// import com.bruce.fastrpc.registry.zookeeper.ZKCuratorFactory;
+// import lombok.extern.slf4j.Slf4j;
+// import org.apache.curator.framework.CuratorFramework;
+// import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
+// import org.apache.zookeeper.CreateMode;
+//
+// @Slf4j
+// public class ZkTest {
+// public static void main(String[] args) throws Exception {
+//
+// CuratorFramework client = ZKCuratorFactory.create();
+// /*System.out.println(client.getState());
+// String s = client.create().forPath("/netty");
+// System.out.println(s);*/
+//
+// // 创建节点
+// String nodePath = "/super/bruce";
+// byte[] data = "superme".getBytes();
+//
+// //create(client, nodePath, data);
+// delete(client, "/netty");
+// }
+//
+// public static String create(CuratorFramework client, String path) throws Exception {
+// return create(client, path, null);
+// }
+//
+// public static String create(CuratorFramework client, String path, byte[] data) throws Exception {
+// ACLBackgroundPathAndBytesable create = client.create()
+// .creatingParentsIfNeeded()
+// .withMode(CreateMode.PERSISTENT);
+// //.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE)
+// //.forPath(path, data);
+//
+// String s = data == null ? create.forPath(path) : create.forPath(path, data);
+// log.info("创建节点成功{}", s);
+// return s;
+// }
+//
+// public static void delete(CuratorFramework client, String path) throws Exception {
+// client.delete()
+// .guaranteed()
+// .deletingChildrenIfNeeded()
+// .forPath(path);
+// log.info("删除节点成功");
+// }
+//
+//
+// }
diff --git a/netty-hello/.gitignore b/netty-hello/.gitignore
new file mode 100644
index 0000000..82eca33
--- /dev/null
+++ b/netty-hello/.gitignore
@@ -0,0 +1,25 @@
+/target/
+!.mvn/wrapper/maven-wrapper.jar
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/build/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
\ No newline at end of file
diff --git a/netty-hello/pom.xml b/netty-hello/pom.xml
new file mode 100644
index 0000000..40838bd
--- /dev/null
+++ b/netty-hello/pom.xml
@@ -0,0 +1,61 @@
+
+
+ 4.0.0
+
+
+ com.bruce.rpc
+ lightning-parent
+ 1.0
+
+
+ com.bruce
+ netty-hello
+ 0.0.1-SNAPSHOT
+ jar
+
+ netty-hello
+ Demo project for Spring Boot
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter
+
+
+
+ org.projectlombok
+ lombok
+ true
+
+
+
+ io.netty
+ netty-all
+
+
+
+ org.springframework.boot
+ spring-boot-starter-test
+ test
+
+
+
+
+
+
+ org.springframework.boot
+ spring-boot-maven-plugin
+
+
+
+
+
+
diff --git a/netty-hello/src/main/java/com/bruce/netty/NettyHelloApplication.java b/netty-hello/src/main/java/com/bruce/netty/NettyHelloApplication.java
new file mode 100644
index 0000000..2c4706b
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/NettyHelloApplication.java
@@ -0,0 +1,12 @@
+package com.bruce.netty;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class NettyHelloApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(NettyHelloApplication.class, args);
+ }
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/config/NettyHttpServerBootstrap.java b/netty-hello/src/main/java/com/bruce/netty/config/NettyHttpServerBootstrap.java
new file mode 100644
index 0000000..a748042
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/config/NettyHttpServerBootstrap.java
@@ -0,0 +1,74 @@
+package com.bruce.netty.config;
+
+import com.bruce.netty.http.HelloChildInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+/**
+ * Created by bruce on 2018/10/15 22:48
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "netty.server",name = "type",havingValue = "http")
+@Configuration
+public class NettyHttpServerBootstrap {
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private final int port = 8088;
+
+ @PostConstruct
+ public void nettyServerStart() {
+ log.info("netty http server starting");
+ try {
+ start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ log.info("netty http server started on port:" + port);
+ }
+
+ @PreDestroy
+ public void aa() {
+ close();
+ }
+
+ private void close() {
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ log.info("destroy netty server thread");
+ }
+
+ private void start() throws InterruptedException {
+ bossGroup = new NioEventLoopGroup(2);
+ workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new HelloChildInitializer());
+ //使用sync()会阻塞当前线程,程序无法向下执行,可以在子线程中执行,或者不使用sync();
+ // ChannelFuture channelFuture = bootstrap.bind(8088).sync();
+ ChannelFuture channelFuture = bootstrap.bind(port);
+ channelFuture.channel().closeFuture();
+
+ } catch (Exception e) {
+ log.error("NettyServerBootstrap-->", e);
+ close();
+ }
+ }
+
+
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/config/NettyWebsocketServerBootstrap.java b/netty-hello/src/main/java/com/bruce/netty/config/NettyWebsocketServerBootstrap.java
new file mode 100644
index 0000000..a04a34e
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/config/NettyWebsocketServerBootstrap.java
@@ -0,0 +1,74 @@
+package com.bruce.netty.config;
+
+import com.bruce.netty.websocket.WebsocketInitializer;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+/**
+ * Created by bruce on 2018/10/15 22:48
+ */
+@Slf4j
+@ConditionalOnProperty(prefix = "netty.server",name = "type",havingValue = "ws")
+@Configuration
+public class NettyWebsocketServerBootstrap {
+
+ private EventLoopGroup bossGroup;
+ private EventLoopGroup workerGroup;
+ private final int port = 8088;
+
+ @PostConstruct
+ public void nettyServerStart() {
+ log.info("netty websocket server starting");
+ try {
+ start();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ log.info("netty websocket server started on port:" + port);
+ }
+
+ @PreDestroy
+ public void aa() {
+ close();
+ }
+
+ private void close() {
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
+ }
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
+ }
+ log.info("destroy netty server thread");
+ }
+
+ private void start() throws InterruptedException {
+ bossGroup = new NioEventLoopGroup(2);
+ workerGroup = new NioEventLoopGroup();
+ try {
+ ServerBootstrap bootstrap = new ServerBootstrap();
+ bootstrap.group(bossGroup, workerGroup)
+ .channel(NioServerSocketChannel.class)
+ .childHandler(new WebsocketInitializer());
+ //使用sync()会阻塞当前线程,程序无法向下执行,可以在子线程中执行,或者不使用sync();
+ // ChannelFuture channelFuture = bootstrap.bind(8088).sync();
+ ChannelFuture channelFuture = bootstrap.bind(port);
+ channelFuture.channel().closeFuture();
+
+ } catch (Exception e) {
+ log.error("NettyServerBootstrap-->", e);
+ close();
+ }
+ }
+
+
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/http/EventLogInboundHandler.java b/netty-hello/src/main/java/com/bruce/netty/http/EventLogInboundHandler.java
new file mode 100644
index 0000000..f9a3d93
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/http/EventLogInboundHandler.java
@@ -0,0 +1,68 @@
+package com.bruce.netty.http;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Created by bruce on 2018/10/16 19:06
+ */
+@Slf4j
+public class EventLogInboundHandler implements ChannelInboundHandler {
+
+
+ @Override
+ public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelRegistered");
+ }
+
+ @Override
+ public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelUnregistered");
+ }
+
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelActive");
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelInactive");
+ }
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ log.info("channelRead");
+ }
+
+ @Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelReadComplete");
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ log.info("userEventTriggered");
+ }
+
+ @Override
+ public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+ log.info("channelWritabilityChanged");
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ log.info("handlerAdded");
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ log.info("handlerRemoved");
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ log.info("exceptionCaught");
+ }
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/http/HelloChildInitializer.java b/netty-hello/src/main/java/com/bruce/netty/http/HelloChildInitializer.java
new file mode 100644
index 0000000..130f469
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/http/HelloChildInitializer.java
@@ -0,0 +1,21 @@
+package com.bruce.netty.http;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpServerCodec;
+
+/**
+ * Created by bruce on 2018/10/16 18:57
+ */
+public class HelloChildInitializer extends ChannelInitializer {
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast("httpServerCodec", new HttpServerCodec());
+ pipeline.addLast("helloInboundHandler", new HelloInboundHandler());
+ pipeline.addLast("eventLogInboundHandler", new EventLogInboundHandler());
+
+ }
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/http/HelloInboundHandler.java b/netty-hello/src/main/java/com/bruce/netty/http/HelloInboundHandler.java
new file mode 100644
index 0000000..6d2b90a
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/http/HelloInboundHandler.java
@@ -0,0 +1,41 @@
+package com.bruce.netty.http;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.*;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Created by bruce on 2018/10/16 19:06
+ */
+@Slf4j
+public class HelloInboundHandler extends SimpleChannelInboundHandler {
+
+ public HelloInboundHandler() {
+ log.info("HelloInboundHandler create");
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, DefaultHttpRequest request) throws Exception {
+ Channel channel = ctx.channel();
+ //打印客户端的远程地址
+ log.info("客户端:" + channel.remoteAddress().toString() + " 请求路径:" + request.uri());
+ ByteBuf content = Unpooled.copiedBuffer("hello netty--", CharsetUtil.UTF_8);
+ //返回客户端数据
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+ //设置响应头
+ HttpHeaders headers = response.headers();
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
+ headers.set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
+ ChannelFuture future = ctx.writeAndFlush(response);
+ //加上下面代码后,每次请求结束,该handler就被销毁,下次请求重新实例化
+ //future.addListener(ChannelFutureListener.CLOSE);
+ }
+
+
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/websocket/DispatcherHandler.java b/netty-hello/src/main/java/com/bruce/netty/websocket/DispatcherHandler.java
new file mode 100644
index 0000000..c1271fe
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/websocket/DispatcherHandler.java
@@ -0,0 +1,37 @@
+package com.bruce.netty.websocket;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
+
+/**
+ * Created by bruce on 2018/10/25 10:24
+ */
+public class DispatcherHandler extends ChannelInboundHandlerAdapter {
+ private static final boolean checkStartsWith = false;
+ private static final String websocketPath = "/ws";
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ChannelPipeline pipeline = ctx.pipeline();
+ if (msg instanceof FullHttpRequest) {
+ FullHttpRequest req = (FullHttpRequest) msg;
+ if (isNotWebSocketPath(req) && pipeline.get("httpHandler") == null) {
+ pipeline.addLast("httpHandler", new HttpHandler());
+ } else {
+ pipeline.addLast(new WebSocketServerProtocolHandler(websocketPath));
+ pipeline.addLast("websocketChatHandler", new WebsocketChatHandler());
+ }
+
+ pipeline.remove("dispatcherHandler");
+ }
+
+ super.channelRead(ctx, msg);
+ }
+
+ private boolean isNotWebSocketPath(FullHttpRequest req) {
+ return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
+ }
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/websocket/HttpHandler.java b/netty-hello/src/main/java/com/bruce/netty/websocket/HttpHandler.java
new file mode 100644
index 0000000..a03ba8d
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/websocket/HttpHandler.java
@@ -0,0 +1,50 @@
+package com.bruce.netty.websocket;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.*;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Created by bruce on 2018/10/16 19:06
+ */
+@Slf4j
+public class HttpHandler extends SimpleChannelInboundHandler {
+ public HttpHandler() {
+ log.info("HttpHandler create");
+ }
+
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ super.handlerAdded(ctx);
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ super.handlerRemoved(ctx);
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
+ Channel channel = ctx.channel();
+ //打印客户端的远程地址
+ log.info("客户端:" + channel.remoteAddress().toString() + " 请求路径:" + request.uri());
+ ByteBuf content = Unpooled.copiedBuffer("hello netty--", CharsetUtil.UTF_8);
+ //返回客户端数据
+ DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content);
+ //设置响应头
+ HttpHeaders headers = response.headers();
+ headers.set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
+ headers.set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
+ ChannelFuture future = ctx.writeAndFlush(response);
+ //加上下面代码后,每次请求结束,该handler就被销毁,下次请求重新实例化
+ //future.addListener(ChannelFutureListener.CLOSE);
+ }
+
+
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketChatHandler.java b/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketChatHandler.java
new file mode 100644
index 0000000..dcf164a
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketChatHandler.java
@@ -0,0 +1,64 @@
+package com.bruce.netty.websocket;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.util.concurrent.GlobalEventExecutor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.LocalDateTime;
+
+/**
+ * Created by bruce on 2018/10/16 21:46
+ * TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体
+ */
+@Slf4j
+public class WebsocketChatHandler extends SimpleChannelInboundHandler {
+ //记录所有的客户端
+ private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+ public WebsocketChatHandler(){
+ log.info("WebsocketChatHandler create");
+ }
+
+ /*
+ * 当客户端连接服务端之后(打开连接)
+ * 获取客户端的channle,并且放到ChannelGroup中去进行管理
+ */
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ super.handlerAdded(ctx);
+ clients.add(ctx.channel());
+ log.info("handlerAdded ");
+ log.info(ctx.channel().id().asLongText());
+ log.info(ctx.channel().id().asShortText());
+ log.info("客户端连接当前数量:" + clients.size());
+ }
+
+ @Override
+ public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+ super.handlerRemoved(ctx);
+ //当触发handlerRemoved,ChannelGroup会自动移除对应客户端的channel
+ //clients.remove(ctx.channel());
+ log.info("handlerRemoved ");
+ log.info(ctx.channel().id().asLongText());
+ log.info(ctx.channel().id().asShortText());
+ log.info("客户端连接剩余数量:" + clients.size());
+ }
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
+ String text = msg.text();
+ log.info("接收到的消息" + text);
+ /*for (Channel client : clients) {
+ if (client.id().asLongText().equals(ctx.channel().id().asLongText())) {
+ //自己就不需要发送了
+ continue;
+ }
+ client.writeAndFlush(new TextWebSocketFrame("服务端在" + LocalDateTime.now() + "接收到" + ctx.channel().id().asShortText() + "消息" + text));
+ }*/
+ clients.writeAndFlush(new TextWebSocketFrame("服务端在" + LocalDateTime.now() + "接收到" + ctx.channel().id().asShortText() + "消息" + text));
+ }
+}
diff --git a/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketInitializer.java b/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketInitializer.java
new file mode 100644
index 0000000..d7d9d83
--- /dev/null
+++ b/netty-hello/src/main/java/com/bruce/netty/websocket/WebsocketInitializer.java
@@ -0,0 +1,39 @@
+package com.bruce.netty.websocket;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+
+/**
+ * Created by bruce on 2018/10/16 21:25
+ */
+public class WebsocketInitializer extends ChannelInitializer {
+
+ @Override
+ protected void initChannel(SocketChannel channel) throws Exception {
+ ChannelPipeline pipeline = channel.pipeline();
+ //websocket基于http,需要http编解码器
+ pipeline.addLast(new HttpServerCodec());
+ //对大数据流读写的支持
+ pipeline.addLast(new ChunkedWriteHandler());
+ //对httpMessage进行聚合,聚合成FullHttpRequest或者FullHttpResponse
+ pipeline.addLast(new HttpObjectAggregator(1048576));
+ /* 添加websocket支持
+ * websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
+ * 本handler会帮你处理一些繁重的复杂的事
+ * 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
+ * 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
+ */
+ //pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
+
+ pipeline.addLast("dispatcherHandler", new DispatcherHandler());
+ // pipeline.addLast(new WebsocketChatHandler());
+ //pipeline.addLast(new HttpHandler());
+ }
+
+
+}
diff --git a/netty-hello/src/main/resources/application.properties b/netty-hello/src/main/resources/application.properties
new file mode 100644
index 0000000..6d084b2
--- /dev/null
+++ b/netty-hello/src/main/resources/application.properties
@@ -0,0 +1,2 @@
+
+netty.server.type=ws
\ No newline at end of file
diff --git a/netty-hello/src/main/resources/static/demo/index.html b/netty-hello/src/main/resources/static/demo/index.html
new file mode 100644
index 0000000..8fba025
--- /dev/null
+++ b/netty-hello/src/main/resources/static/demo/index.html
@@ -0,0 +1,59 @@
+
+
+
+
+ websocket
+
+
+
+
+
+ 发送消息:
+
+
+ 接收消息:
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/netty-hello/src/test/java/com/bruce/netty/NettyHelloApplicationTests.java b/netty-hello/src/test/java/com/bruce/netty/NettyHelloApplicationTests.java
new file mode 100644
index 0000000..1baeebb
--- /dev/null
+++ b/netty-hello/src/test/java/com/bruce/netty/NettyHelloApplicationTests.java
@@ -0,0 +1,16 @@
+package com.bruce.netty;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class NettyHelloApplicationTests {
+
+ @Test
+ public void contextLoads() {
+ }
+
+}
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..1073dc9
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,88 @@
+
+
+ 4.0.0
+
+
+ org.springframework.boot
+ spring-boot-starter-parent
+ 2.4.1
+
+
+
+ com.bruce.rpc
+ lightning-parent
+ 1.0
+ pom
+
+ lightning-parent
+ Demo project for Spring Boot
+
+
+
+ UTF-8
+ UTF-8
+ 1.8
+
+
+
+ lightning-rpc
+ lightning-rpc-demo
+ netty-hello
+
+
+
+
+
+
+ io.netty
+ netty-all
+
+ 4.1.55.Final
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.14.0
+
+
+
+ org.jboss.marshalling
+ jboss-marshalling-serial
+ 2.0.10.Final
+
+
+
+ org.apache.zookeeper
+ zookeeper
+ 3.4.14
+
+
+ org.slf4j
+ slf4j-log4j12
+
+
+ log4j
+ log4j
+
+
+
+
+
+ org.apache.curator
+ curator-recipes
+ 4.0.1
+
+
+ org.apache.zookeeper
+ zookeeper
+
+
+
+
+
+
+
+
+
\ No newline at end of file