Split event system task implementation between start and execute (Netflix#3187)

* event system task implementation is split between start and execute

* added integ test
apanicker-nflx authored Aug 18, 2022
1 parent 52ba7ce commit 8b62e35
Showing 7 changed files with 200 additions and 66 deletions.
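
Before this commit the EVENT system task built and published its message entirely inside start(). With this change, start() only resolves the sink to a queue name, records it in the task output under event_produced, and moves the task to IN_PROGRESS; execute() then performs the actual publish and returns whether the task state changed. The following is a self-contained illustration of that split, using simplified stand-ins rather than Conductor's real Event, TaskModel, and ObservableQueue classes (those appear in the diff below):

import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Illustration only: a minimal stand-in for the start/execute split adopted by this commit.
class TwoPhaseEventTask {

    private final Map<String, Object> output = new HashMap<>();

    /** Phase 1 (start): resolve the sink to a queue name and record it; nothing is published yet. */
    void start(String sink) {
        output.put("event_produced", resolveQueueName(sink));
    }

    /** Phase 2 (execute): publish using the queue name recorded by start(); return true if the task state changed. */
    boolean execute(Map<String, Queue<String>> queues, String payloadJson) {
        String queueName = (String) output.get("event_produced");
        queues.computeIfAbsent(queueName, k -> new ArrayDeque<>()).add(payloadJson);
        return true;
    }

    Map<String, Object> getOutputData() {
        return output;
    }

    private String resolveQueueName(String sink) {
        // The real resolution lives in Event.computeQueueName (diff below); this stand-in
        // only strips a "prefix:" style sink so the shape of the handoff stays visible.
        int idx = sink.indexOf(':');
        return idx >= 0 ? sink.substring(idx + 1) : sink;
    }
}

The rest of this page is the diff itself: the scheduler hunk, the rewritten Event task, the TransientException handling in the payload-storage utility, and the updated Spock specs.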
@@ -1722,6 +1722,7 @@ boolean scheduleTask(WorkflowModel workflow, List<TaskModel> tasks) {
}
if (!workflowSystemTask.isAsync()) {
try {
// start execution of synchronous system tasks
workflowSystemTask.start(workflow, task, this);
} catch (Exception e) {
String errorMsg =
@@ -24,7 +24,7 @@
import com.netflix.conductor.core.events.EventQueues;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.model.TaskModel;
@@ -41,6 +41,8 @@ public class Event extends WorkflowSystemTask {
private static final Logger LOGGER = LoggerFactory.getLogger(Event.class);
public static final String NAME = "EVENT";

private static final String EVENT_PRODUCED = "event_produced";

private final ObjectMapper objectMapper;
private final ParametersUtils parametersUtils;
private final EventQueues eventQueues;
@@ -61,21 +63,35 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
payload.put("workflowVersion", workflow.getWorkflowVersion());
payload.put("correlationId", workflow.getCorrelationId());

task.setStatus(TaskModel.Status.IN_PROGRESS);
task.getOutputData().putAll(payload);

try {
String payloadJson = objectMapper.writeValueAsString(payload);
Message message = new Message(task.getTaskId(), payloadJson, task.getTaskId());
ObservableQueue queue = getQueue(workflow, task);
task.getOutputData().put(EVENT_PRODUCED, computeQueueName(workflow, task));
} catch (Exception e) {
task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion(e.getMessage());
LOGGER.error(
"Error executing task: {}, workflow: {}",
task.getTaskId(),
workflow.getWorkflowId(),
e);
}
}

@Override
public boolean execute(
WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
try {
String queueName = (String) task.getOutputData().get(EVENT_PRODUCED);
ObservableQueue queue = getQueue(queueName, task.getTaskId());
Message message = getPopulatedMessage(task);
queue.publish(List.of(message));
LOGGER.debug("Published message:{} to queue:{}", message.getId(), queue.getName());
task.getOutputData().putAll(payload);
task.setStatus(
isAsyncComplete(task)
? TaskModel.Status.IN_PROGRESS
: TaskModel.Status.COMPLETED);
} catch (TransientException te) {
LOGGER.info(
"A transient backend error happened when task {} tried to publish an event.",
task.getTaskId());
if (!isAsyncComplete(task)) {
task.setStatus(TaskModel.Status.COMPLETED);
return true;
}
} catch (JsonProcessingException jpe) {
task.setStatus(TaskModel.Status.FAILED);
task.setReasonForIncompletion("Error serializing JSON payload: " + jpe.getMessage());
@@ -92,22 +108,19 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
workflow.getWorkflowId(),
e);
}
return false;
}

@Override
public void cancel(WorkflowModel workflow, TaskModel task, WorkflowExecutor workflowExecutor) {
Message message = new Message(task.getTaskId(), null, task.getTaskId());
ObservableQueue queue = getQueue(workflow, task);
String queueName = computeQueueName(workflow, task);
ObservableQueue queue = getQueue(queueName, task.getTaskId());
queue.ack(List.of(message));
}

@Override
public boolean isAsync() {
return false;
}

@VisibleForTesting
ObservableQueue getQueue(WorkflowModel workflow, TaskModel task) {
String computeQueueName(WorkflowModel workflow, TaskModel task) {
String sinkValueRaw = (String) task.getInputData().get("sink");
Map<String, Object> input = new HashMap<>();
input.put("sink", sinkValueRaw);
@@ -135,19 +148,28 @@ ObservableQueue getQueue(WorkflowModel workflow, TaskModel task) {
"Invalid / Unsupported sink specified: " + sinkValue);
}
}
return queueName;
}

task.getOutputData().put("event_produced", queueName);

@VisibleForTesting
ObservableQueue getQueue(String queueName, String taskId) {
try {
return eventQueues.getQueue(queueName);
} catch (IllegalArgumentException e) {
throw new IllegalStateException(
"Error loading queue for name:"
"Error loading queue:"
+ queueName
+ ", sink:"
+ sinkValue
+ ", for task:"
+ taskId
+ ", error: "
+ e.getMessage());
} catch (Exception e) {
throw new NonTransientException("Unable to find queue name for task " + taskId);
}
}

Message getPopulatedMessage(TaskModel task) throws JsonProcessingException {
String payloadJson = objectMapper.writeValueAsString(task.getOutputData());
return new Message(task.getTaskId(), payloadJson, task.getTaskId());
}
}
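
One behavioral detail in the rewritten execute() above: after a successful publish, the task only completes immediately when asyncComplete is not set; with asyncComplete it stays IN_PROGRESS so that a later external update can complete it. A minimal sketch of that decision, with a plain boolean standing in for WorkflowSystemTask.isAsyncComplete(task):

// Sketch only: mirrors the status choice at the end of Event.execute() in the hunk above.
class PublishOutcomeSketch {

    enum TaskStatus { IN_PROGRESS, COMPLETED, FAILED }

    static TaskStatus statusAfterSuccessfulPublish(boolean asyncComplete) {
        // asyncComplete: the event was published, but completion is signalled externally later.
        return asyncComplete ? TaskStatus.IN_PROGRESS : TaskStatus.COMPLETED;
    }
}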
@@ -14,7 +14,6 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
@@ -32,6 +31,7 @@
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.exception.TransientException;
import com.netflix.conductor.metrics.Monitors;
import com.netflix.conductor.model.TaskModel;
import com.netflix.conductor.model.WorkflowModel;
@@ -69,7 +69,9 @@ public Map<String, Object> downloadPayload(String path) {
try (InputStream inputStream = externalPayloadStorage.download(path)) {
return objectMapper.readValue(
IOUtils.toString(inputStream, StandardCharsets.UTF_8), Map.class);
} catch (IOException e) {
} catch (TransientException te) {
throw te;
} catch (Exception e) {
LOGGER.error("Unable to download payload from external storage path: {}", path, e);
throw new NonTransientException(
"Unable to download payload from external storage path: " + path, e);
Expand Down Expand Up @@ -186,7 +188,9 @@ public <T> void verifyAndUpload(T entity, PayloadType payloadType) {
break;
}
}
} catch (IOException e) {
} catch (TransientException te) {
throw te;
} catch (Exception e) {
LOGGER.error(
"Unable to upload payload to external storage for workflow: {}", workflowId, e);
throw new NonTransientException(
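
Both payload-storage hunks above apply the same error classification: a TransientException coming out of the storage backend is rethrown unchanged so the operation can be retried, while any other failure (previously only IOException was caught) is wrapped in a NonTransientException and treated as permanent. A minimal sketch of the pattern, where fetchBytes is a hypothetical stand-in for the real external-storage download call:

import com.netflix.conductor.core.exception.NonTransientException;
import com.netflix.conductor.core.exception.TransientException;

// Sketch of the retryable vs. permanent error split used in the hunks above.
class PayloadStorageErrorSketch {

    byte[] downloadClassified(String path) {
        try {
            return fetchBytes(path);
        } catch (TransientException te) {
            // Backend hiccup: rethrow as-is so the caller can retry the download later.
            throw te;
        } catch (Exception e) {
            // Anything else is permanent and should fail the operation outright.
            throw new NonTransientException(
                    "Unable to download payload from external storage path: " + path, e);
        }
    }

    private byte[] fetchBytes(String path) {
        // Hypothetical stand-in; the real code streams the payload from ExternalPayloadStorage.
        return new byte[0];
    }
}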
@@ -93,14 +93,19 @@ class EventSpec extends Specification {
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
task.status == TaskModel.Status.IN_PROGRESS
verifyOutputData(task, queueName)

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': 'conductor']

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
verifyOutputData(task, queueName)
1 * eventQueues.getQueue(queueName) >> observableQueue
// capture the Message object sent to the publish method. Event.execute sends a list with one Message object
1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message }

verifyMessage(expectedMessage, task)
}

@@ -118,14 +123,19 @@ class EventSpec extends Specification {
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
task.status == TaskModel.Status.IN_PROGRESS
verifyOutputData(task, queueName)

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
verifyOutputData(task, queueName)
1 * eventQueues.getQueue(queueName) >> observableQueue
// capture the Message object sent to the publish method. Event.execute sends a list with one Message object
1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message }

verifyMessage(expectedMessage, task)
}

@@ -144,14 +154,19 @@ class EventSpec extends Specification {
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
task.status == TaskModel.Status.IN_PROGRESS
verifyOutputData(task, queueName)

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.COMPLETED
verifyOutputData(task, queueName)
1 * eventQueues.getQueue(queueName) >> observableQueue
// capture the Message object sent to the publish method. Event.execute sends a list with one Message object
1 * observableQueue.publish({ it.size() == 1 }) >> { it -> expectedMessage = it[0][0] as Message }

verifyMessage(expectedMessage, task)
}

@@ -168,12 +183,18 @@ class EventSpec extends Specification {
then:
task.status == TaskModel.Status.IN_PROGRESS
verifyOutputData(task, queueName)

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': 'conductor']

when:
boolean isTaskUpdateRequired = event.execute(workflow, task, null)

then:
!isTaskUpdateRequired
task.status == TaskModel.Status.IN_PROGRESS
verifyOutputData(task, queueName)
1 * eventQueues.getQueue(queueName) >> observableQueue
// capture the Message object sent to the publish method. Event.execute sends a list with one Message object
1 * observableQueue.publish({ it.size() == 1 }) >> { args -> expectedMessage = args[0][0] as Message }

verifyMessage(expectedMessage, task)
}

@@ -189,7 +210,7 @@ class EventSpec extends Specification {
then:
task.status == TaskModel.Status.FAILED
task.reasonForIncompletion != null

task.reasonForIncompletion.contains('Invalid / Unsupported sink specified:')
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]
}

@@ -205,15 +226,20 @@ class EventSpec extends Specification {
when:
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.IN_PROGRESS
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.FAILED
task.reasonForIncompletion != null

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]
1 * eventQueues.getQueue(queueName) >> {throw new IllegalArgumentException() }
}

def "publishing to a queue throws a retryable TransientException"() {
def "publishing to a queue throws a TransientException"() {
given:
String sinkValue = 'conductor'

@@ -223,9 +249,14 @@ class EventSpec extends Specification {
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.SCHEDULED

task.status == TaskModel.Status.IN_PROGRESS
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.FAILED
1 * eventQueues.getQueue(_) >> observableQueue
// capture the Message object sent to the publish method. Event.start sends a list with one Message object
1 * observableQueue.publish(_) >> { throw new TransientException("transient error") }
@@ -240,11 +271,16 @@ class EventSpec extends Specification {
when:
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.IN_PROGRESS
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.FAILED
task.reasonForIncompletion != null

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]
1 * eventQueues.getQueue(_) >> observableQueue
// capture the Message object sent to the publish method. Event.start sends a list with one Message object
1 * observableQueue.publish(_) >> { throw new NonTransientException("fatal error") }
@@ -259,10 +295,16 @@ class EventSpec extends Specification {
when:
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.IN_PROGRESS
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.FAILED
task.reasonForIncompletion != null

1 * objectMapper.writeValueAsString(_ as Map) >> { throw new JsonParseException(null, "invalid json") }
}

@@ -275,11 +317,16 @@ class EventSpec extends Specification {
when:
event.start(workflow, task, null)

then:
task.status == TaskModel.Status.IN_PROGRESS
1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]

when:
event.execute(workflow, task, null)

then:
task.status == TaskModel.Status.FAILED
task.reasonForIncompletion != null

1 * parametersUtils.getTaskInputV2(_, workflow, task.taskId, _) >> ['sink': sinkValue]
1 * eventQueues.getQueue(_) >> { throw new NullPointerException("some object is null") }
}

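
The spec changes above all follow the same reshaping: each feature method now has one when/then pair asserting that start() leaves the task IN_PROGRESS with event_produced recorded, and a second pair asserting what execute() does (publish and complete, stay IN_PROGRESS for asyncComplete, or fail). A rough driver in the same spirit, reusing the TwoPhaseEventTask stand-in sketched near the top of this page (not Conductor's actual Spock specs; the sink value is illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Rough driver for the two-phase stand-in defined earlier on this page.
public class TwoPhaseEventTaskDemo {
    public static void main(String[] args) {
        TwoPhaseEventTask task = new TwoPhaseEventTask();
        Map<String, Queue<String>> queues = new HashMap<>();

        // Phase 1: start records where the event will go, but publishes nothing.
        task.start("sqs:example_queue");
        System.out.println("after start: " + task.getOutputData());

        // Phase 2: execute publishes and reports that the task state changed.
        boolean updated = task.execute(queues, "{\"workflowInstanceId\":\"wf-1\"}");
        System.out.println("after execute: updated=" + updated + ", queues=" + queues.keySet());
    }
}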