diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index fae69ce362..20cab7a551 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1722,6 +1722,7 @@ boolean scheduleTask(WorkflowModel workflow, List tasks) { } if (!workflowSystemTask.isAsync()) { try { + // start execution of synchronous system tasks workflowSystemTask.start(workflow, task, this); } catch (Exception e) { String errorMsg = diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index c1c50db6ef..8a2f4439cf 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -24,7 +24,7 @@ import com.netflix.conductor.core.events.EventQueues; import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import com.netflix.conductor.core.exception.TransientException; +import com.netflix.conductor.core.exception.NonTransientException; import com.netflix.conductor.core.execution.WorkflowExecutor; import com.netflix.conductor.core.utils.ParametersUtils; import com.netflix.conductor.model.TaskModel; @@ -41,6 +41,8 @@ public class Event extends WorkflowSystemTask { private static final Logger LOGGER = LoggerFactory.getLogger(Event.class); public static final String NAME = "EVENT"; + private static final String EVENT_PRODUCED = "event_produced"; + private final ObjectMapper objectMapper; private final ParametersUtils parametersUtils; private final EventQueues eventQueues; @@ -61,21 +63,35 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf payload.put("workflowVersion", 
workflow.getWorkflowVersion()); payload.put("correlationId", workflow.getCorrelationId()); + task.setStatus(TaskModel.Status.IN_PROGRESS); + task.getOutputData().putAll(payload); + try { - String payloadJson = objectMapper.writeValueAsString(payload); - Message message = new Message(task.getTaskId(), payloadJson, task.getTaskId()); - ObservableQueue queue = getQueue(workflow, task); + task.getOutputData().put(EVENT_PRODUCED, computeQueueName(workflow, task)); + } catch (Exception e) { + task.setStatus(TaskModel.Status.FAILED); + task.setReasonForIncompletion(e.getMessage()); + LOGGER.error( + "Error executing task: {}, workflow: {}", + task.getTaskId(), + workflow.getWorkflowId(), + e); + } + } + + @Override + public boolean execute( + WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { + try { + String queueName = (String) task.getOutputData().get(EVENT_PRODUCED); + ObservableQueue queue = getQueue(queueName, task.getTaskId()); + Message message = getPopulatedMessage(task); queue.publish(List.of(message)); LOGGER.debug("Published message:{} to queue:{}", message.getId(), queue.getName()); - task.getOutputData().putAll(payload); - task.setStatus( - isAsyncComplete(task) - ? 
TaskModel.Status.IN_PROGRESS - : TaskModel.Status.COMPLETED); - } catch (TransientException te) { - LOGGER.info( - "A transient backend error happened when task {} tried to publish an event.", - task.getTaskId()); + if (!isAsyncComplete(task)) { + task.setStatus(TaskModel.Status.COMPLETED); + return true; + } } catch (JsonProcessingException jpe) { task.setStatus(TaskModel.Status.FAILED); task.setReasonForIncompletion("Error serializing JSON payload: " + jpe.getMessage()); @@ -92,22 +108,19 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf workflow.getWorkflowId(), e); } + return false; } @Override public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) { Message message = new Message(task.getTaskId(), null, task.getTaskId()); - ObservableQueue queue = getQueue(workflow, task); + String queueName = computeQueueName(workflow, task); + ObservableQueue queue = getQueue(queueName, task.getTaskId()); queue.ack(List.of(message)); } - @Override - public boolean isAsync() { - return false; - } - @VisibleForTesting - ObservableQueue getQueue(WorkflowModel workflow, TaskModel task) { + String computeQueueName(WorkflowModel workflow, TaskModel task) { String sinkValueRaw = (String) task.getInputData().get("sink"); Map input = new HashMap<>(); input.put("sink", sinkValueRaw); @@ -135,19 +148,28 @@ ObservableQueue getQueue(WorkflowModel workflow, TaskModel task) { "Invalid / Unsupported sink specified: " + sinkValue); } } + return queueName; + } - task.getOutputData().put("event_produced", queueName); - + @VisibleForTesting + ObservableQueue getQueue(String queueName, String taskId) { try { return eventQueues.getQueue(queueName); } catch (IllegalArgumentException e) { throw new IllegalStateException( - "Error loading queue for name:" + "Error loading queue:" + queueName - + ", sink:" - + sinkValue + + ", for task:" + + taskId + ", error: " + e.getMessage()); + } catch (Exception e) { + throw new 
NonTransientException("Unable to find queue name for task " + taskId, e); } } + + Message getPopulatedMessage(TaskModel task) throws JsonProcessingException { + String payloadJson = objectMapper.writeValueAsString(task.getOutputData()); + return new Message(task.getTaskId(), payloadJson, task.getTaskId()); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java index 008c5202de..ae0d510804 100644 --- a/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java +++ b/core/src/main/java/com/netflix/conductor/core/utils/ExternalPayloadStorageUtils.java @@ -14,7 +14,6 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -32,6 +31,7 @@ import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.core.exception.NonTransientException; import com.netflix.conductor.core.exception.TerminateWorkflowException; +import com.netflix.conductor.core.exception.TransientException; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; @@ -69,7 +69,9 @@ public Map downloadPayload(String path) { try (InputStream inputStream = externalPayloadStorage.download(path)) { return objectMapper.readValue( IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class); - } catch (IOException e) { + } catch (TransientException te) { + throw te; + } catch (Exception e) { LOGGER.error("Unable to download payload from external storage path: {}", path, e); throw new NonTransientException( "Unable to download payload from external storage path: " + path, e); @@ -186,7 +188,9 @@ public void verifyAndUpload(T entity, PayloadType payloadType) { break; } } - } catch (IOException e) 
{ + } catch (TransientException te) { + throw te; + } catch (Exception e) { LOGGER.error( "Unable to upload payload to external storage for workflow: {}", workflowId, e); throw new NonTransientException( diff --git a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy index d7cddf68c7..837b81b16c 100644 --- a/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy +++ b/core/src/test/groovy/com/netflix/conductor/core/execution/tasks/EventSpec.groovy @@ -93,14 +93,19 @@ class EventSpec extends Specification { event.start(workflow, task, null) then: - task.status == TaskModel.Status.COMPLETED + task.status == TaskModel.Status.IN_PROGRESS verifyOutputData(task, queueName) - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': 'conductor'] + + when: + event.execute(workflow, task, null) + + then: + task.status == TaskModel.Status.COMPLETED + verifyOutputData(task, queueName) 1 * eventQueues.getQueue(queueName) >> observableQueue // capture the Message object sent to the publish method. Event.start sends a list with one Message object 1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message } - verifyMessage(expectedMessage, task) } @@ -118,14 +123,19 @@ class EventSpec extends Specification { event.start(workflow, task, null) then: - task.status == TaskModel.Status.COMPLETED + task.status == TaskModel.Status.IN_PROGRESS verifyOutputData(task, queueName) - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + + then: + task.status == TaskModel.Status.COMPLETED + verifyOutputData(task, queueName) 1 * eventQueues.getQueue(queueName) >> observableQueue // capture the Message object sent to the publish method. 
Event.start sends a list with one Message object 1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message } - verifyMessage(expectedMessage, task) } @@ -144,14 +154,19 @@ class EventSpec extends Specification { event.start(workflow, task, null) then: - task.status == TaskModel.Status.COMPLETED + task.status == TaskModel.Status.IN_PROGRESS verifyOutputData(task, queueName) - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + + then: + task.status == TaskModel.Status.COMPLETED + verifyOutputData(task, queueName) 1 * eventQueues.getQueue(queueName) >> observableQueue // capture the Message object sent to the publish method. Event.start sends a list with one Message object 1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message } - verifyMessage(expectedMessage, task) } @@ -168,12 +183,18 @@ class EventSpec extends Specification { then: task.status == TaskModel.Status.IN_PROGRESS verifyOutputData(task, queueName) - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': 'conductor'] + + when: + boolean isTaskUpdateRequired = event.execute(workflow, task, null) + + then: + !isTaskUpdateRequired + task.status == TaskModel.Status.IN_PROGRESS + verifyOutputData(task, queueName) 1 * eventQueues.getQueue(queueName) >> observableQueue // capture the Message object sent to the publish method. 
Event.start sends a list with one Message object 1 * observableQueue.publish({ it.size() == 1 }) >> { args -> expectedMessage = args[0][0] as Message } - verifyMessage(expectedMessage, task) } @@ -189,7 +210,7 @@ class EventSpec extends Specification { then: task.status == TaskModel.Status.FAILED task.reasonForIncompletion != null - + task.reasonForIncompletion.contains('Invalid / Unsupported sink specified:') 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] } @@ -205,15 +226,20 @@ class EventSpec extends Specification { when: event.start(workflow, task, null) + then: + task.status == TaskModel.Status.IN_PROGRESS + 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + then: task.status == TaskModel.Status.FAILED task.reasonForIncompletion != null - - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] 1 * eventQueues.getQueue(queueName) >> {throw new IllegalArgumentException() } } - def "publishing to a queue throws a retryable TransientException"() { + def "publishing to a queue throws a TransientException"() { given: String sinkValue = 'conductor' @@ -223,9 +249,14 @@ class EventSpec extends Specification { event.start(workflow, task, null) then: - task.status == TaskModel.Status.SCHEDULED - + task.status == TaskModel.Status.IN_PROGRESS 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + + then: + task.status == TaskModel.Status.FAILED 1 * eventQueues.getQueue(_) >> observableQueue // capture the Message object sent to the publish method. 
Event.start sends a list with one Message object 1 * observableQueue.publish(_) >> { throw new TransientException("transient error") } @@ -240,11 +271,16 @@ class EventSpec extends Specification { when: event.start(workflow, task, null) + then: + task.status == TaskModel.Status.IN_PROGRESS + 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + then: task.status == TaskModel.Status.FAILED task.reasonForIncompletion != null - - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] 1 * eventQueues.getQueue(_) >> observableQueue // capture the Message object sent to the publish method. Event.start sends a list with one Message object 1 * observableQueue.publish(_) >> { throw new NonTransientException("fatal error") } @@ -259,10 +295,16 @@ class EventSpec extends Specification { when: event.start(workflow, task, null) + then: + task.status == TaskModel.Status.IN_PROGRESS + 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + then: task.status == TaskModel.Status.FAILED task.reasonForIncompletion != null - 1 * objectMapper.writeValueAsString(_ as Map) >> { throw new JsonParseException(null, "invalid json") } } @@ -275,11 +317,16 @@ class EventSpec extends Specification { when: event.start(workflow, task, null) + then: + task.status == TaskModel.Status.IN_PROGRESS + 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] + + when: + event.execute(workflow, task, null) + then: task.status == TaskModel.Status.FAILED task.reasonForIncompletion != null - - 1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue] 1 * eventQueues.getQueue(_) >> { throw new NullPointerException("some object is null") } } diff --git a/core/src/test/java/com/netflix/conductor/core/execution/tasks/EventQueueResolutionTest.java 
b/core/src/test/java/com/netflix/conductor/core/execution/tasks/EventQueueResolutionTest.java index 8d1ed59afa..664ee7c6bc 100644 --- a/core/src/test/java/com/netflix/conductor/core/execution/tasks/EventQueueResolutionTest.java +++ b/core/src/test/java/com/netflix/conductor/core/execution/tasks/EventQueueResolutionTest.java @@ -39,8 +39,8 @@ import static org.junit.Assert.assertNotNull; /** - * Tests the {@link Event#getQueue(WorkflowModel, TaskModel)} method with a real {@link - * ParametersUtils} object. + * Tests the {@link Event#computeQueueName(WorkflowModel, TaskModel)} and {@link + * Event#getQueue(String, String)} methods with a real {@link ParametersUtils} object. */ @ContextConfiguration(classes = {TestObjectMapperConfiguration.class}) @RunWith(SpringRunner.class) @@ -93,28 +93,32 @@ public void testSinkParam() { workflow.getTasks().add(task); Event event = new Event(eventQueues, parametersUtils, objectMapper); - ObservableQueue queue = event.getQueue(workflow, task); + String queueName = event.computeQueueName(workflow, task); + ObservableQueue queue = event.getQueue(queueName, task.getTaskId()); assertNotNull(task.getReasonForIncompletion(), queue); assertEquals("queue_name", queue.getName()); assertEquals("sqs", queue.getType()); sink = "sqs:${t1.output.q}"; task.getInputData().put("sink", sink); - queue = event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + queue = event.getQueue(queueName, task.getTaskId()); assertNotNull(queue); assertEquals("t1_queue", queue.getName()); assertEquals("sqs", queue.getType()); sink = "sqs:${t2.output.q}"; task.getInputData().put("sink", sink); - queue = event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + queue = event.getQueue(queueName, task.getTaskId()); assertNotNull(queue); assertEquals("task2_queue", queue.getName()); assertEquals("sqs", queue.getType()); sink = "conductor"; task.getInputData().put("sink", sink); - queue = 
event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + queue = event.getQueue(queueName, task.getTaskId()); assertNotNull(queue); assertEquals( workflow.getWorkflowName() + ":" + task.getReferenceTaskName(), queue.getName()); @@ -122,11 +126,11 @@ public void testSinkParam() { sink = "sqs:static_value"; task.getInputData().put("sink", sink); - queue = event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + queue = event.getQueue(queueName, task.getTaskId()); assertNotNull(queue); assertEquals("static_value", queue.getName()); assertEquals("sqs", queue.getType()); - assertEquals(sink, task.getOutputData().get("event_produced")); } @Test @@ -141,18 +145,17 @@ public void testDynamicSinks() { task.setStatus(TaskModel.Status.IN_PROGRESS); task.getInputData().put("sink", "conductor:some_arbitary_queue"); - ObservableQueue queue = event.getQueue(workflow, task); + String queueName = event.computeQueueName(workflow, task); + ObservableQueue queue = event.getQueue(queueName, task.getTaskId()); assertEquals(TaskModel.Status.IN_PROGRESS, task.getStatus()); assertNotNull(queue); assertEquals("testWorkflow:some_arbitary_queue", queue.getName()); assertEquals("testWorkflow:some_arbitary_queue", queue.getURI()); assertEquals("conductor", queue.getType()); - assertEquals( - "conductor:testWorkflow:some_arbitary_queue", - task.getOutputData().get("event_produced")); task.getInputData().put("sink", "conductor"); - queue = event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + queue = event.getQueue(queueName, task.getTaskId()); assertEquals( "not in progress: " + task.getReasonForIncompletion(), TaskModel.Status.IN_PROGRESS, @@ -161,7 +164,8 @@ public void testDynamicSinks() { assertEquals("testWorkflow:task0", queue.getName()); task.getInputData().put("sink", "sqs:my_sqs_queue_name"); - queue = event.getQueue(workflow, task); + queueName = event.computeQueueName(workflow, task); + 
queue = event.getQueue(queueName, task.getTaskId()); assertEquals( "not in progress: " + task.getReasonForIncompletion(), TaskModel.Status.IN_PROGRESS, diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy index 43c227640d..3f47fc75b0 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/EventTaskSpec.groovy @@ -15,7 +15,9 @@ package com.netflix.conductor.test.integration import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task +import com.netflix.conductor.common.metadata.tasks.TaskResult import com.netflix.conductor.common.metadata.tasks.TaskType +import com.netflix.conductor.common.metadata.workflow.WorkflowDef import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.core.execution.tasks.Event import com.netflix.conductor.dao.QueueDAO @@ -48,9 +50,57 @@ class EventTaskSpec extends AbstractSpecification { tasks.size() == 2 tasks[0].taskType == TaskType.EVENT.name() tasks[0].status == Task.Status.COMPLETED + tasks[0].outputData['event_produced'] + tasks[1].taskType == 'integration_task_1' + tasks[1].status == Task.Status.SCHEDULED + } + + when: "The integration_task_1 is polled and completed" + def polledAndCompletedTry1 = workflowTestUtil.pollAndCompleteTask('integration_task_1', 'task1.integration.worker') + + then: "verify that the task was polled and completed and the workflow is in a complete state" + verifyPolledAndAcknowledgedTask(polledAndCompletedTry1) + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.COMPLETED + tasks.size() == 2 + tasks[1].taskType == 'integration_task_1' + tasks[1].status == Task.Status.COMPLETED } + } + + def "Test a workflow with 
event task that is asyncComplete "() { + setup: "Register a workflow definition with event task as asyncComplete" + def persistedWorkflowDefinition = metadataService.getWorkflowDef(EVENT_BASED_WORKFLOW, 1) + def modifiedWorkflowDefinition = new WorkflowDef() + modifiedWorkflowDefinition.name = persistedWorkflowDefinition.name + modifiedWorkflowDefinition.version = persistedWorkflowDefinition.version + modifiedWorkflowDefinition.tasks = persistedWorkflowDefinition.tasks + modifiedWorkflowDefinition.inputParameters = persistedWorkflowDefinition.inputParameters + modifiedWorkflowDefinition.outputParameters = persistedWorkflowDefinition.outputParameters + modifiedWorkflowDefinition.ownerEmail = persistedWorkflowDefinition.ownerEmail + modifiedWorkflowDefinition.tasks[0].asyncComplete = true + metadataService.updateWorkflowDef([modifiedWorkflowDefinition]) + + when: "The event task workflow is started" + def workflowInstanceId = workflowExecutor.startWorkflow(EVENT_BASED_WORKFLOW, 1, + '', [:], null, null, null) then: "Retrieve the workflow" + with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { + status == Workflow.WorkflowStatus.RUNNING + tasks.size() == 1 + tasks[0].taskType == TaskType.EVENT.name() + tasks[0].status == Task.Status.IN_PROGRESS + tasks[0].outputData['event_produced'] + } + + when: "The event task is updated async using the API" + Task task = workflowExecutionService.getExecutionStatus(workflowInstanceId, true).getTaskByRefName('wait0') + TaskResult taskResult = new TaskResult(task) + taskResult.setStatus(TaskResult.Status.COMPLETED) + workflowExecutor.updateTask(taskResult) + + then: "Ensure that event task is COMPLETED and workflow has progressed" with(workflowExecutionService.getExecutionStatus(workflowInstanceId, true)) { status == Workflow.WorkflowStatus.RUNNING tasks.size() == 2 @@ -69,8 +119,14 @@ class EventTaskSpec extends AbstractSpecification { with(workflowExecutionService.getExecutionStatus(workflowInstanceId, 
true)) { status == Workflow.WorkflowStatus.COMPLETED tasks.size() == 2 + tasks[0].taskType == TaskType.EVENT.name() + tasks[0].status == Task.Status.COMPLETED + tasks[0].outputData['event_produced'] tasks[1].taskType == 'integration_task_1' tasks[1].status == Task.Status.COMPLETED } + + cleanup: "Ensure that the changes to the workflow def are reverted" + metadataService.updateWorkflowDef([persistedWorkflowDefinition]) } } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy index 0a1e383cab..6690d4728c 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/util/WorkflowTestUtil.groovy @@ -158,8 +158,8 @@ class WorkflowTestUtil { TaskDef eventTaskX = new TaskDef() eventTaskX.name = 'eventX' - eventTaskX.timeoutSeconds = 1 - eventTaskX.responseTimeoutSeconds = 1 + eventTaskX.timeoutSeconds = 10 + eventTaskX.responseTimeoutSeconds = 10 eventTaskX.ownerEmail = DEFAULT_EMAIL_ADDRESS metadataService.registerTaskDef(