Skip to content

Commit

Permalink
rename WorkflowPoller -> WorkflowReconciler; update property
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Apr 27, 2021
1 parent 84197ca commit 163f7c4
Show file tree
Hide file tree
Showing 23 changed files with 181 additions and 142 deletions.
3 changes: 0 additions & 3 deletions .codecov.yml

This file was deleted.

15 changes: 0 additions & 15 deletions .github/workflows/issues_triage.yml

This file was deleted.

15 changes: 0 additions & 15 deletions .github/workflows/pull_request_triage.yml

This file was deleted.

2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ Component configuration:
| db | conductor.db.type | "" |
| workflow.indexing.enabled | conductor.indexing.enabled | true |
| conductor.disable.async.workers | conductor.system-task-workers.enabled | true |
| decider.sweep.disable | conductor.workflow-sweeper.enabled | true |
| decider.sweep.disable | conductor.workflow-reconciler.enabled | true |
| conductor.grpc.server.enabled | conductor.grpc-server.enabled | false |
| workflow.external.payload.storage | conductor.external-payload-storage.type | dummy |
| workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true |
Expand Down
18 changes: 9 additions & 9 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ plugins {
id 'io.spring.dependency-management' version '1.0.9.RELEASE'
id 'java'
id 'application'
id 'jacoco'
id 'nebula.netflixoss' version '9.2.2'
id 'com.github.kt3k.coveralls' version '2.8.2'
id 'org.sonarqube' version '3.1.1'
}

Expand Down Expand Up @@ -112,6 +112,8 @@ allprojects {
showStandardStreams = false
}
}


}

// all client and their related modules are published with Java 8 compatibility
Expand All @@ -123,14 +125,12 @@ allprojects {
}
}

coveralls {
sourceDirs = subprojects.sourceSets.main.allSource.srcDirs.flatten()
jacocoReportPath = "${project.buildDir}/reports/jacoco/report.xml"
}

tasks.coveralls {
group = "Coverage reports"
description = "Uploads the aggregated coverage report to Coveralls"
jacocoTestReport {
reports {
html.enabled = true
xml.enabled = true
csv.enabled = false
}
}

task server {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,47 +12,47 @@
*/
package com.netflix.conductor.contribs.metrics;

import com.netflix.spectator.api.Meter;
import static org.junit.Assert.assertTrue;

import com.netflix.spectator.api.Registry;
import com.netflix.spectator.api.Spectator;
import com.netflix.spectator.micrometer.MicrometerRegistry;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import org.junit.Assert;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.boot.test.context.runner.ApplicationContextRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.context.annotation.Primary;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit4.SpringRunner;

import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

@RunWith(SpringRunner.class)
@Import({PrometheusMetricsConfiguration.class})
@TestPropertySource(properties = {"conductor.metrics-prometheus.enabled=true"})
public class PrometheusMetricsConfigurationTest {


@SuppressWarnings("unchecked")
@Test
public void testCollector() throws IllegalAccessException {
final Optional<Field> registries = Arrays
.stream(Spectator.globalRegistry().getClass().getDeclaredFields())
.filter(f -> f.getName().equals("registries")).findFirst();
Assert.assertTrue(registries.isPresent());
.stream(Spectator.globalRegistry().getClass().getDeclaredFields())
.filter(f -> f.getName().equals("registries")).findFirst();
assertTrue(registries.isPresent());
registries.get().setAccessible(true);

List<Registry> meters = (List<Registry>) registries.get().get(Spectator.globalRegistry());
Assert.assertTrue ( meters.size() > 0);
Optional<Registry> microMeterReg = meters.stream().filter(r -> r.getClass().equals(MicrometerRegistry.class)).findFirst();
Assert.assertTrue(microMeterReg.isPresent());
assertTrue(meters.size() > 0);
Optional<Registry> microMeterReg = meters.stream()
.filter(r -> r.getClass().equals(MicrometerRegistry.class))
.findFirst();
assertTrue(microMeterReg.isPresent());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils;
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.dao.MetadataDAO;
import org.junit.*;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.mockserver.client.MockServerClient;
import org.mockserver.model.MediaType;
import org.testcontainers.containers.MockServerContainer;
Expand Down Expand Up @@ -367,7 +371,8 @@ public void testOptional() {
ParametersUtils parametersUtils = mock(ParametersUtils.class);
SystemTaskRegistry systemTaskRegistry = mock(SystemTaskRegistry.class);

new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, systemTaskRegistry, Collections.emptyMap(),
new DeciderService(parametersUtils, metadataDAO, externalPayloadStorageUtils, systemTaskRegistry,
Collections.emptyMap(),
Duration.ofMinutes(60))
.decide(workflow);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.concurrent.ThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
Expand All @@ -37,7 +38,6 @@ public class SchedulerConfiguration implements SchedulingConfigurer {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerConfiguration.class);
public static final String SWEEPER_EXECUTOR_NAME = "WorkflowSweeperExecutor";
public static final String EVENT_PROCESSOR_EXECUTOR_NAME = "EventProcessorExecutor";

/**
* Used by some {@link com.netflix.conductor.core.events.queue.ObservableQueue} implementations.
Expand All @@ -59,7 +59,7 @@ public Scheduler scheduler(ConductorProperties properties) {
public Executor sweeperExecutor(ConductorProperties properties) {
if (properties.getSweeperThreadCount() <= 0) {
throw new IllegalStateException("Cannot set workflow sweeper thread count to <=0. To disable workflow "
+ "sweeper, set conductor.workflow-sweeper.enabled=false.");
+ "sweeper, set conductor.workflow-reconciler.enabled=false.");
}
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat("sweeper-thread-%d")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
*/
package com.netflix.conductor.core.events.queue;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -22,26 +24,25 @@
import com.netflix.conductor.core.exception.ApplicationException;
import com.netflix.conductor.core.exception.ApplicationException.Code;
import com.netflix.conductor.service.ExecutionService;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_WAIT;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

/**
* Monitors and processes messages on the default event queues that Conductor listens on.
* <p>
* The default event queue type is controlled using the property: <code>conductor.default-event-queue.type</code>
*/
@Component
@ConditionalOnProperty(name = "conductor.default-event-queue-processor.enabled", havingValue = "true", matchIfMissing = true)
public class DefaultEventQueueProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultEventQueueProcessor.class);
Expand All @@ -57,6 +58,7 @@ public DefaultEventQueueProcessor(Map<Status, ObservableQueue> queues, Execution
this.executionService = executionService;
this.objectMapper = objectMapper;
queues.forEach(this::startMonitor);
LOGGER.info("DefaultEventQueueProcessor initialized with {} queues", queues.entrySet().size());
}

private void startMonitor(Status status, ObservableQueue queue) {
Expand Down Expand Up @@ -94,8 +96,8 @@ private void startMonitor(Status status, ObservableQueue queue) {
"No taskRefName found in the message. If there is only one WAIT task, will mark it as completed. {}",
payload);
taskOptional = workflow.getTasks().stream()
.filter(task -> !task.getStatus().isTerminal() && task.getTaskType().equals(
TASK_TYPE_WAIT)).findFirst();
.filter(task -> !task.getStatus().isTerminal()
&& task.getTaskType().equals(TASK_TYPE_WAIT)).findFirst();
} else {
taskOptional = workflow.getTasks().stream().filter(
task -> !task.getStatus().isTerminal() && task.getReferenceTaskName().equals(taskRefName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,13 @@
*/
package com.netflix.conductor.core.execution;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.COMPLETED_WITH_ERRORS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.IN_PROGRESS;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SCHEDULED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.SKIPPED;
import static com.netflix.conductor.common.metadata.tasks.Task.Status.TIMED_OUT;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.metadata.tasks.Task.Status;
Expand All @@ -33,22 +40,24 @@
import com.netflix.conductor.core.utils.ParametersUtils;
import com.netflix.conductor.dao.MetadataDAO;
import com.netflix.conductor.metrics.Monitors;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static com.netflix.conductor.common.metadata.tasks.Task.Status.*;
import static com.netflix.conductor.common.metadata.tasks.TaskType.SUB_WORKFLOW;
import static com.netflix.conductor.common.metadata.tasks.TaskType.TERMINATE;

/**
* Decider evaluates the state of the workflow by inspecting the current state along with the blueprint. The result of
* the evaluation is either to schedule further tasks, complete/fail the workflow or do nothing.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,39 +10,41 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution;
package com.netflix.conductor.core.reconciliation;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

import com.netflix.conductor.core.LifecycleAwareComponent;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.dao.QueueDAO;
import com.netflix.conductor.metrics.Monitors;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.concurrent.CompletableFuture;

import static com.netflix.conductor.core.execution.WorkflowExecutor.DECIDER_QUEUE;

/**
* Periodically polls all running workflows in the system and evaluates them for timeouts and/or maintain consistency.
*/
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Component
@ConditionalOnProperty(name = "conductor.workflow-sweeper.enabled", havingValue = "true", matchIfMissing = true)
public class WorkflowPoller extends LifecycleAwareComponent {
@ConditionalOnProperty(name = "conductor.workflow-reconciler.enabled", havingValue = "true", matchIfMissing = true)
public class WorkflowReconciler extends LifecycleAwareComponent {

private final WorkflowSweeper workflowSweeper;
private final QueueDAO queueDAO;
private final int sweeperThreadCount;

private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowPoller.class);
private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowReconciler.class);

public WorkflowPoller(WorkflowSweeper workflowSweeper, QueueDAO queueDAO, ConductorProperties properties) {
public WorkflowReconciler(WorkflowSweeper workflowSweeper, QueueDAO queueDAO, ConductorProperties properties) {
this.workflowSweeper = workflowSweeper;
this.queueDAO = queueDAO;
this.sweeperThreadCount = properties.getSweeperThreadCount();
LOGGER.info("WorkflowPoller initialized with {} sweeper threads", properties.getSweeperThreadCount());
LOGGER.info("WorkflowReconciler initialized with {} sweeper threads", properties.getSweeperThreadCount());
}

@Scheduled(fixedDelayString = "${conductor.sweep-frequency.millis:500}", initialDelayString = "${conductor.sweep-frequency.millis:500}")
Expand All @@ -65,8 +67,12 @@ public void pollAndSweep() {
recordQueueDepth();
}
} catch (Exception e) {
Monitors.error(WorkflowPoller.class.getSimpleName(), "poll");
Monitors.error(WorkflowReconciler.class.getSimpleName(), "poll");
LOGGER.error("Error when polling for workflows", e);
if (e instanceof InterruptedException) {
// Restore interrupted state...
Thread.currentThread().interrupt();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.core.execution;
package com.netflix.conductor.core.reconciliation;

import com.google.common.annotations.VisibleForTesting;
import com.netflix.conductor.common.metadata.tasks.Task;
import com.netflix.conductor.common.run.Workflow;
import com.netflix.conductor.core.config.ConductorProperties;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry;
import com.netflix.conductor.core.execution.tasks.WorkflowSystemTask;
import com.netflix.conductor.core.utils.QueueUtils;
Expand Down
Loading

0 comments on commit 163f7c4

Please sign in to comment.