Skip to content

Commit

Permalink
[Feature][web] support web console of process list (DataLinkDC#2425)
Browse files Browse the repository at this point in the history
* Optimize the process

* add console ui

* formate code

* formate code

* comment floatButton

* formate code

* formate code

* formate code
  • Loading branch information
gaoyan1998 authored Oct 24, 2023
1 parent 40a814f commit 4a4859a
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 114 deletions.
17 changes: 10 additions & 7 deletions dinky-admin/src/main/java/org/dinky/aop/ProcessAspect.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.dinky.process.enums.ProcessStatus;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.process.enums.ProcessType;
import org.dinky.process.model.ProcessStepEntity;

import org.apache.http.util.TextUtils;

Expand All @@ -39,15 +40,16 @@
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

@Aspect
@Slf4j
@Component
public class ProcessAspect {

public static String PROCESS_NAME = "name";
public static String PROCESS_STEP = "step";
public static String PROCESS_NAME = "PROCESS_NAME";
public static String PROCESS_STEP = "PROCESS_STEP";
public ConsoleContextHolder contextHolder = ConsoleContextHolder.getInstances();

/**
Expand All @@ -59,7 +61,7 @@ public Object processAround(ProceedingJoinPoint joinPoint, ExecuteProcess execut

Object result;
Object processId = getProcessId(joinPoint);
String name = executeProcess.type() + String.valueOf(processId);
String name = StrFormatter.format("{}/{}", executeProcess.type().getValue(), String.valueOf(processId));
ProcessType type = executeProcess.type();
contextHolder.registerProcess(type, name);
MDC.put(PROCESS_NAME, name);
Expand Down Expand Up @@ -97,14 +99,15 @@ public Object processStepAround(ProceedingJoinPoint joinPoint, ProcessStep proce
// Record the current step and restore it after the execution is completed
String parentStep = MDC.get(PROCESS_STEP);
ProcessStepType processStepType = processStep.type();
MDC.put(PROCESS_STEP, processStepType.getValue());
contextHolder.registerProcessStep(processStepType, MDC.get(PROCESS_NAME), parentStep);
ProcessStepEntity step = contextHolder.registerProcessStep(processStepType, MDC.get(PROCESS_NAME), parentStep);
MDC.put(PROCESS_STEP, step.getKey());
contextHolder.appendLog(processName, step.getKey(), "Start Process Step:" + step.getType());

try {
result = joinPoint.proceed();
contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FINISHED, null);
contextHolder.finishedStep(MDC.get(PROCESS_NAME), step, ProcessStatus.FINISHED, null);
} catch (Exception e) {
contextHolder.finishedStep(MDC.get(PROCESS_NAME), processStepType, ProcessStatus.FAILED, e);
contextHolder.finishedStep(MDC.get(PROCESS_NAME), step, ProcessStatus.FAILED, e);
throw e;
} finally {
// If a parent step exists, it is restored after the execution is complete
Expand Down
114 changes: 70 additions & 44 deletions dinky-admin/src/main/java/org/dinky/context/ConsoleContextHolder.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
import org.dinky.process.enums.ProcessType;
import org.dinky.process.exception.DinkyException;
import org.dinky.process.model.ProcessEntity;
import org.dinky.process.model.ProcessStep;
import org.dinky.process.model.ProcessStepEntity;
import org.dinky.utils.LogUtil;

import org.apache.http.util.TextUtils;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,6 +48,7 @@

import cn.hutool.core.io.FileUtil;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.text.StrFormatter;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -71,24 +74,40 @@ public List<ProcessEntity> list() {
return new ArrayList<>(logPross.values());
}

public ProcessEntity getProcess(String processName) {
try {
String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), processName);
String string = FileUtil.readString(filePath, StandardCharsets.UTF_8);
return JSONObject.parseObject(string, ProcessEntity.class);
} catch (Exception e) {
log.warn("Get process {} failed, maybe not exits", processName);
return null;
}
}

/**
* Add log messages to specific processes and process steps.
*
* @param processName process name
* @param processStep process step type
* @param stepPid process step type
* @param log messages
* @throws BusException Throws an exception if the process does not exist
*/
public void appendLog(String processName, ProcessStepType processStep, String log) {
public void appendLog(String processName, String stepPid, String log) {
if (!logPross.containsKey(processName)) {
throw new BusException(StrFormatter.format("process {} does not exist", processName));
}
logPross.get(processName).appendLog(log);
ProcessStep stepNode = getStepNode(processStep.getValue(), getStepsMap(processName));
stepNode.appendLog(log);
ProcessEntity process = logPross.get(processName);
process.appendLog(log);
if (stepPid != null) {
ProcessStepEntity stepNode = getStepNode(stepPid, getStepsMap(processName));
stepNode.appendLog(log);
process.setLastUpdateStep(stepNode);
}
// /TOPIC/PROCESS_CONSOLE/FlinkSubmit/12
String topic = StrFormatter.format("{}/{}", SseTopic.PROCESS_CONSOLE.getValue(), processName);
CompletableFuture.runAsync(() -> {
SseSessionContextHolder.sendTopic(topic, logPross.get(processName));
SseSessionContextHolder.sendTopic(topic, process);
});
}

Expand All @@ -104,50 +123,51 @@ public void registerProcess(ProcessType type, String processName) throws Runtime
throw new BusException("Another user is running an action to suppress this request");
}
ProcessEntity entity = ProcessEntity.builder()
.pid(processName)
.key(UUID.fastUUID().toString())
.log(new StringBuilder())
.errLog(new StringBuilder())
.status(ProcessStatus.INITIALIZING)
.type(type)
.name(processName)
.title(processName)
.startTime(LocalDateTime.now())
.stepsMap(new LinkedHashMap<>())
.children(new LinkedList<>())
.build();
logPross.put(processName, entity);
appendLog(processName, null, "Start Process:" + processName);
}

/**
* Register a new process step.
*
* @param type process step type
* @param processName process name
* @param parentStep parent step
* @param parentStepPid parent step
* @throws RuntimeException Throws an exception if the process does not exist
*/
public void registerProcessStep(ProcessStepType type, String processName, String parentStep)
public ProcessStepEntity registerProcessStep(ProcessStepType type, String processName, String parentStepPid)
throws RuntimeException {
if (!logPross.containsKey(processName)) {
throw new BusException(StrFormatter.format("Process {} does not exist", processName));
throw new BusException(StrFormatter.format("Process {} does not exist", type));
}
ProcessEntity process = logPross.get(processName);
process.setStatus(ProcessStatus.RUNNING);
ProcessStep processStep = ProcessStep.builder()
.stepStatus(ProcessStatus.RUNNING)
ProcessStepEntity processStepEntity = ProcessStepEntity.builder()
.key(UUID.fastUUID().toString())
.status(ProcessStatus.RUNNING)
.startTime(LocalDateTime.now())
.type(type)
.name(type.getDesc().getMessage())
.title(type.getDesc().getMessage())
.log(new StringBuilder())
.errLog(new StringBuilder())
.childStepsMap(new LinkedHashMap<>())
.children(new LinkedList<>())
.build();

if (TextUtils.isEmpty(parentStep)) {
if (TextUtils.isEmpty(parentStepPid)) {
// parentStep为空表示为顶级节点
process.getStepsMap().put(type.getValue(), processStep);
process.getChildren().add(processStepEntity);
} else {
ProcessStep stepNode = getStepNode(parentStep, process.getStepsMap());
stepNode.getChildStepsMap().put(type.getValue(), processStep);
ProcessStepEntity stepNode = getStepNode(parentStepPid, process.getChildren());
stepNode.getChildren().add(processStepEntity);
}
return processStepEntity;
}

/**
Expand All @@ -156,6 +176,7 @@ public void registerProcessStep(ProcessStepType type, String processName, String
* @param processName process name
* @param status Process status
* @param e exception object, optional
*
*/
public void finishedProcess(String processName, ProcessStatus status, Throwable e) {
if (!logPross.containsKey(processName)) {
Expand All @@ -164,62 +185,67 @@ public void finishedProcess(String processName, ProcessStatus status, Throwable
ProcessEntity process = logPross.get(processName);
process.setStatus(status);
process.setEndTime(LocalDateTime.now());
process.setTime(process.getEndTime().compareTo(process.getStartTime()));
process.setTime(
Duration.between(process.getStartTime(), process.getEndTime()).toMillis());
if (e != null) {
process.appendErrLog(LogUtil.getError(e));
appendLog(processName, null, LogUtil.getError(e));
}
String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), process.getName());
String filePath = String.format("%s/tmp/log/%s.json", System.getProperty("user.dir"), process.getTitle());
if (FileUtil.exist(filePath)) {
Assert.isTrue(FileUtil.del(filePath));
}
FileUtil.writeUtf8String(JSONObject.toJSONString(process), filePath);
appendLog(processName, null, StrFormatter.format("Process {} exit with status:{}", processName, status));
logPross.remove(processName);
}

/**
* Mark process step as completed.
*
* @param processName process name
* @param type process step type
* @param step process step type
* @param status Process step status
* @param e exception object, optional
*/
public void finishedStep(String processName, ProcessStepType type, ProcessStatus status, Throwable e) {
public void finishedStep(String processName, ProcessStepEntity step, ProcessStatus status, Throwable e) {
if (!logPross.containsKey(processName)) {
return;
}
ProcessStep processStep = getStepNode(type.getValue(), getStepsMap(processName));
processStep.setStepStatus(status);
processStep.setEndTime(LocalDateTime.now());
processStep.setTime(processStep.getEndTime().compareTo(processStep.getStartTime()));
step.setStatus(status);
step.setEndTime(LocalDateTime.now());
step.setTime(Duration.between(step.getStartTime(), step.getEndTime()).toMillis());
if (e != null) {
logPross.get(processName).appendErrLog(LogUtil.getError(e));
appendLog(processName, step.getKey(), LogUtil.getError(e));
}
appendLog(
processName,
step.getKey(),
StrFormatter.format("Process Step {} exit with status:{}", step.getType(), status));
}

private ProcessStep getStepNode(String stepType, Map<String, ProcessStep> stepsMap) {
ProcessStep stepNode = findStepNode(stepType, stepsMap);
private ProcessStepEntity getStepNode(String stepPid, LinkedList<ProcessStepEntity> stepsMap) {
ProcessStepEntity stepNode = findStepNode(stepPid, stepsMap);
if (stepNode != null) {
return stepNode;
}
String errorStr = StrFormatter.format(
"Get Parent Node Failed, This is most likely a Dinky bug, "
+ "please report the following information back to the community:\nProcess:{},\nstep:{},\nprocessNam:{}",
JSONObject.toJSONString(logPross),
stepType,
stepPid,
MDC.get(ProcessAspect.PROCESS_NAME));
throw new DinkyException(errorStr);
}

/**
* 递归查找节点
* */
private ProcessStep findStepNode(String stepType, Map<String, ProcessStep> stepsMap) {
for (Map.Entry<String, ProcessStep> entry : stepsMap.entrySet()) {
if (entry.getKey().equals(stepType)) {
return entry.getValue();
private ProcessStepEntity findStepNode(String stepPid, LinkedList<ProcessStepEntity> stepsMap) {
for (ProcessStepEntity processStepEntity : stepsMap) {
if (processStepEntity.getKey().equals(stepPid)) {
return processStepEntity;
} else {
ProcessStep stepNode = findStepNode(stepType, entry.getValue().getChildStepsMap());
ProcessStepEntity stepNode = findStepNode(stepPid, processStepEntity.getChildren());
if (stepNode != null) {
return stepNode;
}
Expand All @@ -228,7 +254,7 @@ private ProcessStep findStepNode(String stepType, Map<String, ProcessStep> steps
return null;
}

private Map<String, ProcessStep> getStepsMap(String processName) {
return logPross.get(processName).getStepsMap();
private LinkedList<ProcessStepEntity> getStepsMap(String processName) {
return logPross.get(processName).getChildren();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.dinky.context.ConsoleContextHolder;
import org.dinky.data.result.ProTableResult;
import org.dinky.data.result.Result;
import org.dinky.process.model.ProcessEntity;

import org.springframework.web.bind.annotation.GetMapping;
Expand Down Expand Up @@ -62,4 +63,11 @@ public ProTableResult<ProcessEntity> listAllProcess(@RequestParam boolean active
.data(ConsoleContextHolder.getInstances().list())
.build();
}

@GetMapping("/getProcess")
@ApiOperation("get process")
@ApiImplicitParam(name = "processName", value = "process name", dataType = "ProcessEntity")
public Result<ProcessEntity> listAllProcess(@RequestParam String processName) {
return Result.succeed(ConsoleContextHolder.getInstances().getProcess(processName));
}
}
12 changes: 8 additions & 4 deletions dinky-admin/src/main/java/org/dinky/sse/LogSseAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@

import org.dinky.aop.ProcessAspect;
import org.dinky.context.ConsoleContextHolder;
import org.dinky.process.enums.ProcessStepType;

import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
import org.apache.logging.log4j.core.StringLayout;
import org.apache.logging.log4j.core.appender.AbstractAppender;
import org.apache.logging.log4j.core.config.Property;
import org.apache.logging.log4j.core.config.plugins.Plugin;
Expand All @@ -51,6 +51,10 @@ protected LogSseAppender(
super(name, filter, layout, ignoreExceptions, properties);
}

public StringLayout getStringLayout() {
return (StringLayout) getLayout();
}

/**
* This method is called when a new log comes over, contextData is the data in the MDC,
* set in {@link ProcessAspect} If contextData contains PROCESS_NAME and PROCESS_STEP,
Expand All @@ -63,9 +67,9 @@ public void append(LogEvent event) {
if (contextData.containsKey(ProcessAspect.PROCESS_NAME)
&& contextData.containsKey(ProcessAspect.PROCESS_STEP)) {
String processName = contextData.getValue(ProcessAspect.PROCESS_NAME);
String processStep = contextData.getValue(ProcessAspect.PROCESS_STEP);
ConsoleContextHolder.getInstances()
.appendLog(processName, ProcessStepType.get(processStep), event.toString());
String processStepPid = contextData.getValue(ProcessAspect.PROCESS_STEP);
String log = getStringLayout().toSerializable(event);
ConsoleContextHolder.getInstances().appendLog(processName, processStepPid, log);
}
}

Expand Down
2 changes: 1 addition & 1 deletion dinky-admin/src/main/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
<!--这个是输出日志的格式,如果对里面的参数不理解,可以去看我的这篇文章,网址是:“https://blog.csdn.net/qq_42449963/article/details/104617356”-->
<!--<PatternLayout pattern="[dinky] %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>-->
<PatternLayout
pattern="[dinky] %d{yyyy-MM-dd HH:mm:ss.SSS} %highlight{%6p} %style{%5pid}{bright,magenta} --- [%15.15t] %style{%c{20}}{bright,cyan}: %m%n"/>
pattern="%d{yyyy-MM-dd HH:mm:ss.SSS} %p [%15.15t] %c{20}(%L): %m"/>
</LogSseAppender>

</appenders>
Expand Down
Loading

0 comments on commit 4a4859a

Please sign in to comment.