Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@
import org.apache.dolphinscheduler.dao.utils.WorkerGroupUtils;
import org.apache.dolphinscheduler.plugin.task.api.enums.SqlType;
import org.apache.dolphinscheduler.plugin.task.api.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.ConditionDependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentItem;
import org.apache.dolphinscheduler.plugin.task.api.model.DependentTaskModel;
import org.apache.dolphinscheduler.plugin.task.api.model.Property;
import org.apache.dolphinscheduler.plugin.task.api.parameters.ConditionsParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.DependentParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters;
import org.apache.dolphinscheduler.plugin.task.api.parameters.SwitchParameters;
Expand Down Expand Up @@ -2116,22 +2119,10 @@
taskDefinitionLog.setVersion(0);
taskDefinitionLog.setName(taskDefinitionLog.getName());
if (TaskTypeUtils.isSwitchTask(taskDefinitionLog.getTaskType())) {
final String taskParams = taskDefinitionLog.getTaskParams();
final SwitchParameters switchParameters =
JSONUtils.parseObject(taskParams, SwitchParameters.class);
if (switchParameters == null) {
throw new IllegalArgumentException(
"Switch task params: " + taskParams + " is invalid.");
}
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
switchResult.getDependTaskList().forEach(switchResultVo -> {
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
});
if (switchResult.getNextNode() != null) {
switchResult.setNextNode(
taskCodeMap.get(switchResult.getNextNode()));
}
taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
replaceTaskCodeForSwitchTaskParams(taskDefinitionLog, taskCodeMap);
}
if (TaskTypeUtils.isConditionTask(taskDefinitionLog.getTaskType())) {
replaceTaskCodeForConditionTaskParams(taskDefinitionLog, taskCodeMap);
}
}
for (WorkflowTaskRelationLog workflowTaskRelationLog : taskRelationList) {
Expand Down Expand Up @@ -2205,6 +2196,116 @@
}
}

/**
* replace task code references inside the task parameters of a Switch task.
*
* @param taskDefinitionLog the task log to update
* @param taskCodeMap mapping from old task code to new task code
* @throws IllegalArgumentException if taskParams is invalid or cannot be parsed
*/
private void replaceTaskCodeForSwitchTaskParams(TaskDefinitionLog taskDefinitionLog, Map<Long, Long> taskCodeMap) {
final String taskParams = taskDefinitionLog.getTaskParams();
final SwitchParameters switchParameters =
JSONUtils.parseObject(taskParams, SwitchParameters.class);
if (switchParameters == null) {
log.warn("Failed to parse Switch task params: {}", taskParams);
throw new IllegalArgumentException(
"Switch task params: " + taskParams + " is invalid.");
}

// SwitchParameters.nextBranch
if (switchParameters.getNextBranch() != null && taskCodeMap.containsKey(switchParameters.getNextBranch())) {
switchParameters.setNextBranch(taskCodeMap.get(switchParameters.getNextBranch()));
}

// SwitchParameters.SwitchResult
SwitchParameters.SwitchResult switchResult = switchParameters.getSwitchResult();
if (switchResult != null) {
// SwitchParameters.SwitchResult.nextNode
if (switchResult.getNextNode() != null && taskCodeMap.containsKey(switchResult.getNextNode())) {
switchResult.setNextNode(
taskCodeMap.get(switchResult.getNextNode()));
}

// SwitchParameters.SwitchResult.SwitchResultVo.nextNode
switchResult.getDependTaskList().forEach(switchResultVo -> {
switchResultVo.setNextNode(taskCodeMap.get(switchResultVo.getNextNode()));
});
}

taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(switchParameters));
}

/**
* replace task code references inside the task parameters of a Condition task.
*
* @param taskDefinitionLog the task log to update
* @param taskCodeMap mapping from old task code to new task code
* @throws IllegalArgumentException if taskParams is invalid or cannot be parsed
*/
private void replaceTaskCodeForConditionTaskParams(TaskDefinitionLog taskDefinitionLog,

Check failure on line 2246 in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkflowDefinitionServiceImpl.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this method to reduce its Cognitive Complexity from 34 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=apache-dolphinscheduler&issues=AZsHlgfATtZoSj_GapMa&open=AZsHlgfATtZoSj_GapMa&pullRequest=17774
Map<Long, Long> taskCodeMap) {
final String taskParams = taskDefinitionLog.getTaskParams();
final ConditionsParameters conditionsParameters =
JSONUtils.parseObject(taskParams, ConditionsParameters.class);

if (conditionsParameters == null) {
log.warn("Failed to parse Condition task params: {}", taskParams);
throw new IllegalArgumentException(
"Condition task params: " + taskParams + " is invalid.");
}

// ConditionsParameters.ConditionDependency
ConditionsParameters.ConditionDependency conditionDependency = conditionsParameters.getDependence();
if (conditionDependency != null && CollectionUtils.isNotEmpty(conditionDependency.getDependTaskList())) {
for (ConditionDependentTaskModel conditionDependentTaskModel : conditionDependency.getDependTaskList()) {
if (CollectionUtils.isEmpty(conditionDependentTaskModel.getDependItemList())) {
continue;
}
for (ConditionDependentItem conditionDependentItem : conditionDependentTaskModel.getDependItemList()) {
if (conditionDependentItem == null) {
continue;
}
// ConditionsParameters.ConditionDependency.ConditionDependentTaskModel.ConditionDependentItem.depTaskCode
Long depTaskCode = conditionDependentItem.getDepTaskCode();
if (taskCodeMap.containsKey(depTaskCode)) {
conditionDependentItem.setDepTaskCode(taskCodeMap.get(depTaskCode));
}
}
}
}

// ConditionsParameters.ConditionResult
ConditionsParameters.ConditionResult conditionResult = conditionsParameters.getConditionResult();
if (conditionResult != null) {
// ConditionsParameters.ConditionResult.successNode
if (CollectionUtils.isNotEmpty(conditionResult.getSuccessNode())) {
List<Long> successNode = conditionResult.getSuccessNode().stream()
.map(taskCode -> {
if (taskCode != null && taskCodeMap.containsKey(taskCode)) {
return taskCodeMap.get(taskCode);
}
return taskCode;
}).collect(Collectors.toList());
conditionResult.setSuccessNode(successNode);
}

// ConditionsParameters.ConditionResult.failedNode
if (CollectionUtils.isNotEmpty(conditionResult.getFailedNode())) {
List<Long> failedNode = conditionResult.getFailedNode().stream()
.map(taskCode -> {
if (taskCode != null && taskCodeMap.containsKey(taskCode)) {
return taskCodeMap.get(taskCode);
}
return taskCode;
}).collect(Collectors.toList());
conditionResult.setFailedNode(failedNode);
}
}

taskDefinitionLog.setTaskParams(JSONUtils.toJsonString(conditionsParameters));
}

/**
* get new task name or workflow name when copy or import operate
*
Expand Down
Loading