Skip to content

Commit

Permalink
removed Task NAME constants and used constants from TaskType
Browse files Browse the repository at this point in the history
  • Loading branch information
Aravindan Ramkumar committed Apr 12, 2021
1 parent 609c422 commit cb8cac3
Show file tree
Hide file tree
Showing 35 changed files with 467 additions and 473 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.common.metadata.tasks;

Expand Down Expand Up @@ -90,24 +90,24 @@ public enum RetryLogic {FIXED, EXPONENTIAL_BACKOFF}
// @ProtoField(id = 13)
// private Integer rateLimitPerSecond;

@ProtoField(id = 14)
@ProtoField(id = 14)
private Integer rateLimitPerFrequency;

@ProtoField(id = 15)
@ProtoField(id = 15)
private Integer rateLimitFrequencyInSeconds;

@ProtoField(id = 16)
@ProtoField(id = 16)
private String isolationGroupId;

@ProtoField(id = 17)
@ProtoField(id = 17)
private String executionNameSpace;

@ProtoField(id = 18)
@ProtoField(id = 18)
@OwnerEmailMandatoryConstraint
@Email(message = "ownerEmail should be valid email address")
private String ownerEmail;

@ProtoField(id = 19)
@ProtoField(id = 19)
@Min(value = 0, message = "TaskDef pollTimeoutSeconds: {value} must be >= 0")
private Integer pollTimeoutSeconds;

Expand All @@ -131,7 +131,7 @@ public TaskDef(String name, String description, int retryCount, long timeoutSeco
}

public TaskDef(String name, String description, String ownerEmail, int retryCount,
long timeoutSeconds, long responseTimeoutSeconds) {
long timeoutSeconds, long responseTimeoutSeconds) {
this.name = name;
this.description = description;
this.ownerEmail = ownerEmail;
Expand Down Expand Up @@ -411,29 +411,29 @@ public boolean equals(Object o) {
}
TaskDef taskDef = (TaskDef) o;
return getRetryCount() == taskDef.getRetryCount() &&
getTimeoutSeconds() == taskDef.getTimeoutSeconds() &&
getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() &&
getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() &&
Objects.equals(getName(), taskDef.getName()) &&
Objects.equals(getDescription(), taskDef.getDescription()) &&
Objects.equals(getInputKeys(), taskDef.getInputKeys()) &&
Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) &&
getTimeoutPolicy() == taskDef.getTimeoutPolicy() &&
getRetryLogic() == taskDef.getRetryLogic() &&
Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) &&
Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) &&
Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) &&
Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) &&
Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) &&
Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail());
getTimeoutSeconds() == taskDef.getTimeoutSeconds() &&
getRetryDelaySeconds() == taskDef.getRetryDelaySeconds() &&
getResponseTimeoutSeconds() == taskDef.getResponseTimeoutSeconds() &&
Objects.equals(getName(), taskDef.getName()) &&
Objects.equals(getDescription(), taskDef.getDescription()) &&
Objects.equals(getInputKeys(), taskDef.getInputKeys()) &&
Objects.equals(getOutputKeys(), taskDef.getOutputKeys()) &&
getTimeoutPolicy() == taskDef.getTimeoutPolicy() &&
getRetryLogic() == taskDef.getRetryLogic() &&
Objects.equals(getConcurrentExecLimit(), taskDef.getConcurrentExecLimit()) &&
Objects.equals(getRateLimitPerFrequency(), taskDef.getRateLimitPerFrequency()) &&
Objects.equals(getInputTemplate(), taskDef.getInputTemplate()) &&
Objects.equals(getIsolationGroupId(), taskDef.getIsolationGroupId()) &&
Objects.equals(getExecutionNameSpace(), taskDef.getExecutionNameSpace()) &&
Objects.equals(getOwnerEmail(), taskDef.getOwnerEmail());
}

@Override
public int hashCode() {

return Objects.hash(getName(), getDescription(), getRetryCount(), getTimeoutSeconds(), getInputKeys(),
getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(),
getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(),
getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail());
getOutputKeys(), getTimeoutPolicy(), getRetryLogic(), getRetryDelaySeconds(),
getResponseTimeoutSeconds(), getConcurrentExecLimit(), getRateLimitPerFrequency(), getInputTemplate(),
getIsolationGroupId(), getExecutionNameSpace(), getOwnerEmail());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.netflix.conductor.core.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
Expand All @@ -40,10 +39,12 @@
import java.util.List;
import java.util.Map;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_HTTP;

/**
* Task that enables calling another HTTP endpoint as part of its execution
*/
@Component(HttpTask.NAME)
@Component(TASK_TYPE_HTTP)
public class HttpTask extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(HttpTask.class);
Expand All @@ -53,8 +54,6 @@ public class HttpTask extends WorkflowSystemTask {
static final String MISSING_REQUEST = "Missing HTTP request. Task input MUST have a '" + REQUEST_PARAMETER_NAME
+ "' key with HttpTask.Input as value. See documentation for HttpTask for required input parameters";

public static final String NAME = "HTTP";

private final TypeReference<Map<String, Object>> mapOfObj = new TypeReference<Map<String, Object>>() {
};
private final TypeReference<List<Object>> listOfObj = new TypeReference<List<Object>>() {
Expand All @@ -63,10 +62,9 @@ public class HttpTask extends WorkflowSystemTask {
protected RestTemplateProvider restTemplateProvider;
private final String requestParameter;

@Autowired
public HttpTask(RestTemplateProvider restTemplateProvider,
ObjectMapper objectMapper) {
this(NAME, restTemplateProvider, objectMapper);
this(TASK_TYPE_HTTP, restTemplateProvider, objectMapper);
}

public HttpTask(String name,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.tasks.kafka;

Expand Down Expand Up @@ -41,13 +41,14 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

@Component(KafkaPublishTask.NAME)
import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_KAFKA_PUBLISH;

@Component(TASK_TYPE_KAFKA_PUBLISH)
public class KafkaPublishTask extends WorkflowSystemTask {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPublishTask.class);

static final String REQUEST_PARAMETER_NAME = "kafka_request";
public static final String NAME = "KAFKA_PUBLISH";
private static final String MISSING_REQUEST =
"Missing Kafka request. Task input MUST have a '" + REQUEST_PARAMETER_NAME
+ "' key with KafkaTask.Input as value. See documentation for KafkaTask for required input parameters";
Expand All @@ -62,11 +63,11 @@ public class KafkaPublishTask extends WorkflowSystemTask {

@Autowired
public KafkaPublishTask(KafkaProducerManager clientManager, ObjectMapper objectMapper) {
super(NAME);
super(TASK_TYPE_KAFKA_PUBLISH);
this.requestParameter = REQUEST_PARAMETER_NAME;
this.producerManager = clientManager;
this.objectMapper = objectMapper;
LOGGER.info("KafkaTask initialized...");
LOGGER.info("KafkaTask initialized.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2020 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.contribs.queue.sqs;

Expand All @@ -18,10 +18,9 @@
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.core.events.queue.DefaultEventQueueProcessor;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.service.ExecutionService;
import org.junit.Before;
import org.junit.BeforeClass;
Expand All @@ -38,6 +37,7 @@
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -91,15 +91,15 @@ public static void setup() {
task0.setStatus(Status.IN_PROGRESS);
task0.setTaskId("t0");
task0.setReferenceTaskName("t0");
task0.setTaskType(Wait.NAME);
task0.setTaskType(TASK_TYPE_WAIT);
Workflow workflow0 = new Workflow();
workflow0.setWorkflowId("v_0");
workflow0.getTasks().add(task0);

Task task2 = new Task();
task2.setStatus(Status.IN_PROGRESS);
task2.setTaskId("t2");
task2.setTaskType(Wait.NAME);
task2.setTaskType(TASK_TYPE_WAIT);
Workflow workflow2 = new Workflow();
workflow2.setWorkflowId("v_2");
workflow2.getTasks().add(task2);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
/*
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Copyright 2021 Netflix, Inc.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.events.queue;

Expand All @@ -21,18 +21,20 @@
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.execution.tasks.Wait;
import com.netflix.conductor.service.ExecutionService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;

/**
* Monitors and processes messages on the default event queues that Conductor listens on.
Expand Down Expand Up @@ -93,7 +95,7 @@ private void startMonitor(Status status, ObservableQueue queue) {
payload);
taskOptional = workflow.getTasks().stream()
.filter(task -> !task.getStatus().isTerminal() && task.getTaskType().equals(
Wait.NAME)).findFirst();
TASK_TYPE_WAIT)).findFirst();
} else {
taskOptional = workflow.getTasks().stream().filter(
task -> !task.getStatus().isTerminal() && task.getReferenceTaskName().equals(taskRefName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.core.exception.TerminateWorkflowException;
import com.netflix.conductor.core.execution.tasks.SubWorkflow;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.Terminate;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
Expand Down Expand Up @@ -1717,7 +1716,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
// If not found look into sub workflows
if (rerunFromTask == null) {
for (Task task : workflow.getTasks()) {
if (task.getTaskType().equalsIgnoreCase(SubWorkflow.NAME)) {
if (task.getTaskType().equalsIgnoreCase(TASK_TYPE_SUB_WORKFLOW)) {
String subWorkflowId = task.getSubWorkflowId();
if (rerunWF(subWorkflowId, taskId, taskInput, null, null)) {
rerunFromTask = task;
Expand Down Expand Up @@ -1760,7 +1759,7 @@ private boolean rerunWF(String workflowId, String taskId, Map<String, Object> ta
rerunFromTask.setRetried(false);
rerunFromTask.setExecuted(false);
rerunFromTask.setExternalOutputPayloadStoragePath(null);
if (rerunFromTask.getTaskType().equalsIgnoreCase(SubWorkflow.NAME)) {
if (rerunFromTask.getTaskType().equalsIgnoreCase(TASK_TYPE_SUB_WORKFLOW)) {
// if task is sub workflow set task as IN_PROGRESS and reset start time
rerunFromTask.setStatus(IN_PROGRESS);
rerunFromTask.setStartTime(System.currentTimeMillis());
Expand Down Expand Up @@ -1812,7 +1811,7 @@ void updateParentWorkflowTask(Workflow subWorkflow) {
}

private void executeSubworkflowTaskAndSyncData(Workflow subWorkflow, Task subWorkflowTask) {
WorkflowSystemTask subWorkflowSystemTask = systemTaskRegistry.get(SubWorkflow.NAME);
WorkflowSystemTask subWorkflowSystemTask = systemTaskRegistry.get(TASK_TYPE_SUB_WORKFLOW);
subWorkflowSystemTask.execute(subWorkflow, subWorkflowTask, this);
// Keep Subworkflow task's data consistent with Subworkflow's.
if (subWorkflowTask.getStatus().isTerminal() && subWorkflowTask.getExternalOutputPayloadStoragePath() != null
Expand Down
Loading

0 comments on commit cb8cac3

Please sign in to comment.