Skip to content

Commit ca62f66

Browse files
refs #129: Improve the logging for other components
Allow for Shutdown to interrupt message processing threads on shutdown Allow the Shutdown time to be configurable for ConcurrentMessageBroker Fix bug in PrefetchingMessageRetriever desired being off by one
1 parent 78927f9 commit ca62f66

File tree

18 files changed

+501
-88
lines changed

18 files changed

+501
-88
lines changed

examples/core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@
4848
import java.util.concurrent.Future;
4949
import java.util.concurrent.atomic.AtomicInteger;
5050
import java.util.stream.IntStream;
51+
import javax.annotation.Nullable;
5152
import javax.validation.constraints.Min;
5253
import javax.validation.constraints.PositiveOrZero;
5354

@@ -143,6 +144,17 @@ public String getThreadNameFormat() {
143144
public @PositiveOrZero Long getErrorBackoffTimeInMilliseconds() {
144145
return 0L;
145146
}
147+
148+
@Nullable
149+
@Override
150+
public @PositiveOrZero Long getShutdownTimeoutInSeconds() {
151+
return null;
152+
}
153+
154+
@Override
155+
public boolean shouldInterruptThreadsProcessingMessagesOnShutdown() {
156+
return false;
157+
}
146158
})
147159
);
148160

examples/spring-integration-test-example/src/test/java/it/com/jashmore/sqs/examples/integrationtests/SqsListenerExampleIntegrationTest.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,14 @@
4242
public class SqsListenerExampleIntegrationTest {
4343
static final String QUEUE_NAME = "testQueue";
4444
private static final int QUEUE_MAX_RECEIVE_COUNT = 3;
45+
private static final int VISIBILITY_TIMEOUT_IN_SECONDS = 2;
4546

4647
@ClassRule
4748
public static final LocalSqsRule LOCAL_SQS_RULE = new LocalSqsRule(ImmutableList.of(
48-
SqsQueuesConfig.QueueConfig.builder().queueName(QUEUE_NAME).maxReceiveCount(QUEUE_MAX_RECEIVE_COUNT).visibilityTimeout(5).build()
49+
SqsQueuesConfig.QueueConfig.builder().queueName(QUEUE_NAME)
50+
.maxReceiveCount(QUEUE_MAX_RECEIVE_COUNT)
51+
.visibilityTimeout(VISIBILITY_TIMEOUT_IN_SECONDS)
52+
.build()
4953
));
5054

5155
@Rule
@@ -76,7 +80,7 @@ public void messagesPlacedOntoQueueArePickedUpMessageListener() throws Exception
7680

7781
// act
7882
localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "my message");
79-
messageReceivedCountDownLatch.await(5, TimeUnit.SECONDS);
83+
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
8084

8185
// assert
8286
verify(mockSomeService).run("my message");
@@ -89,15 +93,15 @@ public void messageFailingToProcessWillBeProcessedAgain() throws Exception {
8993
final AtomicBoolean processedMessageOnce = new AtomicBoolean();
9094
doAnswer(invocationOnMock -> {
9195
if (!processedMessageOnce.getAndSet(true)) {
92-
throw new RuntimeException("error");
96+
throw new RuntimeException("Expected Test Exception");
9397
}
9498
messageReceivedCountDownLatch.countDown();
9599
return null;
96100
}).when(mockSomeService).run(anyString());
97101

98102
// act
99103
localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "my message");
100-
messageReceivedCountDownLatch.await(10, TimeUnit.SECONDS);
104+
messageReceivedCountDownLatch.await(3 * VISIBILITY_TIMEOUT_IN_SECONDS, TimeUnit.SECONDS);
101105

102106
// assert
103107
verify(mockSomeService, times(2)).run("my message");
@@ -110,12 +114,12 @@ public void messageThatContinuesToFailWillBePlacedIntoDlq() throws Exception {
110114
final String queueUrl = localSqsAsyncClient.getQueueUrl(QUEUE_NAME);
111115
doAnswer(invocationOnMock -> {
112116
messageReceivedCountDownLatch.countDown();
113-
throw new RuntimeException("error");
117+
throw new RuntimeException("Expected Test Exception");
114118
}).when(mockSomeService).run(anyString());
115119

116120
// act
117121
localSqsAsyncClient.sendMessageToLocalQueue(QUEUE_NAME, "my message");
118-
messageReceivedCountDownLatch.await(20, TimeUnit.SECONDS);
122+
messageReceivedCountDownLatch.await(VISIBILITY_TIMEOUT_IN_SECONDS * (QUEUE_MAX_RECEIVE_COUNT + 1), TimeUnit.SECONDS);
119123
waitForMessageVisibilityToExpire();
120124

121125
// assert

examples/spring-starter-examples/src/main/java/com/jashmore/sqs/examples/ScheduledQueueListenerEnabler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public void turnOfSqsListener() throws InterruptedException {
2323

2424
messageListenerContainerCoordinator.stopContainer("message-listeners-method");
2525
Thread.sleep(5_000);
26-
log.info("Turning SQS Listener back om");
26+
log.info("Turning SQS Listener back on");
2727
messageListenerContainerCoordinator.startContainer("message-listeners-method");
2828
}
2929
}

examples/spring-starter-examples/src/main/resources/logback.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
<logger name="akka" level="OFF" />
1212
<logger name="io.netty" level="ERROR" />
1313

14-
<logger name="com.jashmore" level="DEBUG" />
14+
<logger name="com.jashmore" level="INFO" />
1515

1616
<root level="info">
1717
<appender-ref ref="STDOUT" />

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/CachingConcurrentMessageBrokerProperties.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@ public class CachingConcurrentMessageBrokerProperties implements ConcurrentMessa
2222
/**
2323
* Cache key as only a single value is being loaded into the cache.
2424
*/
25-
private static final int SINGLE_CACHE_VALUE_KEY = 0;
25+
private static final Boolean SINGLE_CACHE_VALUE_KEY = true;
2626

27-
private final LoadingCache<Integer, Integer> cachedConcurrencyLevel;
28-
private final LoadingCache<Integer, Long> cachedPreferredConcurrencyPollingRateInSeconds;
29-
private final LoadingCache<Integer, Long> cachedErrorBackoffTimeInMilliseconds;
27+
private final LoadingCache<Boolean, Integer> cachedConcurrencyLevel;
28+
private final LoadingCache<Boolean, Long> cachedPreferredConcurrencyPollingRateInSeconds;
29+
private final LoadingCache<Boolean, Long> cachedErrorBackoffTimeInMilliseconds;
3030
private final String threadNameFormat;
31+
private final LoadingCache<Boolean, Long> cachedShutdownTimeoutInSeconds;
32+
private final LoadingCache<Boolean, Boolean> interruptThreadsProcessingMessagesOnShutdownCache;
3133

3234
/**
3335
* Constructor.
@@ -50,6 +52,14 @@ public CachingConcurrentMessageBrokerProperties(final int cachingTimeoutInMs,
5052
.build(CacheLoader.from(delegateProperties::getErrorBackoffTimeInMilliseconds));
5153

5254
this.threadNameFormat = delegateProperties.getThreadNameFormat();
55+
56+
this.cachedShutdownTimeoutInSeconds = CacheBuilder.newBuilder()
57+
.expireAfterWrite(cachingTimeoutInMs, TimeUnit.MILLISECONDS)
58+
.build(CacheLoader.from(delegateProperties::getShutdownTimeoutInSeconds));
59+
60+
this.interruptThreadsProcessingMessagesOnShutdownCache = CacheBuilder.newBuilder()
61+
.expireAfterWrite(cachingTimeoutInMs, TimeUnit.MILLISECONDS)
62+
.build(CacheLoader.from(delegateProperties::shouldInterruptThreadsProcessingMessagesOnShutdown));
5363
}
5464

5565
@PositiveOrZero
@@ -76,4 +86,15 @@ public String getThreadNameFormat() {
7686
public Long getErrorBackoffTimeInMilliseconds() {
7787
return cachedErrorBackoffTimeInMilliseconds.getUnchecked(SINGLE_CACHE_VALUE_KEY);
7888
}
89+
90+
@Nullable
91+
@Override
92+
public @PositiveOrZero Long getShutdownTimeoutInSeconds() {
93+
return cachedShutdownTimeoutInSeconds.getUnchecked(SINGLE_CACHE_VALUE_KEY);
94+
}
95+
96+
@Override
97+
public boolean shouldInterruptThreadsProcessingMessagesOnShutdown() {
98+
return interruptThreadsProcessingMessagesOnShutdownCache.getUnchecked(SINGLE_CACHE_VALUE_KEY);
99+
}
79100
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java

Lines changed: 51 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package com.jashmore.sqs.broker.concurrent;
22

33
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_BACKOFF_TIME_IN_MS;
4+
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_SHUTDOWN_TIME_IN_SECONDS;
45
import static java.util.concurrent.TimeUnit.MILLISECONDS;
5-
import static java.util.concurrent.TimeUnit.MINUTES;
6+
import static java.util.concurrent.TimeUnit.SECONDS;
67

8+
import com.google.common.annotations.VisibleForTesting;
79
import com.google.common.base.Preconditions;
810
import com.google.common.util.concurrent.ThreadFactoryBuilder;
911

@@ -44,11 +46,12 @@ public ConcurrentMessageBroker(final MessageRetriever messageRetriever,
4446
@Override
4547
@SuppressFBWarnings( {"RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"})
4648
public void run() {
49+
log.info("Started ConcurrentMessageBroker background thread");
4750
final ExecutorService concurrentThreadsExecutor = Executors.newCachedThreadPool();
4851
final ExecutorService messageProcessingThreadsExecutor = buildMessageProcessingExecutorService();
4952
final ResizableSemaphore concurrentMessagesBeingProcessedSemaphore = new ResizableSemaphore(0);
5053

51-
while (!Thread.currentThread().isInterrupted()) {
54+
while (true) {
5255
try {
5356
updateConcurrencyLevelIfChanged(concurrentMessagesBeingProcessedSemaphore);
5457

@@ -60,8 +63,7 @@ public void run() {
6063
}
6164
} catch (final InterruptedException interruptedException) {
6265
log.debug("Interrupted exception caught while adding more listeners, shutting down!");
63-
Thread.currentThread().interrupt();
64-
continue;
66+
break;
6567
}
6668

6769
CompletableFuture.supplyAsync(() -> {
@@ -86,15 +88,15 @@ public void run() {
8688
Thread.sleep(errorBackoffTimeInMilliseconds);
8789
} catch (final InterruptedException interruptedException) {
8890
log.debug("Thread interrupted during backoff period");
89-
Thread.currentThread().interrupt();
91+
break;
9092
}
9193
}
9294
}
9395

94-
log.debug("Shutting down message controller");
96+
log.info("ConcurrentMessageBroker background thread shutting down...");
9597
try {
9698
shutdownConcurrentThreads(concurrentThreadsExecutor, messageProcessingThreadsExecutor);
97-
log.debug("Message controller shut down");
99+
log.info("ConcurrentMessageBroker background thread successfully stopped");
98100
} catch (final RuntimeException runtimeException) {
99101
log.error("Exception thrown while waiting for broker to shutdown", runtimeException);
100102
}
@@ -115,19 +117,37 @@ private void updateConcurrencyLevelIfChanged(final ResizableSemaphore resizableS
115117
}
116118
}
117119

118-
private void shutdownConcurrentThreads(final ExecutorService concurrentThreadsExecutor,
119-
final ExecutorService messageProcessingThreadsExecutor) {
120+
/**
121+
* Shutdown all of the concurrent threads for retrieving messages by interrupting it but let any threads processing messages gracefully shutdown.
122+
*
123+
* <p>This method is visible for testing due to the difficulty in testing this.
124+
*
125+
* @param concurrentThreadsExecutor the executor for retrieving messages
126+
* @param messageProcessingThreadsExecutor the executor processing messages downloaded
127+
*/
128+
@VisibleForTesting
129+
void shutdownConcurrentThreads(final ExecutorService concurrentThreadsExecutor,
130+
final ExecutorService messageProcessingThreadsExecutor) {
120131
concurrentThreadsExecutor.shutdownNow();
121-
messageProcessingThreadsExecutor.shutdown();
122-
while (!concurrentThreadsExecutor.isTerminated() || !messageProcessingThreadsExecutor.isTerminated()) {
123-
log.debug("Waiting for all threads to finish...");
124-
try {
125-
concurrentThreadsExecutor.awaitTermination(1, MINUTES);
126-
messageProcessingThreadsExecutor.awaitTermination(1, MINUTES);
127-
} catch (final InterruptedException interruptedException) {
128-
log.warn("Interrupted while waiting for all messages to shutdown, some threads may still be running");
129-
return;
132+
if (properties.shouldInterruptThreadsProcessingMessagesOnShutdown()) {
133+
messageProcessingThreadsExecutor.shutdownNow();
134+
} else {
135+
messageProcessingThreadsExecutor.shutdown();
136+
}
137+
log.debug("Waiting for all threads to finish...");
138+
try {
139+
final long shutdownTimeoutInSeconds = getShutdownTimeoutInSeconds();
140+
final long timeNow = System.currentTimeMillis();
141+
final boolean concurrentThreadsShutdown = concurrentThreadsExecutor.awaitTermination(SECONDS.toMillis(shutdownTimeoutInSeconds), MILLISECONDS);
142+
final long leftOverShutdownTimeoutInMilliseconds = System.currentTimeMillis() - timeNow;
143+
final boolean messageProcessingThreadsShutdown = messageProcessingThreadsExecutor.awaitTermination(
144+
leftOverShutdownTimeoutInMilliseconds, MILLISECONDS);
145+
if (!concurrentThreadsShutdown || !messageProcessingThreadsShutdown) {
146+
log.error("Message processing threads did not shutdown within {} seconds", shutdownTimeoutInSeconds);
130147
}
148+
} catch (final InterruptedException interruptedException) {
149+
log.warn("Interrupted while waiting for all messages to shutdown, some threads may still be running");
150+
Thread.currentThread().interrupt();
131151
}
132152
}
133153

@@ -168,6 +188,19 @@ private long getErrorBackoffTimeInMilliseconds() {
168188
);
169189
}
170190

191+
/**
192+
* Get the amount of time in seconds that we should wait for the server to shutdown.
193+
*
194+
* @return the amount of time in seconds to wait for shutdown
195+
*/
196+
private long getShutdownTimeoutInSeconds() {
197+
return PropertyUtils.safelyGetPositiveOrZeroLongValue(
198+
"shutdownTimeoutInSeconds",
199+
properties::getShutdownTimeoutInSeconds,
200+
DEFAULT_SHUTDOWN_TIME_IN_SECONDS
201+
);
202+
}
203+
171204
/**
172205
* Safely get the number of milliseconds that should wait to get a permit for creating a new thread.
173206
*

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBrokerConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,9 @@ public class ConcurrentMessageBrokerConstants {
1313
* The default amount of time the thread should wait for a thread to process a message before it tries again and checks the available concurrency.
1414
*/
1515
public static final long DEFAULT_CONCURRENCY_POLLING_IN_MS = 60_000L;
16+
17+
/**
18+
* The default time that the broker will wait for background threads to process during a shutdown.
19+
*/
20+
public static final long DEFAULT_SHUTDOWN_TIME_IN_SECONDS = 60;
1621
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBrokerProperties.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,4 +97,24 @@ public interface ConcurrentMessageBrokerProperties {
9797
@Nullable
9898
@PositiveOrZero
9999
Long getErrorBackoffTimeInMilliseconds();
100+
101+
/**
102+
* The number of seconds that the broker should wait for the message processing threads to finish when a shutdown is initiated.
103+
*
104+
* <p>If this value is negative or null, then {@link ConcurrentMessageBrokerConstants#DEFAULT_SHUTDOWN_TIME_IN_SECONDS} will be used for instead.
105+
*
106+
* @return the time in seconds to wait for shutdown of message processing threads
107+
*/
108+
@Nullable
109+
@PositiveOrZero
110+
Long getShutdownTimeoutInSeconds();
111+
112+
/**
113+
* Whether the threads that are processing messages should be interrupted during shutdown of the broker.
114+
*
115+
* <p>Setting this to true is useful if it may take long to process messages and it is undesirable for them to finish before shutting down.
116+
*
117+
* @return whether the message processing threads should be interrupted during shutdown
118+
*/
119+
boolean shouldInterruptThreadsProcessingMessagesOnShutdown();
100120
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/StaticConcurrentMessageBrokerProperties.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,16 @@ public final class StaticConcurrentMessageBrokerProperties implements Concurrent
3434
private final Long preferredConcurrencyPollingRateInMilliseconds;
3535
private final String threadNameFormat;
3636
private final Long errorBackoffTimeInMilliseconds;
37+
private final Long shutdownTimeoutInSeconds;
38+
private final boolean interruptThreadsProcessingMessagesOnShutdown;
3739

3840
@SuppressFBWarnings("RV_RETURN_VAL")
3941
public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
4042
final Long preferredConcurrencyPollingRateInMilliseconds,
4143
final String threadNameFormat,
42-
final Long errorBackoffTimeInMilliseconds) {
44+
final Long errorBackoffTimeInMilliseconds,
45+
final Long shutdownTimeoutInSeconds,
46+
final boolean interruptThreadsProcessingMessagesOnShutdown) {
4347
Preconditions.checkArgument(concurrencyLevel == null || concurrencyLevel >= 0, "concurrencyLevel should be greater than or equal to zero");
4448
Preconditions.checkArgument(preferredConcurrencyPollingRateInMilliseconds == null || preferredConcurrencyPollingRateInMilliseconds >= 0,
4549
"preferredConcurrencyPollingRateInMilliseconds should be greater than or equal to zero");
@@ -57,6 +61,8 @@ public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
5761
//noinspection ResultOfMethodCallIgnored
5862
String.format(Locale.ROOT, threadNameFormat, 1);
5963
}
64+
this.shutdownTimeoutInSeconds = shutdownTimeoutInSeconds;
65+
this.interruptThreadsProcessingMessagesOnShutdown = interruptThreadsProcessingMessagesOnShutdown;
6066
}
6167

6268
@PositiveOrZero
@@ -83,4 +89,15 @@ public String getThreadNameFormat() {
8389
public Long getErrorBackoffTimeInMilliseconds() {
8490
return errorBackoffTimeInMilliseconds;
8591
}
92+
93+
@Nullable
94+
@Override
95+
public @PositiveOrZero Long getShutdownTimeoutInSeconds() {
96+
return shutdownTimeoutInSeconds;
97+
}
98+
99+
@Override
100+
public boolean shouldInterruptThreadsProcessingMessagesOnShutdown() {
101+
return interruptThreadsProcessingMessagesOnShutdown;
102+
}
86103
}

0 commit comments

Comments
 (0)