Skip to content

Commit

Permalink
disable background processes when instance is disabled
Browse files Browse the repository at this point in the history
  • Loading branch information
apanicker-nflx committed Mar 10, 2021
1 parent a42ef55 commit 4b1162a
Show file tree
Hide file tree
Showing 18 changed files with 231 additions and 108 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ Modified properties in the `core` module:
| APP_ID | conductor.app.appId | conductor |
| workflow.executor.service.max.threads | conductor.app.executorServiceMaxThreadCount | 50 |
| decider.sweep.frequency.seconds | conductor.app.sweepFrequency | 30s |
| decider.sweep.disable | conductor.app.sweepDisabled | false |
| workflow.sweeper.thread.count | conductor.app.sweeperThreadCount | 5 |
| workflow.event.processor.thread.count | conductor.app.eventProcessorThreadCount | 2 |
| workflow.event.message.indexing.enabled | conductor.app.eventMessageIndexingEnabled | true |
Expand Down Expand Up @@ -273,6 +272,7 @@ Modified properties that are used for configuring components:
| --- | --- | --- |
| db | conductor.db.type | "" |
| workflow.indexing.enabled | conductor.indexing.enabled | true |
| decider.sweep.disable | conductor.workflow-sweeper.enabled | true |
| conductor.grpc.server.enabled | conductor.grpc-server.enabled | false |
| workflow.external.payload.storage | conductor.external-payload-storage.type | dummy |
| workflow.default.event.processor.enabled | conductor.default-event-processor.enabled | true |
Expand Down
6 changes: 3 additions & 3 deletions buildViaTravis.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ if [ "$TRAVIS_PULL_REQUEST" != "false" ]; then
./gradlew build coveralls
elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" == "" ]; then
echo -e 'Build Branch with Snapshot => Branch ['$TRAVIS_BRANCH']'
./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls --info --stacktrace
./gradlew -Prelease.travisci=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" build snapshot coveralls
elif [ "$TRAVIS_PULL_REQUEST" == "false" ] && [ "$TRAVIS_TAG" != "" ]; then
echo -e 'Build Branch for Release => Branch ['$TRAVIS_BRANCH'] Tag ['$TRAVIS_TAG']'
case "$TRAVIS_TAG" in
*-rc\.*)
./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls --info --stacktrace
./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" candidate coveralls
;;
*)
./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls --info --stacktrace
./gradlew -Prelease.travisci=true -Prelease.useLastTag=true -PbintrayUser="${bintrayUser}" -PbintrayKey="${bintrayKey}" -PsonatypeUsername="${sonatypeUsername}" -PsonatypePassword="${sonatypePassword}" final coveralls
;;
esac
else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -63,6 +65,7 @@ public class AMQPObservableQueue implements ObservableQueue {
private Channel channel;
private final Address[] addresses;
protected LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
private final AtomicBoolean running = new AtomicBoolean();

public AMQPObservableQueue(ConnectionFactory factory, Address[] addresses, boolean useExchange,
AMQPSettings settings, int batchSize, int pollTimeInMS) {
Expand Down Expand Up @@ -128,24 +131,29 @@ public Observable<Message> observe() {
Observable.OnSubscribe<Message> onSubscribe = subscriber -> {
Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
interval.flatMap((Long x) -> {
List<Message> available = new LinkedList<>();
messages.drainTo(available);

if (!available.isEmpty()) {
AtomicInteger count = new AtomicInteger(0);
StringBuilder buffer = new StringBuilder();
available.forEach(msg -> {
buffer.append(msg.getId()).append("=").append(msg.getPayload());
count.incrementAndGet();

if (count.get() < available.size()) {
buffer.append(",");
}
});
LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(),
buffer.toString()));
if (!isRunning()) {
LOGGER.debug("Instance disabled, skip listening for messages from RabbitMQ");
return Observable.from(Collections.emptyList());
} else {
List<Message> available = new LinkedList<>();
messages.drainTo(available);

if (!available.isEmpty()) {
AtomicInteger count = new AtomicInteger(0);
StringBuilder buffer = new StringBuilder();
available.forEach(msg -> {
buffer.append(msg.getId()).append("=").append(msg.getPayload());
count.incrementAndGet();

if (count.get() < available.size()) {
buffer.append(",");
}
});
LOGGER.info(String.format("Batch from %s to conductor is %s", settings.getQueueOrExchangeName(),
buffer.toString()));
}
return Observable.from(available);
}
return Observable.from(available);
}).subscribe(subscriber::onNext, subscriber::onError);
};
return Observable.create(onSubscribe);
Expand Down Expand Up @@ -261,6 +269,21 @@ public void close() {
closeConnection();
}

@Override
public void start() {
running.set(true);
}

@Override
public void stop() {
running.set(false);
}

@Override
public boolean isRunning() {
return running.get();
}

public static class Builder {

private final Address[] addresses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import io.nats.client.NUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand Down Expand Up @@ -51,6 +52,8 @@ public abstract class NATSAbstractQueue implements ObservableQueue {
private boolean observable;
private boolean isOpened;

private final AtomicBoolean running = new AtomicBoolean();

NATSAbstractQueue(String queueURI, String queueType, Scheduler scheduler) {
this.queueURI = queueURI;
this.queueType = queueType;
Expand Down Expand Up @@ -93,25 +96,29 @@ public Observable<Message> observe() {
Observable.OnSubscribe<Message> onSubscribe = subscriber -> {
Observable<Long> interval = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler);
interval.flatMap((Long x) -> {
List<Message> available = new LinkedList<>();
messages.drainTo(available);

if (!available.isEmpty()) {
AtomicInteger count = new AtomicInteger(0);
StringBuilder buffer = new StringBuilder();
available.forEach(msg -> {
buffer.append(msg.getId()).append("=").append(msg.getPayload());
count.incrementAndGet();

if (count.get() < available.size()) {
buffer.append(",");
}
});

LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString()));
if (!isRunning()) {
LOGGER.debug("Instance disabled, skip listening for messages from NATS Queue");
return Observable.from(Collections.emptyList());
} else {
List<Message> available = new LinkedList<>();
messages.drainTo(available);

if (!available.isEmpty()) {
AtomicInteger count = new AtomicInteger(0);
StringBuilder buffer = new StringBuilder();
available.forEach(msg -> {
buffer.append(msg.getId()).append("=").append(msg.getPayload());
count.incrementAndGet();

if (count.get() < available.size()) {
buffer.append(",");
}
});
LOGGER.info(String.format("Batch from %s to conductor is %s", subject, buffer.toString()));
}

return Observable.from(available);
}

return Observable.from(available);
}).subscribe(subscriber::onNext, subscriber::onError);
};
return Observable.create(onSubscribe);
Expand Down Expand Up @@ -254,4 +261,19 @@ void ensureConnected() {
abstract void closeSubs();

abstract void closeConn();

@Override
public void start() {
running.set(true);
}

@Override
public void stop() {
running.set(false);
}

@Override
public boolean isRunning() {
return running.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.netflix.conductor.core.events.queue.Message;
import com.netflix.conductor.core.events.queue.ObservableQueue;
import com.netflix.conductor.metrics.Monitors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class SQSObservableQueue implements ObservableQueue {
private final long pollTimeInMS;
private final String queueURL;
private final Scheduler scheduler;
private final AtomicBoolean running = new AtomicBoolean();

private SQSObservableQueue(String queueName, AmazonSQSClient client, int visibilityTimeoutInSeconds, int batchSize,
long pollTimeInMS, List<String> accountsToAuthorize, Scheduler scheduler) {
Expand Down Expand Up @@ -143,6 +145,21 @@ public int getVisibilityTimeoutInSeconds() {
return visibilityTimeoutInSeconds;
}

@Override
public void start() {
running.set(true);
}

@Override
public void stop() {
running.set(false);
}

@Override
public boolean isRunning() {
return running.get();
}

public static class Builder {

private String queueName;
Expand Down Expand Up @@ -296,8 +313,12 @@ OnSubscribe<Message> getOnSubscribe() {
return subscriber -> {
Observable<Long> interval = Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
interval.flatMap((Long x) -> {
List<Message> msgs = receiveMessages();
return Observable.from(msgs);
if (!isRunning()) {
LOGGER.debug("Instance disabled, skip listening for messages from SQS");
return Observable.from(Collections.emptyList());
}
List<Message> messages = receiveMessages();
return Observable.from(messages);
}).subscribe(subscriber::onNext, subscriber::onError);
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ public static void setup() {

queue = mock(SQSObservableQueue.class);
when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
when(queue.isRunning()).thenReturn(true);
Answer<?> answer = (Answer<List<Message>>) invocation -> {
List<Message> copy = new LinkedList<>(messages);
messages.clear();
Expand Down Expand Up @@ -122,7 +123,6 @@ public static void setup() {
}).when(executionService).updateTask(any(Task.class));
}


@Test
public void test() throws Exception {
queueManager.updateByTaskRefName("v_0", "t0", new HashMap<>(), Status.COMPLETED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand All @@ -47,6 +48,7 @@ public void test() {
when(queue.getOrCreateQueue()).thenReturn("junit_queue_url");
Answer<?> answer = (Answer<List<Message>>) invocation -> Collections.emptyList();
when(queue.receiveMessages()).thenReturn(messages).thenAnswer(answer);
when(queue.isRunning()).thenReturn(true);
when(queue.getOnSubscribe()).thenCallRealMethod();
when(queue.observe()).thenCallRealMethod();

Expand Down Expand Up @@ -80,14 +82,14 @@ public void testException() {
SQSObservableQueue queue = new SQSObservableQueue.Builder()
.withQueueName("junit")
.withClient(client).build();
queue.start();

List<Message> found = new LinkedList<>();
Observable<Message> observable = queue.observe();
assertNotNull(observable);
observable.subscribe(found::add);

Uninterruptibles.sleepUninterruptibly(1000, TimeUnit.MILLISECONDS);

assertEquals(1, found.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ public class ConductorProperties {
@DurationUnit(ChronoUnit.SECONDS)
private Duration sweepFrequency = Duration.ofSeconds(30);

/**
* Used to enable/disable the workflow sweeper.
*/
private boolean sweepDisabled = false;

/**
* The number of threads to configure the threadpool in the workflow sweeper.
*/
Expand Down Expand Up @@ -289,14 +284,6 @@ public void setSweepFrequency(Duration sweepFrequency) {
this.sweepFrequency = sweepFrequency;
}

public boolean isSweepDisabled() {
return sweepDisabled;
}

public void setSweepDisabled(boolean sweepDisabled) {
this.sweepDisabled = sweepDisabled;
}

public int getSweeperThreadCount() {
return sweeperThreadCount;
}
Expand Down
Loading

0 comments on commit 4b1162a

Please sign in to comment.