Skip to content

Commit

Permalink
[Fix-2731] [scheduler] Fix push task to dolphinscheduler (#2732)
Browse files Browse the repository at this point in the history
Co-authored-by: wenmo <32723967+wenmo@users.noreply.github.com>
  • Loading branch information
aiwenmo and aiwenmo authored Dec 24, 2023
1 parent 5df10a4 commit fa1ce86
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import cn.hutool.core.collection.CollUtil;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
Expand Down Expand Up @@ -84,9 +83,6 @@ public Result<TaskDefinition> getTaskDefinition(@ApiParam(value = "dinky任务id
example = "1")
public Result<List<TaskMainInfo>> getTaskMainInfos(@ApiParam(value = "dinky任务id") @RequestParam Long dinkyTaskId) {
List<TaskMainInfo> taskMainInfos = schedulerService.getTaskMainInfos(dinkyTaskId);
if (CollUtil.isEmpty(taskMainInfos)) {
return Result.failed();
}
return Result.succeed(taskMainInfos);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,15 @@
import org.dinky.scheduler.model.DinkyTaskParams;
import org.dinky.scheduler.model.DinkyTaskRequest;
import org.dinky.scheduler.model.ProcessDefinition;
import org.dinky.scheduler.model.ProcessTaskRelation;
import org.dinky.scheduler.model.Project;
import org.dinky.scheduler.model.TaskDefinition;
import org.dinky.scheduler.model.TaskMainInfo;
import org.dinky.scheduler.model.TaskRequest;
import org.dinky.service.CatalogueService;
import org.dinky.service.SchedulerService;
import org.dinky.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

Expand Down Expand Up @@ -76,6 +77,13 @@ public class SchedulerServiceImpl implements SchedulerService {
*/
@Override
public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
// Use root catalog as process (workflow) name.
Catalogue catalogue = catalogueService.getOne(
new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskRequest.getTaskId()));
if (catalogue == null) {
log.error(Status.DS_GET_NODE_LIST_ERROR.getMessage());
throw new BusException(Status.DS_GET_NODE_LIST_ERROR);
}

DinkyTaskParams dinkyTaskParams = new DinkyTaskParams();
dinkyTaskParams.setTaskId(dinkyTaskRequest.getTaskId());
Expand All @@ -84,77 +92,91 @@ public boolean pushAddTask(DinkyTaskRequest dinkyTaskRequest) {
dinkyTaskRequest.setTaskParams(JSONUtil.parseObj(dinkyTaskParams).toString());
dinkyTaskRequest.setTaskType(TASK_TYPE);

Catalogue catalogue = catalogueService.getOne(
new LambdaQueryWrapper<Catalogue>().eq(Catalogue::getTaskId, dinkyTaskRequest.getTaskId()));
if (catalogue == null) {
log.error(Status.DS_GET_NODE_LIST_ERROR.getMessage());
throw new BusException(Status.DS_GET_NODE_LIST_ERROR);
}

String processName = getDinkyNames(catalogue, 0);
long projectCode = SystemInit.getProject().getCode();
// Get process from dolphin scheduler
ProcessDefinition process = processClient.getProcessDefinitionInfo(projectCode, processName);

String taskName = catalogue.getName() + ":" + catalogue.getId();
dinkyTaskRequest.setName(taskName);

TaskRequest taskRequest = new TaskRequest();
JSONArray array = new JSONArray();
Long taskCode = taskClient.genTaskCode(projectCode);

// If the process does not exist, a process needs to be created.
if (process == null) {
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);
JSONArray taskArray = new JSONArray();
taskArray.set(jsonObject);
log.info(Status.DS_ADD_WORK_FLOW_DEFINITION_SUCCESS.getMessage());
// 随机出一个 x y 坐标

DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(200, 500));
dagNodeLocation.setY(RandomUtil.randomLong(100, 400));
log.info("DagNodeLocation Info: {}", dagNodeLocation);

ProcessTaskRelation processTaskRelation = ProcessTaskRelation.generateProcessTaskRelation(taskCode);
JSONObject processTaskRelationJSONObject = JSONUtil.parseObj(processTaskRelation);
JSONArray taskRelationArray = new JSONArray();
taskRelationArray.set(processTaskRelationJSONObject);

processClient.createOrUpdateProcessDefinition(
projectCode, null, processName, taskCode, array.toString(), Arrays.asList(dagNodeLocation), false);
projectCode,
null,
processName,
taskCode,
taskRelationArray.toString(),
taskArray.toString(),
Arrays.asList(dagNodeLocation),
false);
return true;
}

if (process != null && process.getReleaseState() == ReleaseState.ONLINE) {
// If the workflow is in an online state, it cannot be updated.
if (process.getReleaseState() == ReleaseState.ONLINE) {
log.error(Status.DS_WORK_FLOW_DEFINITION_ONLINE.getMessage(), processName);
}

TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE);
// If task name exist, update task definition.
if (taskMainInfo != null) {
// if task name exist, update task definition
log.warn(Status.DS_WORK_FLOW_DEFINITION_TASK_NAME_EXIST.getMessage(), processName, taskName);
return pushUpdateTask(
projectCode, taskMainInfo.getProcessDefinitionCode(), taskMainInfo.getTaskCode(), dinkyTaskRequest);
}

// If the task does not exist, a dinky task needs to be created.
dinkyTaskRequest.setCode(taskCode);
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
if (process != null) {
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// 更新 process 的 location 信息
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);
taskClient.createTaskDefinition(
projectCode, process.getCode(), dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
// update the location of process
updateProcessDefinition(process, taskCode, taskRequest, projectCode);

log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}
return false;
log.info(Status.DS_ADD_TASK_DEFINITION_SUCCESS.getMessage());
return true;
}

private void updateProcessDefinition(
ProcessDefinition process, Long taskCode, TaskRequest taskRequest, JSONArray array, long projectCode) {
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
array.set(jsonObject);
private void updateProcessDefinition(ProcessDefinition process, Long taskCode, TaskRequest task, long projectCode) {

List<DagNodeLocation> locations = new ArrayList<>();
DagData dagData = processClient.getProcessDefinitionInfo(projectCode, process.getCode());
if (dagData == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage());
throw new BusException(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST);
}
List<ProcessTaskRelation> processTaskRelationList = dagData.getProcessTaskRelationList();
List<TaskDefinition> taskDefinitionList = dagData.getTaskDefinitionList();
List<DagNodeLocation> locations = process.getLocations();

if (CollUtil.isNotEmpty(process.getLocations())) {
boolean matched = process.getLocations().stream().anyMatch(location -> location.getTaskCode() == taskCode);
Expand All @@ -175,13 +197,13 @@ private void updateProcessDefinition(
.getAsLong();
long yMin = process.getLocations().stream()
.mapToLong(DagNodeLocation::getY)
.max()
.min()
.getAsLong();
// 随机出一个 x y 坐标
DagNodeLocation dagNodeLocation = new DagNodeLocation();
dagNodeLocation.setTaskCode(taskCode);
dagNodeLocation.setX(RandomUtil.randomLong(xMax, xMin));
dagNodeLocation.setY(RandomUtil.randomLong(yMax, yMin));
dagNodeLocation.setX(RandomUtil.randomLong(xMin == xMax ? 0 : xMin, xMax));
dagNodeLocation.setY(RandomUtil.randomLong(yMin == yMax ? 0 : yMin, yMax));
locations = process.getLocations();
locations.add(dagNodeLocation);
}
Expand All @@ -194,13 +216,27 @@ private void updateProcessDefinition(
locations.add(dagNodeLocation);
}

JSONArray taskArray = new JSONArray();
taskDefinitionList.removeIf(taskDefinition -> (task.getName()).equalsIgnoreCase(taskDefinition.getName()));

taskArray.addAll(taskDefinitionList);
taskArray.add(task);
String processTaskRelationListJson = JsonUtils.toJsonString(processTaskRelationList);

processClient.createOrUpdateProcessDefinition(
projectCode, process.getCode(), process.getName(), taskCode, array.toString(), locations, true);
projectCode,
process.getCode(),
process.getName(),
taskCode,
processTaskRelationListJson,
taskArray.toString(),
locations,
true);
log.info(
Status.DS_PROCESS_DEFINITION_UPDATE.getMessage(),
process.getName(),
taskCode,
array.toString(),
taskArray.toString(),
locations);
}

Expand Down Expand Up @@ -251,14 +287,13 @@ public boolean pushUpdateTask(
BeanUtil.copyProperties(dinkyTaskRequest, taskRequest);
taskRequest.setTimeoutFlag(dinkyTaskRequest.getTimeoutFlag());
taskRequest.setFlag(dinkyTaskRequest.getFlag());
taskRequest.setIsCache(dinkyTaskRequest.getIsCache());

String taskDefinitionJsonObj = JSONUtil.toJsonStr(taskRequest);
Long updatedTaskDefinition = taskClient.updateTaskDefinition(
projectCode, taskCode, dinkyTaskRequest.getUpstreamCodes(), taskDefinitionJsonObj);
JSONObject jsonObject = JSONUtil.parseObj(taskRequest);
JSONArray array = new JSONArray();
array.set(jsonObject);
updateProcessDefinition(process, taskCode, taskRequest, array, projectCode);

updateProcessDefinition(process, taskCode, taskRequest, projectCode);
if (updatedTaskDefinition != null && updatedTaskDefinition > 0) {
log.info(Status.MODIFY_SUCCESS.getMessage());
return true;
Expand Down Expand Up @@ -309,7 +344,7 @@ public TaskDefinition getTaskDefinitionInfo(long dinkyTaskId) {

String processName = getDinkyNames(catalogue, 0);
String taskName = catalogue.getName() + ":" + catalogue.getId();
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, "DINKY");
TaskMainInfo taskMainInfo = taskClient.getTaskMainInfo(projectCode, processName, taskName, TASK_TYPE);
TaskDefinition taskDefinition = null;
if (taskMainInfo == null) {
log.error(Status.DS_WORK_FLOW_DEFINITION_NOT_EXIST.getMessage(), processName, taskName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import org.dinky.scheduler.result.Result;
import org.dinky.scheduler.utils.MyJSONUtil;
import org.dinky.scheduler.utils.ParamUtil;
import org.dinky.scheduler.utils.ReadFileUtil;
import org.dinky.utils.JsonUtils;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -75,7 +75,7 @@ public List<ProcessDefinition> getProcessDefinition(Long projectCode, String pro
.getDolphinschedulerToken()
.getValue())
.form(ParamUtil.getPageParams(processName))
.timeout(5000)
.timeout(20000)
.execute()
.body();
PageInfo<JSONObject> data = MyJSONUtil.toPageBean(content);
Expand All @@ -85,7 +85,11 @@ public List<ProcessDefinition> getProcessDefinition(Long projectCode, String pro
}

for (JSONObject jsonObject : data.getTotalList()) {
lists.add(MyJSONUtil.toBean(jsonObject, ProcessDefinition.class));
ProcessDefinition processDefinition = MyJSONUtil.toBean(jsonObject, ProcessDefinition.class);
// The locations of processDefinition is json string
List<DagNodeLocation> locations = jsonObject.getBeanList("locations", DagNodeLocation.class);
processDefinition.setLocations(locations);
lists.add(processDefinition);
}
return lists;
}
Expand Down Expand Up @@ -127,7 +131,7 @@ public DagData getProcessDefinitionInfo(Long projectCode, Long processCode) {
SystemConfiguration.getInstances()
.getDolphinschedulerToken()
.getValue())
.timeout(5000)
.timeout(20000)
.execute()
.body();

Expand All @@ -148,6 +152,7 @@ public ProcessDefinition createOrUpdateProcessDefinition(
Long processCode,
String processName,
Long taskCode,
String taskRelationJson,
String taskDefinitionJson,
List<DagNodeLocation> locations,
boolean isModify) {
Expand All @@ -160,8 +165,8 @@ public ProcessDefinition createOrUpdateProcessDefinition(
params.put("name", processName);
params.put("description", "系统添加");
params.put("tenantCode", "default");
params.put("locations", locations);
params.put("taskRelationJson", ReadFileUtil.taskRelation(Collections.singletonMap("code", taskCode)));
params.put("locations", JsonUtils.toJsonString(locations));
params.put("taskRelationJson", taskRelationJson);
params.put("taskDefinitionJson", taskDefinitionJson);
params.put("executionType", "PARALLEL");

Expand All @@ -178,7 +183,7 @@ public ProcessDefinition createOrUpdateProcessDefinition(
.getDolphinschedulerToken()
.getValue())
.form(params)
.timeout(5000)
.timeout(20000)
.execute();
String content = httpResponse.body();
return MyJSONUtil.verifyResult(MyJSONUtil.toBean(content, new TypeReference<Result<ProcessDefinition>>() {}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ public TaskDefinition getTaskDefinition(Long projectCode, Long taskCode) {
SystemConfiguration.getInstances()
.getDolphinschedulerToken()
.getValue())
.timeout(5000)
.timeout(20000)
.execute()
.body();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
@Data
public class DagNodeLocation implements Serializable {

private static final long serialVersionUID = -5243356147439794746L;

private long taskCode;
private long x;
private long y;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@ public class ProcessTaskRelation {

@ApiModelProperty(value = "更新时间")
private Date updateTime;

public static ProcessTaskRelation generateProcessTaskRelation(long taskCode) {
ProcessTaskRelation processTaskRelation = new ProcessTaskRelation();
processTaskRelation.setName("");
processTaskRelation.setPreTaskCode(0);
processTaskRelation.setPreTaskVersion(0);
processTaskRelation.setPostTaskCode(taskCode);
processTaskRelation.setPostTaskVersion(1);
processTaskRelation.setConditionType(ConditionType.NONE);
processTaskRelation.setConditionParams("{}");
return processTaskRelation;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,9 @@ public class TaskDefinition {
@ApiModelProperty(value = "运行标志 yes 正常/no 禁止执行")
private Flag flag;

@ApiModelProperty(value = "Cache run: yes/no ")
private Flag isCache;

@ApiModelProperty(value = "优先级")
private Priority taskPriority;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public class TaskRequest {
@ApiModelProperty(value = "运行标志 yes 正常/no 禁止执行")
private String flag;

@ApiModelProperty(value = "Cache run: yes/no")
private String isCache;

@ApiModelProperty(value = "任务参数 默认DINKY参数")
private String taskParams;

Expand Down
Loading

0 comments on commit fa1ce86

Please sign in to comment.