Skip to content

Commit

Permalink
introduced SystemTaskRegistry in place of the registry in WorkflowSys…
Browse files Browse the repository at this point in the history
…temTask.
  • Loading branch information
Aravindan Ramkumar committed Apr 12, 2021
1 parent fe7b7d0 commit 609c422
Show file tree
Hide file tree
Showing 28 changed files with 480 additions and 354 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.tasks.http;

Expand Down Expand Up @@ -206,11 +206,6 @@ public boolean isAsync() {
return true;
}

@Override
public int getRetryTimeInSecond() {
return 60;
}

public static class HttpResponse {

public Object body;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,17 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.tasks.http;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -32,15 +24,10 @@
import com.netflix.conductor.contribs.tasks.http.HttpTask.Input;
import com.netflix.conductor.core.execution.DeciderService;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.dao.MetadataDAO;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand All @@ -50,6 +37,21 @@
import org.testcontainers.containers.MockServerContainer;
import org.testcontainers.utility.DockerImageName;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockserver.model.HttpRequest.request;
import static org.mockserver.model.HttpResponse.response;

@SuppressWarnings("unchecked")
public class HttpTaskTest {

Expand Down Expand Up @@ -365,8 +367,9 @@ public void testOptional() {
MetadataDAO metadataDAO = mock(MetadataDAO.class);
ExternalPayloadStorageUtils externalPayloadStorageUtils = mock(ExternalPayloadStorageUtils.class);
ParametersUtils parametersUtils = mock(ParametersUtils.class);
SystemTaskRegistry systemTaskRegistry = mock(SystemTaskRegistry.class);

new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, Collections.emptyMap(),
new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, systemTaskRegistry, Collections.emptyMap(),
Duration.ofMinutes(60))
.decide(workflow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.mapper.TaskMapper;
import com.netflix.conductor.core.execution.mapper.TaskMapperContext;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.IDGenerator;
import com.netflix.conductor.core.utils.ParametersUtils;
Expand Down Expand Up @@ -73,6 +73,7 @@ public class DeciderService {
private final ParametersUtils parametersUtils;
private final ExternalPayloadStorageUtils externalPayloadStorageUtils;
private final MetadataDAO metadataDAO;
private final SystemTaskRegistry systemTaskRegistry;
private final long taskPendingTimeThresholdMins;

private final Map<TaskType, TaskMapper> taskMappers;
Expand All @@ -86,13 +87,15 @@ public class DeciderService {

public DeciderService(ParametersUtils parametersUtils, MetadataDAO metadataDAO,
ExternalPayloadStorageUtils externalPayloadStorageUtils,
SystemTaskRegistry systemTaskRegistry,
@Qualifier("taskProcessorsMap") Map<TaskType, TaskMapper> taskMappers,
@Value("${conductor.app.taskPendingTimeThreshold:60m}") Duration taskPendingTimeThreshold) {
this.metadataDAO = metadataDAO;
this.parametersUtils = parametersUtils;
this.taskMappers = taskMappers;
this.externalPayloadStorageUtils = externalPayloadStorageUtils;
this.taskPendingTimeThresholdMins = taskPendingTimeThreshold.toMinutes();
this.systemTaskRegistry = systemTaskRegistry;
}

public DeciderOutcome decide(Workflow workflow) throws TerminateWorkflowException {
Expand Down Expand Up @@ -160,7 +163,7 @@ private DeciderOutcome decide(final Workflow workflow, List<Task> preScheduledTa
// A new workflow does not enter this code branch
for (Task pendingTask : pendingTasks) {

if (WorkflowSystemTask.is(pendingTask.getTaskType()) && !pendingTask.getStatus().isTerminal()) {
if (systemTaskRegistry.isSystemTask(pendingTask.getTaskType()) && !pendingTask.getStatus().isTerminal()) {
tasksToBeScheduled.putIfAbsent(pendingTask.getReferenceTaskName(), pendingTask);
executedTaskRefNames.remove(pendingTask.getReferenceTaskName());
}
Expand Down Expand Up @@ -388,7 +391,7 @@ List<Task> getNextTask(Workflow workflow, Task task) {
final WorkflowDef workflowDef = workflow.getWorkflowDefinition();

// Get the following task after the last completed task
if (WorkflowSystemTask.is(task.getTaskType()) && TaskType.TASK_TYPE_DECISION.equals(task.getTaskType())) {
if (systemTaskRegistry.isSystemTask(task.getTaskType()) && TaskType.TASK_TYPE_DECISION.equals(task.getTaskType())) {
if (task.getInputData().get("hasChildren") != null) {
return Collections.emptyList();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
Expand Down Expand Up @@ -102,6 +103,7 @@ public class WorkflowExecutor {
private final ExecutionDAOFacade executionDAOFacade;
private final ParametersUtils parametersUtils;
private final WorkflowStatusListener workflowStatusListener;
private final SystemTaskRegistry systemTaskRegistry;

private long activeWorkerLastPollMs;
private final long queueTaskMessagePostponeSecs;
Expand All @@ -116,15 +118,14 @@ public class WorkflowExecutor {
private final Predicate<PollData> validateLastPolledTime = pollData ->
pollData.getLastPollTime() > System.currentTimeMillis() - activeWorkerLastPollMs;

private static final Predicate<Task> SYSTEM_TASK = task -> WorkflowSystemTask.is(task.getTaskType());

private static final Predicate<Task> NON_TERMINAL_TASK = task -> !task.getStatus().isTerminal();

@Autowired
public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO, QueueDAO queueDAO,
MetadataMapperService metadataMapperService, WorkflowStatusListener workflowStatusListener,
ExecutionDAOFacade executionDAOFacade, ConductorProperties properties,
ExecutionLockService executionLockService,
SystemTaskRegistry systemTaskRegistry,
ParametersUtils parametersUtils) {
this.deciderService = deciderService;
this.metadataDAO = metadataDAO;
Expand All @@ -137,6 +138,7 @@ public WorkflowExecutor(DeciderService deciderService, MetadataDAO metadataDAO,
this.workflowStatusListener = workflowStatusListener;
this.executionLockService = executionLockService;
this.parametersUtils = parametersUtils;
this.systemTaskRegistry = systemTaskRegistry;
}

/**
Expand Down Expand Up @@ -465,7 +467,7 @@ public void resetCallbacksForWorkflow(String workflowId) {

// Get SIMPLE tasks in SCHEDULED state that have callbackAfterSeconds > 0 and set the callbackAfterSeconds to 0
workflow.getTasks().stream()
.filter(task -> SYSTEM_TASK.negate().test(task)
.filter(task -> !systemTaskRegistry.isSystemTask(task.getTaskType())
&& SCHEDULED == task.getStatus()
&& task.getCallbackAfterSeconds() > 0)
.forEach(task -> {
Expand Down Expand Up @@ -924,7 +926,7 @@ public void updateTask(TaskResult taskResult) {

// for system tasks, setting to SCHEDULED would mean restarting the task which is undesirable
// for worker tasks, set status to SCHEDULED and push to the queue
if (!SYSTEM_TASK.test(task) && taskResult.getStatus() == Status.IN_PROGRESS) {
if (!systemTaskRegistry.isSystemTask(task.getTaskType()) && taskResult.getStatus() == Status.IN_PROGRESS) {
task.setStatus(SCHEDULED);
} else {
task.setStatus(valueOf(taskResult.getStatus().name()));
Expand Down Expand Up @@ -1110,8 +1112,8 @@ public boolean decide(String workflowId) {

Workflow workflowInstance = deciderService.populateWorkflowAndTaskData(workflow);
for (Task task : outcome.tasksToBeScheduled) {
if (SYSTEM_TASK.and(NON_TERMINAL_TASK).test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (systemTaskRegistry.isSystemTask(task.getTaskType()) && NON_TERMINAL_TASK.test(task)) {
WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());
deciderService.populateTaskData(task);
if (!workflowSystemTask.isAsync() && workflowSystemTask.execute(workflowInstance, task, this)) {
tasksToBeUpdated.add(task);
Expand Down Expand Up @@ -1220,8 +1222,8 @@ List<String> cancelNonTerminalTasks(Workflow workflow) {
if (!task.getStatus().isTerminal()) {
// Cancel the ones which are not completed yet....
task.setStatus(CANCELED);
if (SYSTEM_TASK.test(task)) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (systemTaskRegistry.isSystemTask(task.getTaskType())) {
WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());
try {
workflowSystemTask.cancel(workflow, task, this);
} catch (Exception e) {
Expand Down Expand Up @@ -1558,16 +1560,16 @@ boolean scheduleTask(Workflow workflow, List<Task> tasks) {
createdTasks = executionDAOFacade.createTasks(tasks);

List<Task> systemTasks = createdTasks.stream()
.filter(SYSTEM_TASK)
.filter(task -> systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());

tasksToBeQueued = createdTasks.stream()
.filter(SYSTEM_TASK.negate())
.filter(task -> !systemTaskRegistry.isSystemTask(task.getTaskType()))
.collect(Collectors.toList());

// Traverse through all the system tasks, start the sync tasks, in case of async queue the tasks
for (Task task : systemTasks) {
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());
if (workflowSystemTask == null) {
throw new ApplicationException(NOT_FOUND, "No system task found by name " + task.getTaskType());
}
Expand Down Expand Up @@ -1810,7 +1812,7 @@ void updateParentWorkflowTask(Workflow subWorkflow) {
}

private void executeSubworkflowTaskAndSyncData(Workflow subWorkflow, Task subWorkflowTask) {
WorkflowSystemTask subWorkflowSystemTask = WorkflowSystemTask.get(SubWorkflow.NAME);
WorkflowSystemTask subWorkflowSystemTask = systemTaskRegistry.get(SubWorkflow.NAME);
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
// Keep Subworkflow task's data consistent with Subworkflow's.
if (subWorkflowTask.getStatus().isTerminal() && subWorkflowTask.getExternalOutputPayloadStoragePath() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
import com.netflix.conductor.dao.ExecutionDAO;
Expand Down Expand Up @@ -45,23 +46,25 @@ public class WorkflowRepairService {
private final ExecutionDAO executionDAO;
private final QueueDAO queueDAO;
private final ConductorProperties properties;
private SystemTaskRegistry systemTaskRegistry;

// For system task -> Verify the task isAsync(), not isAsyncComplete() and in SCHEDULED or IN_PROGRESS state
// For simple task -> Verify the task is in SCHEDULED state
private final Predicate<Task> isTaskRepairable = task -> {
if (WorkflowSystemTask.is(task.getTaskType())) { // If system task
WorkflowSystemTask workflowSystemTask = WorkflowSystemTask.get(task.getTaskType());
if (systemTaskRegistry.isSystemTask(task.getTaskType())) { // If system task
WorkflowSystemTask workflowSystemTask = systemTaskRegistry.get(task.getTaskType());
return workflowSystemTask.isAsync() && !workflowSystemTask.isAsyncComplete(task) &&
(task.getStatus() == Task.Status.IN_PROGRESS || task.getStatus() == Task.Status.SCHEDULED);
} else { // Else if simple task
return task.getStatus() == Task.Status.SCHEDULED;
}
};

public WorkflowRepairService(ExecutionDAO executionDAO, QueueDAO queueDAO, ConductorProperties properties) {
public WorkflowRepairService(ExecutionDAO executionDAO, QueueDAO queueDAO, ConductorProperties properties, SystemTaskRegistry systemTaskRegistry) {
this.executionDAO = executionDAO;
this.queueDAO = queueDAO;
this.properties = properties;
this.systemTaskRegistry = systemTaskRegistry;
LOGGER.info("WorkflowRepairService Initialized");
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package com.netflix.conductor.core.execution.tasks;

import org.springframework.stereotype.Component;

import java.util.Collection;
import java.util.Map;
import java.util.Optional;

/**
* A container class that holds a mapping of system task types {@link com.netflix.conductor.common.metadata.tasks.TaskType} to
* {@link WorkflowSystemTask} instances.
*/
@Component
public class SystemTaskRegistry {

private final Map<String, WorkflowSystemTask> registry;

/**
* Spring creates a map of bean names to {@link WorkflowSystemTask} instances and injects it.
* <p>
* NOTE: It is important the {@link WorkflowSystemTask} instances are "qualified" with their respective
* {@link com.netflix.conductor.common.metadata.tasks.TaskType}.
*/
public SystemTaskRegistry(Map<String, WorkflowSystemTask> registry) {
this.registry = registry;
}

public WorkflowSystemTask get(String taskType) {
return Optional.ofNullable(registry.get(taskType))
.orElseThrow(() -> new IllegalStateException(taskType + "not found in " + getClass().getSimpleName()));
}

public boolean isSystemTask(String taskType) {
return registry.containsKey(taskType);
}

public Collection<WorkflowSystemTask> all() {
return registry.values();
}
}
Loading

0 comments on commit 609c422

Please sign in to comment.