From 4b1162a80037e91cd0ffae5db3549e686c762d2a Mon Sep 17 00:00:00 2001 From: Anoop Panicker Date: Thu, 4 Mar 2021 14:33:48 -0800 Subject: [PATCH] disable background processes when instance is disabled --- CHANGELOG.md | 2 +- buildViaTravis.sh | 6 +- .../queue/amqp/AMQPObservableQueue.java | 57 ++++++++++---- .../queue/nats/NATSAbstractQueue.java | 58 +++++++++----- .../queue/sqs/SQSObservableQueue.java | 25 +++++- .../contribs/queue/sqs/QueueManagerTest.java | 2 +- .../queue/sqs/SQSObservableQueueTest.java | 4 +- .../core/config/ConductorProperties.java | 13 ---- .../queue/ConductorObservableQueue.java | 35 +++++++-- .../core/events/queue/ObservableQueue.java | 5 +- .../core/execution/WorkflowExecutor.java | 3 +- .../core/execution/WorkflowSweeper.java | 78 +++++++++++-------- .../tasks/SystemTaskWorkerCoordinator.java | 22 +++++- .../core/events/MockObservableQueue.java | 16 +++- .../rest/controllers/QueueAdminResource.java | 5 +- .../rest/controllers/TaskResource.java | 2 +- .../rest/controllers/WorkflowResource.java | 4 +- .../application-integrationtest.properties | 2 +- 18 files changed, 231 insertions(+), 108 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c26497b42..a5553928fb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,7 +120,6 @@ Modified properties in the `core` module: | APP_ID | conductor.app.appId | conductor | | workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 | | decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s | -| decider.sweep.disable | conductor.app.sweepDisabled | false | | workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 | | workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 | | workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true | @@ -273,6 +272,7 @@ Modified properties that are used for configuring components: | --- | --- | --- | | db | conductor.db.type | "" | | workflow.indexing.enabled | conductor.indexing.enabled | true | +| decider.sweep.disable | conductor.workflow-sweeper.enabled | true | | conductor.grpc.server.enabled | conductor.grpc-server.enabled | false | | workflow.external.payload.storage | conductor.external-payload-storage.type | dummy | | workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true | diff --git a/buildViaTravis.sh b/buildViaTravis.sh index 3972967a76..6951163cc5 100755 --- a/buildViaTravis.sh +++ b/buildViaTravis.sh @@ -5,15 +5,15 @@ if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then ./gradlew build coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']' - ./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']' case "$TRAVIS_TAG" in *-rc\.*) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls ;; *) - ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls --info --stacktrace + ./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls ;; esac else diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java index 58b09096f2..3bb2d32916 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/amqp/AMQPObservableQueue.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -38,6 +39,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -63,6 +65,7 @@ public class AMQPObservableQueue implements ObservableQueue { private Channel channel; private final Address[] addresses; protected LinkedBlockingQueue messages = new LinkedBlockingQueue<>(); + private final AtomicBoolean running = new AtomicBoolean(); public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange, AMQPSettings settings, int batchSize, int pollTimeInMS) { @@ -128,24 +131,29 @@ public Observable observe() { Observable.OnSubscribe onSubscribe = subscriber -> { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach(msg -> { - buffer.append(msg.getId()).append("=").append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(), - buffer.toString())); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from RabbitMQ"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach(msg -> { + buffer.append(msg.getId()).append("=").append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() < available.size()) { + buffer.append(","); + } + }); + LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(), + buffer.toString())); + } + return Observable.from(available); } - return Observable.from(available); }).subscribe(subscriber::onNext, subscriber::onError); }; return Observable.create(onSubscribe); @@ -261,6 +269,21 @@ public void close() { closeConnection(); } + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } + public static class Builder { private final Address[] addresses; diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java index 5c1f404efa..1e2ad76814 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/nats/NATSAbstractQueue.java @@ -15,6 +15,7 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import io.nats.client.NUID; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -51,6 +52,8 @@ public abstract class NATSAbstractQueue implements ObservableQueue { private boolean observable; private boolean isOpened; + private final AtomicBoolean running = new AtomicBoolean(); + NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) { this.queueURI = queueURI; this.queueType = queueType; @@ -93,25 +96,29 @@ public Observable observe() { Observable.OnSubscribe onSubscribe = subscriber -> { Observable interval = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { - List available = new LinkedList<>(); - messages.drainTo(available); - - if (!available.isEmpty()) { - AtomicInteger count = new AtomicInteger(0); - StringBuilder buffer = new StringBuilder(); - available.forEach(msg -> { - buffer.append(msg.getId()).append("=").append(msg.getPayload()); - count.incrementAndGet(); - - if (count.get() < available.size()) { - buffer.append(","); - } - }); - - LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString())); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from NATS Queue"); + return Observable.from(Collections.emptyList()); + } else { + List available = new LinkedList<>(); + messages.drainTo(available); + + if (!available.isEmpty()) { + AtomicInteger count = new AtomicInteger(0); + StringBuilder buffer = new StringBuilder(); + available.forEach(msg -> { + buffer.append(msg.getId()).append("=").append(msg.getPayload()); + count.incrementAndGet(); + + if (count.get() < available.size()) { + buffer.append(","); + } + }); + LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString())); + } + + return Observable.from(available); } - - return Observable.from(available); }).subscribe(subscriber::onNext, subscriber::onError); }; return Observable.create(onSubscribe); @@ -254,4 +261,19 @@ void ensureConnected() { abstract void closeSubs(); abstract void closeConn(); + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java index 61d4ba57e0..5360e11ccf 100644 --- a/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java +++ b/contribs/src/main/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueue.java @@ -40,6 +40,7 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.metrics.Monitors; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; @@ -67,6 +68,7 @@ public class SQSObservableQueue implements ObservableQueue { private final long pollTimeInMS; private final String queueURL; private final Scheduler scheduler; + private final AtomicBoolean running = new AtomicBoolean(); private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize, long pollTimeInMS, List accountsToAuthorize, Scheduler scheduler) { @@ -143,6 +145,21 @@ public int getVisibilityTimeoutInSeconds() { return visibilityTimeoutInSeconds; } + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } + public static class Builder { private String queueName; @@ -296,8 +313,12 @@ OnSubscribe getOnSubscribe() { return subscriber -> { Observable interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); interval.flatMap((Long x) -> { - List msgs = receiveMessages(); - return Observable.from(msgs); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from SQS"); + return Observable.from(Collections.emptyList()); + } + List messages = receiveMessages(); + return Observable.from(messages); }).subscribe(subscriber::onNext, subscriber::onError); }; } diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java index 815cd66a59..b1764bbc6e 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/QueueManagerTest.java @@ -75,6 +75,7 @@ public static void setup() { queue = mock(SQSObservableQueue.class); when(queue.getOrCreateQueue()).thenReturn("junit_queue_url"); + when(queue.isRunning()).thenReturn(true); Answer answer = (Answer>) invocation -> { List copy = new LinkedList<>(messages); messages.clear(); @@ -122,7 +123,6 @@ public static void setup() { }).when(executionService).updateTask(any(Task.class)); } - @Test public void test() throws Exception { queueManager.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED); diff --git a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java index 98e7fc0bf3..1e43e68e65 100644 --- a/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java +++ b/contribs/src/test/java/com/netflix/conductor/contribs/queue/sqs/SQSObservableQueueTest.java @@ -31,6 +31,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -47,6 +48,7 @@ public void test() { when(queue.getOrCreateQueue()).thenReturn("junit_queue_url"); Answer answer = (Answer>) invocation -> Collections.emptyList(); when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer); + when(queue.isRunning()).thenReturn(true); when(queue.getOnSubscribe()).thenCallRealMethod(); when(queue.observe()).thenCallRealMethod(); @@ -80,6 +82,7 @@ public void testException() { SQSObservableQueue queue = new SQSObservableQueue.Builder() .withQueueName("junit") .withClient(client).build(); + queue.start(); List found = new LinkedList<>(); Observable observable = queue.observe(); @@ -87,7 +90,6 @@ public void testException() { observable.subscribe(found::add); Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS); - assertEquals(1, found.size()); } } diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index af67c50615..f98427abb4 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -47,11 +47,6 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration sweepFrequency = Duration.ofSeconds(30); - /** - * Used to enable/disable the workflow sweeper. - */ - private boolean sweepDisabled = false; - /** * The number of threads to configure the threadpool in the workflow sweeper. */ @@ -289,14 +284,6 @@ public void setSweepFrequency(Duration sweepFrequency) { this.sweepFrequency = sweepFrequency; } - public boolean isSweepDisabled() { - return sweepDisabled; - } - - public void setSweepDisabled(boolean sweepDisabled) { - this.sweepDisabled = sweepDisabled; - } - public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java index fb22613c2d..60e96a4fa5 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ConductorObservableQueue.java @@ -15,17 +15,18 @@ import com.netflix.conductor.core.config.ConductorProperties; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import rx.Observable; import rx.Observable.OnSubscribe; import rx.Scheduler; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - /** * An {@link ObservableQueue} implementation using the underlying {@link QueueDAO} implementation. */ @@ -41,6 +42,7 @@ public class ConductorObservableQueue implements ObservableQueue { private final int longPollTimeout; private final int pollCount; private final Scheduler scheduler; + private final AtomicBoolean running = new AtomicBoolean(); ConductorObservableQueue(String queueName, QueueDAO queueDAO, ConductorProperties properties, Scheduler scheduler) { this.queueName = queueName; @@ -111,9 +113,28 @@ private OnSubscribe getOnSubscribe() { return subscriber -> { Observable interval = Observable.interval(pollTimeMS, TimeUnit.MILLISECONDS, scheduler); interval.flatMap((Long x) -> { - List msgs = receiveMessages(); - return Observable.from(msgs); + if (!isRunning()) { + LOGGER.debug("Instance disabled, skip listening for messages from Conductor Queue"); + return Observable.from(Collections.emptyList()); + } + List messages = receiveMessages(); + return Observable.from(messages); }).subscribe(subscriber::onNext, subscriber::onError); }; } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java index 4ab7174571..faa4082826 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java +++ b/core/src/main/java/com/netflix/conductor/core/events/queue/ObservableQueue.java @@ -12,11 +12,12 @@ */ package com.netflix.conductor.core.events.queue; +import org.springframework.context.SmartLifecycle; import rx.Observable; import java.util.List; -public interface ObservableQueue { +public interface ObservableQueue extends SmartLifecycle { /** * @return An observable for the given queue @@ -39,7 +40,7 @@ public interface ObservableQueue { String getURI(); /** - * @param messages messages to be ack'ed + * @param messages to be ack'ed * @return the id of the ones which could not be ack'ed */ List ack(List messages); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java index 51d7d1eaf7..86387ec4a8 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowExecutor.java @@ -1695,6 +1695,8 @@ private boolean rerunWF(String workflowId, String taskId, Map ta rerunFromTask.setUpdateTime(0); rerunFromTask.setEndTime(0); rerunFromTask.setOutputData(null); + rerunFromTask.setRetried(false); + rerunFromTask.setExecuted(false); rerunFromTask.setExternalOutputPayloadStoragePath(null); if (rerunFromTask.getTaskType().equalsIgnoreCase(SubWorkflow.NAME)) { // if task is sub workflow set task as IN_PROGRESS and reset start time @@ -1708,7 +1710,6 @@ private boolean rerunWF(String workflowId, String taskId, Map ta } addTaskToQueue(rerunFromTask); } - rerunFromTask.setExecuted(false); executionDAOFacade.updateTask(rerunFromTask); decide(workflowId); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java index 1f5ac18eb8..c42c037598 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/WorkflowSweeper.java @@ -17,11 +17,6 @@ import com.netflix.conductor.core.exception.ApplicationException; import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - import java.util.LinkedList; import java.util.List; import java.util.concurrent.ExecutorService; @@ -29,17 +24,26 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.SmartLifecycle; +import org.springframework.stereotype.Component; @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") -@Service -public class WorkflowSweeper { +@Component +@ConditionalOnProperty(name = "conductor.workflow-sweeper.enabled", havingValue = "true", matchIfMissing = true) +public class WorkflowSweeper implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(WorkflowSweeper.class); - private ExecutorService executorService; + private final ExecutorService executorService; private final ConductorProperties properties; private final QueueDAO queueDAO; private final int executorThreadPoolSize; + private final AtomicBoolean running = new AtomicBoolean(); private static final String CLASS_NAME = WorkflowSweeper.class.getSimpleName(); @@ -48,36 +52,31 @@ public WorkflowSweeper(WorkflowExecutor workflowExecutor, WorkflowRepairService ConductorProperties properties, QueueDAO queueDAO) { this.properties = properties; this.queueDAO = queueDAO; - this.executorThreadPoolSize = properties.getSweeperThreadCount(); - if (this.executorThreadPoolSize > 0) { - this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize); - init(workflowExecutor, workflowRepairService); - LOGGER.info("Workflow Sweeper Initialized"); - } else { - LOGGER.warn("Workflow sweeper is DISABLED"); - } + this.executorThreadPoolSize = properties.getSweeperThreadCount() > 0 ? properties.getSweeperThreadCount() : 1; + this.executorService = Executors.newFixedThreadPool(executorThreadPoolSize); + init(workflowExecutor, workflowRepairService); + LOGGER.info("Workflow Sweeper Initialized"); } public void init(WorkflowExecutor workflowExecutor, WorkflowRepairService workflowRepairService) { ScheduledExecutorService deciderPool = Executors.newScheduledThreadPool(1); deciderPool.scheduleWithFixedDelay(() -> { - try { - boolean disable = properties.isSweepDisabled(); - if (disable) { - LOGGER.info("Workflow sweep is disabled."); - return; + if (!running.get()) { + LOGGER.debug("Instance disabled, skip workflow sweep"); + } else { + try { + int currentQueueSize = queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE); + LOGGER.debug("Sweeper's current decider queue size: {}", currentQueueSize); + List workflowIds = queueDAO + .pop(WorkflowExecutor.DECIDER_QUEUE, 2 * executorThreadPoolSize, 2000); + if (workflowIds != null) { + LOGGER.debug("Sweeper retrieved {} workflows from the decider queue", workflowIds.size()); + sweep(workflowIds, workflowExecutor, workflowRepairService); + } + } catch (Exception e) { + Monitors.error(CLASS_NAME, "sweep"); + LOGGER.error("Error when sweeping workflow", e); } - List workflowIds = queueDAO - .pop(WorkflowExecutor.DECIDER_QUEUE, 2 * executorThreadPoolSize, 2000); - int currentQueueSize = queueDAO.getSize(WorkflowExecutor.DECIDER_QUEUE); - LOGGER.debug("Sweeper's current deciderqueue size: {}.", currentQueueSize); - int retrievedWorkflows = (workflowIds != null) ? workflowIds.size() : 0; - LOGGER.debug("Sweeper retrieved {} workflows from the decider queue.", retrievedWorkflows); - - sweep(workflowIds, workflowExecutor, workflowRepairService); - } catch (Exception e) { - Monitors.error(CLASS_NAME, "sweep"); - LOGGER.error("Error when sweeping workflow", e); } }, 500, 500, TimeUnit.MILLISECONDS); } @@ -130,4 +129,19 @@ public void sweep(List workflowIds, WorkflowExecutor workflowExecutor, future.get(); } } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java index 85e13da915..27d72f05f9 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/SystemTaskWorkerCoordinator.java @@ -19,10 +19,12 @@ import com.netflix.conductor.dao.QueueDAO; import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.service.ExecutionService; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.SmartLifecycle; import org.springframework.context.event.EventListener; import org.springframework.stereotype.Component; @@ -39,7 +41,7 @@ @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") @Component -public class SystemTaskWorkerCoordinator { +public class SystemTaskWorkerCoordinator implements SmartLifecycle { private static final Logger LOGGER = LoggerFactory.getLogger(SystemTaskWorkerCoordinator.class); @@ -59,6 +61,7 @@ public class SystemTaskWorkerCoordinator { private final QueueDAO queueDAO; private final WorkflowExecutor workflowExecutor; private final ExecutionService executionService; + private final AtomicBoolean running = new AtomicBoolean(); public SystemTaskWorkerCoordinator(QueueDAO queueDAO, WorkflowExecutor workflowExecutor, ConductorProperties properties, @@ -115,7 +118,7 @@ private void listen(String queueName) { } private void pollAndExecute(String queueName) { - if (properties.isSystemTaskWorkersDisabled()) { + if (properties.isSystemTaskWorkersDisabled() || !running.get()) { LOGGER.warn("System Task Worker is DISABLED. Not polling for system task in queue : {}", queueName); return; } @@ -142,4 +145,19 @@ boolean isAsyncSystemTask(String queue) { } return false; } + + @Override + public void start() { + running.set(true); + } + + @Override + public void stop() { + running.set(false); + } + + @Override + public boolean isRunning() { + return running.get(); + } } diff --git a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java index a3e4b926ba..490a3b26a7 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java +++ b/core/src/test/java/com/netflix/conductor/core/events/MockObservableQueue.java @@ -14,13 +14,12 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; -import rx.Observable; - import java.util.Comparator; import java.util.List; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; +import rx.Observable; public class MockObservableQueue implements ObservableQueue { @@ -78,4 +77,17 @@ public long size() { public String toString() { return "MockObservableQueue [uri=" + uri + ", name=" + name + ", type=" + type + "]"; } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public boolean isRunning() { + return false; + } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java index 824fcefd12..bc52773c40 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/QueueAdminResource.java @@ -21,6 +21,7 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -49,14 +50,14 @@ public Map names() { @Operation(summary = "Publish a message in queue to mark a wait task as completed.") @PostMapping(value = "/update/{workflowId}/{taskRefName}/{status}") public void update(@PathVariable("workflowId") String workflowId, @PathVariable("taskRefName") String taskRefName, - @PathVariable("status") Status status, Map output) throws Exception { + @PathVariable("status") Status status, @RequestBody Map output) throws Exception { queueManager.updateByTaskRefName(workflowId, taskRefName, output, status); } @Operation(summary = "Publish a message in queue to mark a wait task (by taskId) as completed.") @PostMapping("/update/{workflowId}/task/{taskId}/{status}") public void updateByTaskId(@PathVariable("workflowId") String workflowId, @PathVariable("taskId") String taskId, - @PathVariable("status") Status status, Map output) throws Exception { + @PathVariable("status") Status status, @RequestBody Map output) throws Exception { queueManager.updateByTaskId(workflowId, taskId, output, status); } } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java index 81a84a831f..6566c6a15e 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/TaskResource.java @@ -105,7 +105,7 @@ public String ack(@PathVariable("taskId") String taskId, @PostMapping("/{taskId}/log") @Operation(summary = "Log Task Execution Details") - public void log(@PathVariable("taskId") String taskId, String log) { + public void log(@PathVariable("taskId") String taskId, @RequestBody String log) { taskService.log(taskId, log); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java index e3addfb42f..dd7960fc83 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowResource.java @@ -81,7 +81,7 @@ public List getWorkflows(@PathVariable("name") String name, public Map> getWorkflows(@PathVariable("name") String name, @RequestParam(value = "includeClosed", defaultValue = "false", required = false) boolean includeClosed, @RequestParam(value = "includeTasks", defaultValue = "false", required = false) boolean includeTasks, - List correlationIds) { + @RequestBody List correlationIds) { return workflowService.getWorkflows(name, includeClosed, includeTasks, correlationIds); } @@ -137,7 +137,7 @@ public void skipTaskFromWorkflow(@PathVariable("workflowId") String workflowId, @PostMapping(value = "/{workflowId}/rerun", produces = TEXT_PLAIN_VALUE) @Operation(summary = "Reruns the workflow from a specific task") public String rerun(@PathVariable("workflowId") String workflowId, - RerunWorkflowRequest request) { + @RequestBody RerunWorkflowRequest request) { return workflowService.rerunWorkflow(workflowId, request); } diff --git a/test-harness/src/test/resources/application-integrationtest.properties b/test-harness/src/test/resources/application-integrationtest.properties index 66358d10ae..10a7c20012 100644 --- a/test-harness/src/test/resources/application-integrationtest.properties +++ b/test-harness/src/test/resources/application-integrationtest.properties @@ -7,7 +7,7 @@ conductor.indexing.enabled=false conductor.app.stack=test conductor.app.appId=conductor -conductor.app.sweepDisabled=false +conductor.workflow-sweeper.enabled=true conductor.app.sweepFrequency=30 conductor.app.systemTaskWorkersDisabled=true