Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

异常处理、日志处理及部分代码简单美化 #2833

Merged
merged 1 commit into from
May 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public void start() throws Exception {
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}

public void destroy(){
// destroy executor-server
stopEmbedServer();
Expand Down Expand Up @@ -131,6 +132,7 @@ private void initAdminBizList(String adminAddresses, String accessToken) throws
}
}
}

public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}
Expand Down Expand Up @@ -251,6 +253,7 @@ public static JobThread registJobThread(int jobId, IJobHandler handler, String r

return newJobThread;
}

public static JobThread removeJobThread(int jobId, String removeOldReason){
JobThread oldJobThread = jobThreadRepository.remove(jobId);
if (oldJobThread != null) {
Expand All @@ -261,9 +264,8 @@ public static JobThread removeJobThread(int jobId, String removeOldReason){
}
return null;
}

public static JobThread loadJobThread(int jobId){
JobThread jobThread = jobThreadRepository.get(jobId);
return jobThread;
return jobThreadRepository.get(jobId);
}

}
64 changes: 27 additions & 37 deletions xxl-job-core/src/main/java/com/xxl/job/core/server/EmbedServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,8 @@ public class EmbedServer {
public void start(final String address, final int port, final String appname, final String accessToken) {
executorBiz = new ExecutorBizImpl();
thread = new Thread(new Runnable() {

@Override
public void run() {

// param
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
Expand All @@ -61,8 +59,6 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");
}
});


try {
// start server
ServerBootstrap bootstrap = new ServerBootstrap();
Expand Down Expand Up @@ -92,11 +88,9 @@ public void initChannel(SocketChannel channel) throws Exception {
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
if (e instanceof InterruptedException) {
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} else {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
}
logger.info(">>>>>>>>>>> xxl-job remoting server stop.");
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);
} finally {
// stop
try {
Expand All @@ -106,17 +100,15 @@ public void initChannel(SocketChannel channel) throws Exception {
logger.error(e.getMessage(), e);
}
}

}

});
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.setDaemon(true); // daemon, service jvm, user thread leave >>> daemon leave >>> jvm leave
thread.start();
}

public void stop() throws Exception {
// destroy server thread
if (thread!=null && thread.isAlive()) {
if (thread != null && thread.isAlive()) {
thread.interrupt();
}

Expand All @@ -130,7 +122,7 @@ public void stop() throws Exception {

/**
* netty_http
*
* <p>
* Copy from : https://github.com/xuxueli/xxl-rpc
*
* @author xuxueli 2015-11-24 22:25:15
Expand All @@ -141,6 +133,7 @@ public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<F
private ExecutorBiz executorBiz;
private String accessToken;
private ThreadPoolExecutor bizThreadPool;

public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, ThreadPoolExecutor bizThreadPool) {
this.executorBiz = executorBiz;
this.accessToken = accessToken;
Expand All @@ -149,7 +142,6 @@ public EmbedHttpServerHandler(ExecutorBiz executorBiz, String accessToken, Threa

@Override
protected void channelRead0(final ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {

// request parse
//final byte[] requestBytes = ByteBufUtil.getBytes(msg.content()); // byteBuf.toString(io.netty.util.CharsetUtil.UTF_8);
String requestData = msg.content().toString(CharsetUtil.UTF_8);
Expand All @@ -175,38 +167,38 @@ public void run() {
}

private Object process(HttpMethod httpMethod, String uri, String requestData, String accessTokenReq) {

// valid
if (HttpMethod.POST != httpMethod) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, HttpMethod not support.");
}
if (uri==null || uri.trim().length()==0) {
if (uri == null || uri.trim().length() == 0) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping empty.");
}
if (accessToken!=null
&& accessToken.trim().length()>0
if (accessToken != null
&& accessToken.trim().length() > 0
&& !accessToken.equals(accessTokenReq)) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
}

// services mapping
try {
if ("/beat".equals(uri)) {
return executorBiz.beat();
} else if ("/idleBeat".equals(uri)) {
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
} else if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
} else if ("/kill".equals(uri)) {
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
} else if ("/log".equals(uri)) {
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
switch (uri) {
case "/beat":
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
Expand Down Expand Up @@ -261,6 +253,4 @@ public void stopRegistry() {
// stop registry
ExecutorRegistryThread.getInstance().toStop();
}


}