Skip to content

Commit 67b555e

Browse files
Merge pull request #93 from JaidenAshmore/issue/91_concurrent_message_broker_error_safety
refs #91: Added more safety to the ConcurrentMessageBroker if the properties throw exceptions.
2 parents d0d5e41 + 224f99e commit 67b555e

File tree

12 files changed

+465
-60
lines changed

12 files changed

+465
-60
lines changed

examples/java-dynamic-sqs-listener-core-examples/src/main/java/com/jashmore/sqs/examples/ConcurrentBrokerExample.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.concurrent.atomic.AtomicInteger;
5050
import java.util.stream.IntStream;
5151
import javax.validation.constraints.Min;
52+
import javax.validation.constraints.PositiveOrZero;
5253

5354
/**
5455
* This example shows the core framework being used to processing messages place onto the queue with a dynamic level of concurrency via the
@@ -63,7 +64,7 @@
6364
*/
6465
@Slf4j
6566
public class ConcurrentBrokerExample {
66-
private static final int CONCURRENCY_LEVEL_PERIOD_IN_MS = 5000;
67+
private static final long CONCURRENCY_LEVEL_PERIOD_IN_MS = 5000L;
6768
private static final int CONCURRENCY_LEVEL_LIMIT = 10;
6869

6970
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -130,14 +131,19 @@ public Integer getConcurrencyLevel() {
130131
}
131132

132133
@Override
133-
public @Min(0) Integer getPreferredConcurrencyPollingRateInMilliseconds() {
134+
public @Min(0) Long getPreferredConcurrencyPollingRateInMilliseconds() {
134135
return CONCURRENCY_LEVEL_PERIOD_IN_MS;
135136
}
136137

137138
@Override
138-
public String threadNameFormat() {
139+
public String getThreadNameFormat() {
139140
return "my-message-listener-thread-%d";
140141
}
142+
143+
@Override
144+
public @PositiveOrZero Long getErrorBackoffTimeInMilliseconds() {
145+
return 0L;
146+
}
141147
})
142148
);
143149

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/CachingConcurrentMessageBrokerProperties.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@
77
import net.jcip.annotations.ThreadSafe;
88

99
import java.util.concurrent.TimeUnit;
10+
import javax.annotation.Nullable;
1011
import javax.validation.constraints.Min;
12+
import javax.validation.constraints.PositiveOrZero;
1113

1214
/**
1315
* Implementation that will cache the values as the methods to retrieve the values may be costly.
@@ -24,7 +26,8 @@ public class CachingConcurrentMessageBrokerProperties implements ConcurrentMessa
2426
private static final int SINGLE_CACHE_VALUE_KEY = 0;
2527

2628
private final LoadingCache<Integer, Integer> cachedConcurrencyLevel;
27-
private final LoadingCache<Integer, Integer> cachedPreferredConcurrencyPollingRateInSeconds;
29+
private final LoadingCache<Integer, Long> cachedPreferredConcurrencyPollingRateInSeconds;
30+
private final LoadingCache<Integer, Long> cachedErrorBackoffTimeInMilliseconds;
2831
private final String threadNameFormat;
2932

3033
/**
@@ -43,7 +46,11 @@ public CachingConcurrentMessageBrokerProperties(final int cachingTimeoutInMs,
4346
.expireAfterWrite(cachingTimeoutInMs, TimeUnit.MILLISECONDS)
4447
.build(CacheLoader.from(delegateProperties::getPreferredConcurrencyPollingRateInMilliseconds));
4548

46-
this.threadNameFormat = delegateProperties.threadNameFormat();
49+
this.cachedErrorBackoffTimeInMilliseconds = CacheBuilder.newBuilder()
50+
.expireAfterWrite(cachingTimeoutInMs, TimeUnit.MILLISECONDS)
51+
.build(CacheLoader.from(delegateProperties::getErrorBackoffTimeInMilliseconds));
52+
53+
this.threadNameFormat = delegateProperties.getThreadNameFormat();
4754
}
4855

4956
@Override
@@ -52,12 +59,18 @@ public CachingConcurrentMessageBrokerProperties(final int cachingTimeoutInMs,
5259
}
5360

5461
@Override
55-
public @Min(0) Integer getPreferredConcurrencyPollingRateInMilliseconds() {
62+
public @Min(0) Long getPreferredConcurrencyPollingRateInMilliseconds() {
5663
return cachedPreferredConcurrencyPollingRateInSeconds.getUnchecked(SINGLE_CACHE_VALUE_KEY);
5764
}
5865

5966
@Override
60-
public String threadNameFormat() {
67+
public String getThreadNameFormat() {
6168
return threadNameFormat;
6269
}
70+
71+
@Nullable
72+
@Override
73+
public @PositiveOrZero Long getErrorBackoffTimeInMilliseconds() {
74+
return cachedErrorBackoffTimeInMilliseconds.getUnchecked(SINGLE_CACHE_VALUE_KEY);
75+
}
6376
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBroker.java

Lines changed: 78 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,17 @@
11
package com.jashmore.sqs.broker.concurrent;
22

3+
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_BACKOFF_TIME_IN_MS;
34
import static java.util.concurrent.TimeUnit.MILLISECONDS;
45
import static java.util.concurrent.TimeUnit.MINUTES;
56

7+
import com.google.common.base.Preconditions;
68
import com.google.common.util.concurrent.ThreadFactoryBuilder;
79

810
import com.jashmore.sqs.broker.MessageBroker;
911
import com.jashmore.sqs.processor.MessageProcessor;
1012
import com.jashmore.sqs.retriever.MessageRetriever;
1113
import com.jashmore.sqs.util.ResizableSemaphore;
14+
import com.jashmore.sqs.util.properties.PropertyUtils;
1215
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1316
import lombok.extern.slf4j.Slf4j;
1417

@@ -46,30 +49,41 @@ public void run() {
4649
final ResizableSemaphore concurrentMessagesBeingProcessedSemaphore = new ResizableSemaphore(0);
4750

4851
while (!Thread.currentThread().isInterrupted()) {
49-
updateConcurrencyLevelIfChanged(concurrentMessagesBeingProcessedSemaphore);
50-
51-
final long numberOfMillisecondsToObtainPermit = properties.getPreferredConcurrencyPollingRateInMilliseconds();
5252
try {
53-
final boolean obtainedPermit = concurrentMessagesBeingProcessedSemaphore.tryAcquire(numberOfMillisecondsToObtainPermit, MILLISECONDS);
54-
if (!obtainedPermit) {
53+
updateConcurrencyLevelIfChanged(concurrentMessagesBeingProcessedSemaphore);
54+
55+
final long numberOfMillisecondsToObtainPermit = getNumberOfMillisecondsToObtainPermit();
56+
try {
57+
final boolean obtainedPermit = concurrentMessagesBeingProcessedSemaphore.tryAcquire(numberOfMillisecondsToObtainPermit, MILLISECONDS);
58+
if (!obtainedPermit) {
59+
continue;
60+
}
61+
} catch (final InterruptedException interruptedException) {
62+
log.debug("Interrupted exception caught while adding more listeners, shutting down!");
63+
Thread.currentThread().interrupt();
5564
continue;
5665
}
57-
} catch (final InterruptedException interruptedException) {
58-
log.info("Interrupted exception caught while adding more listeners, shutting down!");
59-
Thread.currentThread().interrupt();
60-
continue;
61-
}
6266

63-
CompletableFuture.supplyAsync(() -> {
67+
CompletableFuture.supplyAsync(() -> {
68+
try {
69+
return messageRetriever.retrieveMessage();
70+
} catch (final InterruptedException exception) {
71+
log.debug("Thread interrupted waiting for a message");
72+
throw new RuntimeException("Failure to get message");
73+
}
74+
}, concurrentThreadsExecutor)
75+
.thenAcceptAsync(messageProcessor::processMessage, messageProcessingThreadsExecutor)
76+
.whenComplete((ignoredResult, throwable) -> concurrentMessagesBeingProcessedSemaphore.release());
77+
} catch (final Throwable throwable) {
6478
try {
65-
return messageRetriever.retrieveMessage();
66-
} catch (final InterruptedException exception) {
67-
log.trace("Thread interrupted waiting for a message");
68-
throw new RuntimeException("Failure to get message");
79+
final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
80+
log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMilliseconds, throwable);
81+
Thread.sleep(errorBackoffTimeInMilliseconds);
82+
} catch (final InterruptedException interruptedException) {
83+
log.debug("Thread interrupted during backoff period");
84+
Thread.currentThread().interrupt();
6985
}
70-
}, concurrentThreadsExecutor)
71-
.thenAcceptAsync(messageProcessor::processMessage, messageProcessingThreadsExecutor)
72-
.whenComplete((ignoredResult, throwable) -> concurrentMessagesBeingProcessedSemaphore.release());
86+
}
7387
}
7488

7589
log.debug("Shutting down message controller");
@@ -88,6 +102,7 @@ public void run() {
88102
*/
89103
private void updateConcurrencyLevelIfChanged(final ResizableSemaphore resizableSemaphore) {
90104
final int newConcurrencyLevel = properties.getConcurrencyLevel();
105+
Preconditions.checkArgument(newConcurrencyLevel >= 0, "concurrencyLevel should be non-negative");
91106

92107
if (resizableSemaphore.getMaximumPermits() != newConcurrencyLevel) {
93108
log.debug("Changing concurrency from {} to {}", resizableSemaphore.getMaximumPermits(), newConcurrencyLevel);
@@ -111,13 +126,54 @@ private void shutdownConcurrentThreads(final ExecutorService concurrentThreadsEx
111126
}
112127
}
113128

129+
/**
130+
* Build the {@link ExecutorService} that will be used for the threads that are processing the messages.
131+
*
132+
* <p>This will provide the logic to name the threads to make it easier to debug multiple different message listeners in the system.
133+
*
134+
* @return the executor service that will be used for processing messages
135+
*/
114136
private ExecutorService buildMessageProcessingExecutorService() {
115-
final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
137+
try {
138+
final ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
116139

117-
if (properties.threadNameFormat() != null) {
118-
threadFactoryBuilder.setNameFormat(properties.threadNameFormat());
140+
final String threadNameFormat = properties.getThreadNameFormat();
141+
if (threadNameFormat != null) {
142+
threadFactoryBuilder.setNameFormat(threadNameFormat);
143+
}
144+
145+
return Executors.newCachedThreadPool(threadFactoryBuilder.build());
146+
} catch (final Throwable throwable) {
147+
log.error("Error thrown building message processing executor service, returning default");
148+
return Executors.newCachedThreadPool();
119149
}
150+
}
120151

121-
return Executors.newCachedThreadPool(threadFactoryBuilder.build());
152+
/**
153+
* Get the number of seconds that the thread should wait when there was an error trying to organise a thread to process.
154+
*
155+
* @return the backoff time in milliseconds
156+
* @see ConcurrentMessageBrokerProperties#getErrorBackoffTimeInMilliseconds() for more information
157+
*/
158+
private long getErrorBackoffTimeInMilliseconds() {
159+
return PropertyUtils.safelyGetPositiveOrZeroLongValue(
160+
"errorBackoffTimeInMilliseconds",
161+
properties::getErrorBackoffTimeInMilliseconds,
162+
DEFAULT_BACKOFF_TIME_IN_MS
163+
);
164+
}
165+
166+
/**
167+
* Safely get the number of milliseconds that should wait to get a permit for creating a new thread.
168+
*
169+
* @return the number of milliseconds to wait
170+
* @see ConcurrentMessageBrokerProperties#getPreferredConcurrencyPollingRateInMilliseconds() for more information
171+
*/
172+
private long getNumberOfMillisecondsToObtainPermit() {
173+
return PropertyUtils.safelyGetPositiveLongValue(
174+
"preferredConcurrencyPollingRateInMilliseconds",
175+
properties::getPreferredConcurrencyPollingRateInMilliseconds,
176+
DEFAULT_BACKOFF_TIME_IN_MS
177+
);
122178
}
123179
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.jashmore.sqs.broker.concurrent;
2+
3+
import lombok.experimental.UtilityClass;
4+
5+
@UtilityClass
6+
public class ConcurrentMessageBrokerConstants {
7+
/**
8+
* The default amount of time to sleep the thread when there was an error organising the message processing threads.
9+
*/
10+
public static final int DEFAULT_BACKOFF_TIME_IN_MS = 10_000;
11+
12+
/**
13+
* The default amount of time the thread should wait for a thread to process a message before it tries again and checks the available concurrency.
14+
*/
15+
public static final long DEFAULT_CONCURRENCY_POLLING_IN_MS = 60_000L;
16+
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/ConcurrentMessageBrokerProperties.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
import com.jashmore.sqs.retriever.MessageRetriever;
44

5+
import javax.annotation.Nullable;
56
import javax.annotation.concurrent.NotThreadSafe;
67
import javax.validation.constraints.Min;
8+
import javax.validation.constraints.PositiveOrZero;
79

810
/**
911
* Properties for dynamically configuring how the {@link ConcurrentMessageBroker} is able to process messages concurrently.
@@ -68,7 +70,7 @@ public interface ConcurrentMessageBrokerProperties {
6870
* @return the number of milliseconds between polls for the concurrency level
6971
*/
7072
@Min(0)
71-
Integer getPreferredConcurrencyPollingRateInMilliseconds();
73+
Long getPreferredConcurrencyPollingRateInMilliseconds();
7274

7375
/**
7476
* The String format for the name of the children threads that process the message.
@@ -80,5 +82,20 @@ public interface ConcurrentMessageBrokerProperties {
8082
*
8183
* @return the format for the name of threads that process messages
8284
*/
83-
String threadNameFormat();
85+
@Nullable
86+
String getThreadNameFormat();
87+
88+
/**
89+
* The number of milliseconds that the background thread for organising concurrent threads should backoff before attempting again.
90+
*
91+
* <p>This is needed to stop the background thread from trying again and again over and over causing a flood of error log messages that may make it
92+
* difficult to debug.
93+
*
94+
* <p>If this value is null, negative or zero, {@link ConcurrentMessageBrokerConstants#DEFAULT_BACKOFF_TIME_IN_MS} will be used as the backoff period.
95+
*
96+
* @return the number of milliseconds to sleep the thread after an error is thrown
97+
*/
98+
@Nullable
99+
@PositiveOrZero
100+
Long getErrorBackoffTimeInMilliseconds();
84101
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/broker/concurrent/StaticConcurrentMessageBrokerProperties.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.jashmore.sqs.broker.concurrent;
22

3+
import static com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerConstants.DEFAULT_CONCURRENCY_POLLING_IN_MS;
4+
35
import com.google.common.base.Preconditions;
46

57
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -9,8 +11,11 @@
911
import lombok.ToString;
1012
import net.jcip.annotations.ThreadSafe;
1113

14+
import java.util.Locale;
1215
import java.util.Optional;
16+
import javax.annotation.Nullable;
1317
import javax.validation.constraints.Min;
18+
import javax.validation.constraints.PositiveOrZero;
1419

1520
/**
1621
* Implementation that stores the value as non-mutable field values and therefore will return the same value on every call.
@@ -24,20 +29,18 @@
2429
@Builder(toBuilder = true)
2530
@ThreadSafe
2631
public final class StaticConcurrentMessageBrokerProperties implements ConcurrentMessageBrokerProperties {
27-
private static final Integer DEFAULT_CONCURRENCY_POLLING_IN_MS = 60_000;
28-
2932
@NonNull
3033
private final Integer concurrencyLevel;
31-
3234
@NonNull
33-
private final Integer preferredConcurrencyPollingRateInMilliseconds;
34-
35+
private final Long preferredConcurrencyPollingRateInMilliseconds;
3536
private final String threadNameFormat;
37+
private final Long errorBackoffTimeInMilliseconds;
3638

3739
@SuppressFBWarnings("RV_RETURN_VAL")
3840
public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
39-
final Integer preferredConcurrencyPollingRateInMilliseconds,
40-
final String threadNameFormat) {
41+
final Long preferredConcurrencyPollingRateInMilliseconds,
42+
final String threadNameFormat,
43+
final Long errorBackoffTimeInMilliseconds) {
4144
Preconditions.checkArgument(concurrencyLevel == null || concurrencyLevel >= 0, "concurrencyLevel should be greater than or equal to zero");
4245
Preconditions.checkArgument(preferredConcurrencyPollingRateInMilliseconds == null || preferredConcurrencyPollingRateInMilliseconds >= 0,
4346
"preferredConcurrencyPollingRateInMilliseconds should be greater than or equal to zero");
@@ -46,12 +49,14 @@ public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
4649
.orElse(0);
4750
this.preferredConcurrencyPollingRateInMilliseconds = Optional.ofNullable(preferredConcurrencyPollingRateInMilliseconds)
4851
.orElse(DEFAULT_CONCURRENCY_POLLING_IN_MS);
52+
this.errorBackoffTimeInMilliseconds = errorBackoffTimeInMilliseconds;
4953

5054
this.threadNameFormat = threadNameFormat;
5155
if (threadNameFormat != null) {
56+
5257
// Test that the thread name is in the correct format
5358
//noinspection ResultOfMethodCallIgnored
54-
String.format(threadNameFormat, 0);
59+
String.format(Locale.ROOT, threadNameFormat, 1);
5560
}
5661
}
5762

@@ -61,12 +66,18 @@ public StaticConcurrentMessageBrokerProperties(final Integer concurrencyLevel,
6166
}
6267

6368
@Override
64-
public @Min(0) Integer getPreferredConcurrencyPollingRateInMilliseconds() {
69+
public @Min(0) Long getPreferredConcurrencyPollingRateInMilliseconds() {
6570
return preferredConcurrencyPollingRateInMilliseconds;
6671
}
6772

6873
@Override
69-
public String threadNameFormat() {
74+
public String getThreadNameFormat() {
7075
return threadNameFormat;
7176
}
77+
78+
@Nullable
79+
@Override
80+
public @PositiveOrZero Long getErrorBackoffTimeInMilliseconds() {
81+
return errorBackoffTimeInMilliseconds;
82+
}
7283
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/batching/BatchingMessageRetrieverProperties.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ public interface BatchingMessageRetrieverProperties {
5959
* set on the SQS queue will be used.
6060
*
6161
* @return the visibility timeout for the message
62-
* @see ReceiveMessageRequest#visibilityTimeout for where this is applied against
62+
* @see ReceiveMessageRequest#visibilityTimeout() for where this is applied against
6363
*/
6464
@Nullable
6565
@Positive
@@ -73,7 +73,7 @@ public interface BatchingMessageRetrieverProperties {
7373
* <p>If this value is null, the {@link AwsConstants#MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS} will be used when requesting messages from SQS.
7474
*
7575
* @return the wait time in seconds for obtaining messages
76-
* @see ReceiveMessageRequest#waitTimeSeconds for the usage
76+
* @see ReceiveMessageRequest#waitTimeSeconds() for the usage
7777
*/
7878
@Nullable
7979
@PositiveOrZero

0 commit comments

Comments
 (0)