Skip to content

Commit 41b13b7

Browse files
refs #128: Give IndividualMessageRetriever some love
1 parent 8273a27 commit 41b13b7

File tree

10 files changed

+292
-59
lines changed

10 files changed

+292
-59
lines changed
Lines changed: 60 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,25 @@
11
package com.jashmore.sqs.retriever.individual;
22

33
import static com.jashmore.sqs.aws.AwsConstants.MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS;
4+
import static com.jashmore.sqs.retriever.batching.BatchingMessageRetrieverConstants.DEFAULT_BACKOFF_TIME_IN_MS;
5+
6+
import com.google.common.annotations.VisibleForTesting;
47

58
import com.jashmore.sqs.QueueProperties;
9+
import com.jashmore.sqs.broker.concurrent.ConcurrentMessageBrokerProperties;
610
import com.jashmore.sqs.retriever.MessageRetriever;
11+
import com.jashmore.sqs.util.properties.PropertyUtils;
712
import lombok.AllArgsConstructor;
813
import lombok.extern.slf4j.Slf4j;
14+
import software.amazon.awssdk.core.exception.SdkClientException;
15+
import software.amazon.awssdk.core.exception.SdkInterruptedException;
916
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
1017
import software.amazon.awssdk.services.sqs.model.Message;
1118
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
1219
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
1320
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
1421

1522
import java.util.concurrent.ExecutionException;
16-
import java.util.concurrent.Future;
1723
import javax.annotation.concurrent.ThreadSafe;
1824

1925
/**
@@ -32,33 +38,71 @@ public class IndividualMessageRetriever implements MessageRetriever {
3238

3339
@Override
3440
public Message retrieveMessage() throws InterruptedException {
35-
final Message message;
3641
while (true) {
37-
final Future<ReceiveMessageResponse> receiveMessageResultFuture = sqsAsyncClient.receiveMessage(generateReceiveMessageRequest());
38-
42+
final ReceiveMessageResponse response;
3943
try {
40-
final ReceiveMessageResponse response = receiveMessageResultFuture.get();
41-
if (!response.messages().isEmpty()) {
42-
message = response.messages().get(0);
43-
break;
44+
response = sqsAsyncClient.receiveMessage(generateReceiveMessageRequest()).get();
45+
if (response.messages().isEmpty()) {
46+
continue;
47+
}
48+
49+
return response.messages().get(0);
50+
} catch (final ExecutionException | RuntimeException exception) {
51+
if (exception instanceof ExecutionException) {
52+
// Supposedly the SqsAsyncClient can get interrupted and this will remove the interrupted status from the thread and then wrap it
53+
// in it's own version of the interrupted exception...If this happens when the retriever is being shut down it will keep on processing
54+
// because it does not realise it is being shut down, therefore we have to check for this and quit if necessary
55+
final Throwable executionExceptionCause = exception.getCause();
56+
if (executionExceptionCause instanceof SdkClientException) {
57+
if (executionExceptionCause.getCause() instanceof SdkInterruptedException) {
58+
log.debug("Thread interrupted while receiving messages");
59+
throw new InterruptedException("Interrupted while retrieving messages");
60+
}
61+
}
4462
}
45-
} catch (final ExecutionException executionException) {
46-
throw new RuntimeException("Exception retrieving message", executionException.getCause());
63+
64+
final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds();
65+
log.error("Error thrown while organising threads to process messages. Backing off for {}ms", errorBackoffTimeInMilliseconds, exception);
66+
backoff(errorBackoffTimeInMilliseconds);
4767
}
4868
}
69+
}
4970

50-
return message;
71+
/**
72+
* Get the number of seconds that the thread should wait when there was an error trying to organise a thread to process.
73+
*
74+
* @return the backoff time in milliseconds
75+
* @see ConcurrentMessageBrokerProperties#getErrorBackoffTimeInMilliseconds() for more information
76+
*/
77+
private long getErrorBackoffTimeInMilliseconds() {
78+
return PropertyUtils.safelyGetPositiveOrZeroLongValue(
79+
"errorBackoffTimeInMilliseconds",
80+
properties::getErrorBackoffTimeInMilliseconds,
81+
DEFAULT_BACKOFF_TIME_IN_MS
82+
);
83+
}
84+
85+
@VisibleForTesting
86+
void backoff(final long backoffTimeInMs) throws InterruptedException {
87+
Thread.sleep(backoffTimeInMs);
5188
}
5289

5390
private ReceiveMessageRequest generateReceiveMessageRequest() {
54-
return ReceiveMessageRequest
55-
.builder()
91+
final ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder()
5692
.queueUrl(queueProperties.getQueueUrl())
5793
.maxNumberOfMessages(1)
5894
.attributeNames(QueueAttributeName.ALL)
5995
.messageAttributeNames(QueueAttributeName.ALL.toString())
60-
.waitTimeSeconds(MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS)
61-
.visibilityTimeout(properties.getVisibilityTimeoutForMessagesInSeconds())
62-
.build();
96+
.waitTimeSeconds(MAX_SQS_RECEIVE_WAIT_TIME_IN_SECONDS);
97+
final Integer visibilityTimeoutInSeconds = properties.getMessageVisibilityTimeoutInSeconds();
98+
if (visibilityTimeoutInSeconds != null) {
99+
if (visibilityTimeoutInSeconds <= 0) {
100+
log.warn("Non-positive visibilityTimeoutInSeconds provided: {}", visibilityTimeoutInSeconds);
101+
} else {
102+
requestBuilder.visibilityTimeout(visibilityTimeoutInSeconds);
103+
}
104+
}
105+
106+
return requestBuilder.build();
63107
}
64108
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.jashmore.sqs.retriever.individual;
2+
3+
import lombok.experimental.UtilityClass;
4+
5+
@UtilityClass
6+
public class IndividualMessageRetrieverConstants {
7+
/**
8+
* The default amount of time to sleep the thread when there was an error obtaining messages.
9+
*/
10+
public static final int DEFAULT_BACKOFF_TIME_IN_MS = 10_000;
11+
}
Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,38 @@
11
package com.jashmore.sqs.retriever.individual;
22

3-
import lombok.AllArgsConstructor;
4-
import lombok.Builder;
5-
import lombok.Value;
63
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
74

8-
@Value
9-
@Builder
10-
@AllArgsConstructor
11-
public class IndividualMessageRetrieverProperties {
5+
import javax.annotation.Nullable;
6+
import javax.validation.constraints.Positive;
7+
import javax.validation.constraints.PositiveOrZero;
8+
9+
public interface IndividualMessageRetrieverProperties {
1210
/**
13-
* The visibility timeout for the message.
11+
* Represents the time that messages received from the SQS queue should be invisible from other consumers of the queue before it is considered a failure
12+
* and placed onto the queue for future retrieval.
13+
*
14+
* <p>If a null or non-positive number is returned than no visibility timeout will be submitted for this message and therefore the default visibility
15+
* set on the SQS queue will be used.
16+
*
17+
* @return the visibility timeout for the message
18+
* @see ReceiveMessageRequest#visibilityTimeout() for where this is applied against
19+
*/
20+
@Nullable
21+
@Positive
22+
Integer getMessageVisibilityTimeoutInSeconds();
23+
24+
/**
25+
* The number of milliseconds that the background thread for receiving messages should sleep after an error is thrown.
26+
*
27+
* <p>This is needed to stop the background thread from constantly requesting for more messages which constantly throwing errors. For example, maybe the
28+
* connection to the SQS throws a 403 or some other error and we don't want to be constantly retrying to make the connection unnecessarily. This
29+
* therefore sleeps the thread for this period before attempting again.
1430
*
15-
* <p>E.g. the number of seconds that a message can be kept before it is assumed that it wasn't completed and will be put back onto the queue
31+
* <p>If this value is null, negative or zero, {@link IndividualMessageRetrieverConstants#DEFAULT_BACKOFF_TIME_IN_MS} will be used as the backoff period.
1632
*
17-
* @see ReceiveMessageRequest#visibilityTimeout for where this is applied against
33+
* @return the number of milliseconds to sleep the thread after an error is thrown
1834
*/
19-
private final Integer visibilityTimeoutForMessagesInSeconds;
35+
@Nullable
36+
@PositiveOrZero
37+
Long getErrorBackoffTimeInMilliseconds();
2038
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.jashmore.sqs.retriever.individual;
2+
3+
import lombok.AllArgsConstructor;
4+
import lombok.Builder;
5+
6+
import javax.annotation.Nullable;
7+
import javax.validation.constraints.PositiveOrZero;
8+
9+
@AllArgsConstructor
10+
@Builder(toBuilder = true)
11+
public class StaticIndividualMessageRetrieverProperties implements IndividualMessageRetrieverProperties {
12+
private final Integer visibilityTimeoutForMessagesInSeconds;
13+
private final Long errorBackoffTimeInMilliseconds;
14+
15+
@Override
16+
public Integer getMessageVisibilityTimeoutInSeconds() {
17+
return visibilityTimeoutForMessagesInSeconds;
18+
}
19+
20+
@Nullable
21+
@Override
22+
public @PositiveOrZero Long getErrorBackoffTimeInMilliseconds() {
23+
return errorBackoffTimeInMilliseconds;
24+
}
25+
}

0 commit comments

Comments
 (0)