Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor ][admin]Refactor process #2391

Merged
merged 14 commits into from
Oct 17, 2023
Prev Previous commit
Next Next commit
add i81n
  • Loading branch information
gaoyan1998 committed Oct 16, 2023
commit 09bd8231ae944bdcda13e7258537356d9e478276
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

Expand Down Expand Up @@ -64,7 +63,7 @@ public static ConsoleContextHolder getInstances() {
/**
* Get a list of all processes
* */
public List<ProcessEntity> list(){
public List<ProcessEntity> list() {
return new ArrayList<>(logPross.values());
}

Expand Down Expand Up @@ -158,6 +157,7 @@ public void registerProcessStep(ProcessStepType type, String processName) throws
.stepStatus(ProcessStatus.RUNNING)
.startTime(LocalDateTime.now())
.type(type)
.name(type.getDesc().getMessage())
.log(new StringBuilder())
.errLog(new StringBuilder())
.childStepsMap(new LinkedHashMap<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,25 @@

package org.dinky.controller;

import io.swagger.annotations.ApiImplicitParams;
import org.dinky.context.ConsoleContextHolder;
import org.dinky.data.result.ProTableResult;
import org.dinky.process.model.ProcessEntity;
import org.dinky.sse.SseEmitterUTF8;

import java.util.concurrent.TimeUnit;

import org.dinky.sse.SseEmitterUTF8;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
import lombok.RequiredArgsConstructor;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
* ProcessController
Expand All @@ -53,12 +53,12 @@ public class ProcessController {
@GetMapping(value = "/getLastUpdateData", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ApiOperation("Get Last Update Data")
@ApiImplicitParams({
@ApiImplicitParam(name = "lastTime", value = "Last Time", required = false, dataType = "Long"),
@ApiImplicitParam(name = "keys", value = "jobids", required = true, dataType = "String")
@ApiImplicitParam(name = "lastTime", value = "Last Time", required = false, dataType = "Long"),
@ApiImplicitParam(name = "keys", value = "jobids", required = true, dataType = "String")
})
public SseEmitter getLastUpdateData(String keys) {
SseEmitter emitter = new SseEmitterUTF8(TimeUnit.MINUTES.toMillis(30));
ConsoleContextHolder.getInstances().addSse(keys,emitter);
ConsoleContextHolder.getInstances().addSse(keys, emitter);
return emitter;
}

Expand All @@ -80,5 +80,4 @@ public ProTableResult<ProcessEntity> listAllProcess(@RequestParam boolean active
.data(ConsoleContextHolder.getInstances().list())
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.dinky.metadata.driver.Driver;
import org.dinky.metadata.result.JdbcSelectResult;
import org.dinky.mybatis.service.impl.SuperServiceImpl;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.service.DataBaseService;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -265,6 +267,7 @@ public List<SqlExplainResult> explainCommonSql(TaskDTO task) {
}

@Override
@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE_COMMON_SQL)
public JobResult executeCommonSql(SqlDTO sqlDTO) {
JobResult result = new JobResult();
result.setStatement(sqlDTO.getStatement());
Expand Down
13 changes: 11 additions & 2 deletions dinky-common/src/main/java/org/dinky/data/enums/Status.java
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,18 @@ public enum Status {
* gateway config
*/
GAETWAY_KUBERNETS_TEST_FAILED(180, "gateway.kubernetes.test.failed"),
GAETWAY_KUBERNETS_TEST_SUCCESS(180, "gateway.kubernetes.test.success"),
;
GAETWAY_KUBERNETS_TEST_SUCCESS(181, "gateway.kubernetes.test.success"),

/**
* process
* */
PROCESS_SUBMIT_SUBMITTASK(190, "process.submit.submitTask"),
PROCESS_SUBMIT_CHECKSQL(191, "process.submit.checkSql"),
PROCESS_SUBMIT_EXECUTE(192, "process.submit.execute"),
PROCESS_SUBMIT_BUILDCONFIG(193, "process.submit.buildConfig"),
PROCESS_SUBMIT_EXECUTECOMMSQL(194, "process.submit.execute.commSql"),
PROCESS_SUBMIT_EXECUTEFLINKSQL(195, "process.submit.execute.flinkSql"),
;
private final int code;
private final String key;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,11 @@ gateway.kubernetes.test.failed= failed to test the Flink configuration:
task.status.is.not.done=In the current publishing state, a job is running and the online fails, please stop and go online
task.sql.explain.failed=SQL parsing failed, please check the SQL statement
task.update.failed=Task Update failed

# process
process.submit.submitTask= Submit the job
process.submit.checkSql=Check job
process.submit.execute = execute the job
process.submit.buildConfig=Build configuration information
process.submit.execute.commSql=excute commonSql
process.submit.execute.flinkSql=excute flinkSql
10 changes: 9 additions & 1 deletion dinky-common/src/main/resources/i18n/messages_zh_CN.properties
Original file line number Diff line number Diff line change
Expand Up @@ -235,4 +235,12 @@ gateway.kubernetes.test.failed=测试 Flink 配置失败:
# Task
task.status.is.not.done=当前发布状态下有作业正在运行,上线失败,请停止后上线
task.sql.explain.failed=sql解析失败,请检查
task.update.failed=Task更新失败
task.update.failed=Task更新失败

# process
process.submit.submitTask=提交作业
process.submit.checkSql=检查作业
process.submit.execute=执行作业
process.submit.buildConfig=构建配置信息
process.submit.execute.commSql=执行普通sql
process.submit.execute.flinkSql=执行flinkSql
3 changes: 3 additions & 0 deletions dinky-core/src/main/java/org/dinky/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.dinky.interceptor.FlinkInterceptor;
import org.dinky.interceptor.FlinkInterceptorResult;
import org.dinky.parser.SqlType;
import org.dinky.process.annotations.ProcessStep;
import org.dinky.process.enums.ProcessStepType;
import org.dinky.trans.Operations;
import org.dinky.utils.DinkyClassLoaderUtil;
import org.dinky.utils.LogUtil;
Expand Down Expand Up @@ -300,6 +302,7 @@ public boolean close() {
return true;
}

@ProcessStep(type = ProcessStepType.SUBMIT_EXECUTE)
public JobResult executeSql(String statement) {
// TODO 改为ProcessStep注释
Job job = Job.init(runMode, config, executorConfig, executor, statement, useGateway);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,29 @@
package org.dinky.process.enums;

import org.dinky.assertion.Asserts;
import org.dinky.data.enums.Status;

import lombok.Getter;

@Getter
public enum ProcessStepType {
SUBMIT_TASK("SUBMIT_TASK", null),
SUBMIT_PRECHECK("SUBMIT_PRECHECK", SUBMIT_TASK),
SUBMIT_EXECUTE("SUBMIT_EXECUTE", SUBMIT_TASK),
SUBMIT_BUILD_CONFIG("SUBMIT_BUILD_CONFIG", SUBMIT_EXECUTE),
UNKNOWN("Unknown", null);
SUBMIT_TASK("SUBMIT_TASK", null, Status.PROCESS_SUBMIT_SUBMITTASK),
SUBMIT_PRECHECK("SUBMIT_PRECHECK", SUBMIT_TASK, Status.PROCESS_SUBMIT_CHECKSQL),
SUBMIT_EXECUTE("SUBMIT_EXECUTE", SUBMIT_TASK, Status.PROCESS_SUBMIT_EXECUTE),
SUBMIT_BUILD_CONFIG("SUBMIT_BUILD_CONFIG", SUBMIT_EXECUTE, Status.PROCESS_SUBMIT_BUILDCONFIG),
SUBMIT_EXECUTE_COMMON_SQL("SUBMIT_EXECUTE_COMMON_SQL", SUBMIT_BUILD_CONFIG, Status.PROCESS_SUBMIT_EXECUTECOMMSQL),
SUBMIT_EXECUTE_FLINK_SQL("SUBMIT_EXECUTE_FLINK_SQL", SUBMIT_BUILD_CONFIG, Status.PROCESS_SUBMIT_EXECUTEFLINKSQL),
UNKNOWN("UNKNOWN", null, Status.UNKNOWN_ERROR),
;

private final String value;
private final ProcessStepType parentStep;
private final Status desc;

ProcessStepType(String type, ProcessStepType parentStep) {
ProcessStepType(String type, ProcessStepType parentStep, Status desc) {
this.value = type;
this.parentStep = parentStep;
this.desc = desc;
}

public static ProcessStepType get(String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
@Builder
public class ProcessStep {

private String name;
private ProcessStatus stepStatus;
private ProcessStepType type;
private LocalDateTime startTime;
Expand Down
Loading