Skip to content

Commit

Permalink
添加线程名
Browse files Browse the repository at this point in the history
  • Loading branch information
brucelwl committed Dec 17, 2020
1 parent b727af9 commit 098e79d
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.bruce.lightning.rpc.common.RpcRequest;
import com.bruce.lightning.rpc.common.RpcResponse;
import com.bruce.lightning.rpc.util.PlatformUtil;
import com.bruce.lightning.rpc.util.ReflectUtils;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
Expand All @@ -14,6 +15,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -34,8 +36,10 @@ public LightningClient() {

//String property = DefaultEnvironment.getInstance().getProperty("lightning.rpc.serial.type");

DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("RpcClientWorker-");
ReflectUtils.set(workerThreadFactory, DefaultThreadFactory.class, "prefix", "RpcClientWorker-");

workerGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
workerGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup(workerThreadFactory) : new NioEventLoopGroup(workerThreadFactory);
Class<? extends SocketChannel> socketChannelClass = PlatformUtil.isLinux() ? EpollSocketChannel.class : NioSocketChannel.class;
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());

pipeline.addLast(new IdleStateHandler(15, 0, 5));
pipeline.addLast(new IdleStateHandler(60, 0, 25));
pipeline.addLast(clientHeartBeatHandler);

pipeline.addLast(new RpcClientHandler());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@


import java.io.Serializable;
import java.util.Arrays;

public class RpcRequest implements Serializable {

Expand Down Expand Up @@ -35,4 +36,14 @@ public Object[] getArgs() {
public void setArgs(Object[] args) {
this.args = args;
}


@Override
public String toString() {
return "RpcRequest{" +
"id=" + id +
", fullMethodName='" + fullMethodName + '\'' +
", args=" + Arrays.toString(args) +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.bruce.lightning.rpc.server.initial.marshalling.MarshallingServerHandlerChannelInitializer;
import com.bruce.lightning.rpc.util.PlatformUtil;
import com.bruce.lightning.rpc.util.ReflectUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
Expand All @@ -10,6 +11,7 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -38,8 +40,14 @@ public LightningServer(int port) {
* 同步连接
*/
public void startSync() {
acceptGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup(2) : new NioEventLoopGroup(1);
workerGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup() : new NioEventLoopGroup();
DefaultThreadFactory acceptThreadFactory = new DefaultThreadFactory("RpcServerAccept-");
ReflectUtils.set(acceptThreadFactory, DefaultThreadFactory.class, "prefix", "RpcServerAccept-");

DefaultThreadFactory workerThreadFactory = new DefaultThreadFactory("RpcServerWorker-");
ReflectUtils.set(workerThreadFactory, DefaultThreadFactory.class, "prefix", "RpcServerWorker-");

acceptGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup(1, acceptThreadFactory) : new NioEventLoopGroup(1, acceptThreadFactory);
workerGroup = PlatformUtil.isLinux() ? new EpollEventLoopGroup(workerThreadFactory) : new NioEventLoopGroup(workerThreadFactory);
Class<? extends ServerSocketChannel> serverSocketChannelClass = PlatformUtil.isLinux() ? EpollServerSocketChannel.class : NioServerSocketChannel.class;

ServerBootstrap bootstrap = new ServerBootstrap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,54 @@
import com.bruce.lightning.rpc.common.RpcRequest;
import com.bruce.lightning.rpc.common.RpcResponse;
import com.bruce.lightning.rpc.server.mapping.BeanMethodContext;
import com.bruce.lightning.rpc.util.ReflectUtils;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.netty.util.concurrent.GlobalEventExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RpcServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = LoggerFactory.getLogger(RpcServerHandler.class);

static final DefaultEventExecutorGroup eventExecutors;

static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

static {
DefaultThreadFactory threadFactory = new DefaultThreadFactory("RpcServerHandler-");
ReflectUtils.set(threadFactory, DefaultThreadFactory.class, "prefix", "RpcServerHandler-");

eventExecutors = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 10, threadFactory);
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
channels.add(ctx.channel());
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

log.info("服务端接收:{}", msg);

RpcRequest request = (RpcRequest) msg;
//TODO 应该在异步线程中执行业务逻辑处理
RpcResponse response = BeanMethodContext.process(request);

ctx.writeAndFlush(response);
//TODO 接收请求之后,应该在异步线程中执行
eventExecutors.execute(new Runnable() {
@Override
public void run() {
RpcResponse response = BeanMethodContext.process(request);
ctx.writeAndFlush(response);

log.info("服务端响应:{}", request.getId());
}
});


}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());

//心跳检测机制(这里采用服务端向客户端发送ping机制)
pipeline.addLast(new IdleStateHandler(25, 0, 10));
pipeline.addLast(new IdleStateHandler(90, 0, 40));
pipeline.addLast(serverHeartBeatHandler);

pipeline.addLast(new RpcServerHandler());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.bruce.lightning.rpc.util;

import java.lang.reflect.Field;

public abstract class ReflectUtils {


public static void set(Object obj, Class<?> clazz, String fieldName, Object value) {
try {
Field declaredField = clazz.getDeclaredField(fieldName);
declaredField.setAccessible(true);
declaredField.set(obj, value);
} catch (NoSuchFieldException | IllegalAccessException e) {
e.printStackTrace();
}
}


}

0 comments on commit 098e79d

Please sign in to comment.