11package com .jashmore .sqs .retriever .batching ;
22
33import static com .jashmore .sqs .retriever .batching .BatchingMessageRetrieverConstants .DEFAULT_BACKOFF_TIME_IN_MS ;
4+ import static com .jashmore .sqs .retriever .batching .BatchingMessageRetrieverConstants .DEFAULT_BATCHING_TRIGGER ;
45
56import com .google .common .annotations .VisibleForTesting ;
67
78import com .jashmore .sqs .QueueProperties ;
89import com .jashmore .sqs .aws .AwsConstants ;
10+ import com .jashmore .sqs .broker .concurrent .ConcurrentMessageBrokerProperties ;
911import com .jashmore .sqs .retriever .AsyncMessageRetriever ;
1012import com .jashmore .sqs .retriever .MessageRetriever ;
11- import com .jashmore .sqs .util .RetrieverUtils ;
13+ import com .jashmore .sqs .util .properties .PropertyUtils ;
14+ import com .jashmore .sqs .util .retriever .RetrieverUtils ;
1215import edu .umd .cs .findbugs .annotations .SuppressFBWarnings ;
1316import lombok .extern .slf4j .Slf4j ;
1417import software .amazon .awssdk .services .sqs .SqsAsyncClient ;
1518import software .amazon .awssdk .services .sqs .model .Message ;
1619import software .amazon .awssdk .services .sqs .model .ReceiveMessageRequest ;
1720import software .amazon .awssdk .services .sqs .model .ReceiveMessageResponse ;
1821
19- import java .util .Optional ;
2022import java .util .concurrent .BlockingQueue ;
2123import java .util .concurrent .LinkedBlockingQueue ;
2224import java .util .concurrent .atomic .AtomicInteger ;
@@ -52,11 +54,11 @@ public BatchingMessageRetriever(final QueueProperties queueProperties,
5254
5355 @ VisibleForTesting
5456 BatchingMessageRetriever (final QueueProperties queueProperties ,
55- final SqsAsyncClient sqsAsyncClient ,
56- final BatchingMessageRetrieverProperties properties ,
57- final AtomicInteger numberWaitingForMessages ,
58- final BlockingQueue <Message > messagesDownloaded ,
59- final Object shouldObtainMessagesLock ) {
57+ final SqsAsyncClient sqsAsyncClient ,
58+ final BatchingMessageRetrieverProperties properties ,
59+ final AtomicInteger numberWaitingForMessages ,
60+ final BlockingQueue <Message > messagesDownloaded ,
61+ final Object shouldObtainMessagesLock ) {
6062 this .queueProperties = queueProperties ;
6163 this .sqsAsyncClient = sqsAsyncClient ;
6264 this .properties = properties ;
@@ -84,9 +86,10 @@ public Message retrieveMessage() throws InterruptedException {
8486 private void incrementWaitingCountAndNotify () {
8587 synchronized (shouldObtainMessagesLock ) {
8688 final int currentThreads = numberWaitingForMessages .incrementAndGet ();
87- if (currentThreads >= properties .getNumberOfThreadsWaitingTrigger ()) {
89+ final int numberOfThreadsWaitingTrigger = getNumberOfThreadsWaitingTrigger ();
90+ if (currentThreads >= numberOfThreadsWaitingTrigger ) {
8891 log .debug ("Maximum number of threads({}) waiting has arrived requesting any sleeping threads to wake up to process" ,
89- properties . getNumberOfThreadsWaitingTrigger () );
92+ numberOfThreadsWaitingTrigger );
9093 // notify that we should grab a message
9194 shouldObtainMessagesLock .notifyAll ();
9295 }
@@ -97,30 +100,30 @@ private void incrementWaitingCountAndNotify() {
97100 public void run () {
98101 log .debug ("Started background thread" );
99102 while (true ) {
100- final int numberOfMessagesToObtain ;
101- synchronized (shouldObtainMessagesLock ) {
102- final int triggerValue = properties .getNumberOfThreadsWaitingTrigger ();
103- if ((numberWaitingForMessages .get () - messagesDownloaded .size ()) < triggerValue ) {
104- try {
105- waitForEnoughThreadsToRequestMessages (getPollingPeriodInMs (triggerValue ));
106- } catch (InterruptedException exception ) {
107- log .debug ("Thread interrupted while waiting for messages" );
108- break ;
103+ try {
104+ final int numberOfMessagesToObtain ;
105+ synchronized (shouldObtainMessagesLock ) {
106+ final int triggerValue = getNumberOfThreadsWaitingTrigger ();
107+ if ((numberWaitingForMessages .get () - messagesDownloaded .size ()) < triggerValue ) {
108+ try {
109+ waitForEnoughThreadsToRequestMessages (getPollingPeriodInMs ());
110+ } catch (InterruptedException exception ) {
111+ log .debug ("Thread interrupted while waiting for messages" );
112+ break ;
113+ }
109114 }
115+ numberOfMessagesToObtain = Math .min (numberWaitingForMessages .get () - messagesDownloaded .size (),
116+ AwsConstants .MAX_NUMBER_OF_MESSAGES_FROM_SQS );
110117 }
111- numberOfMessagesToObtain = Math .min (numberWaitingForMessages .get () - messagesDownloaded .size (),
112- AwsConstants .MAX_NUMBER_OF_MESSAGES_FROM_SQS );
113- }
114118
115- if (numberOfMessagesToObtain <= 0 ) {
116- log .debug ("Requesting 0 messages" );
117- // We don't want to go request out if there are no messages to retrieve
118- continue ;
119- }
119+ if (numberOfMessagesToObtain <= 0 ) {
120+ log .debug ("Requesting 0 messages" );
121+ // We don't want to go request out if there are no messages to retrieve
122+ continue ;
123+ }
120124
121- log .debug ("Requesting {} messages" , numberOfMessagesToObtain );
125+ log .debug ("Requesting {} messages" , numberOfMessagesToObtain );
122126
123- try {
124127 final ReceiveMessageResponse response ;
125128 try {
126129 response = sqsAsyncClient .receiveMessage (buildReceiveMessageRequest (numberOfMessagesToObtain ))
@@ -140,9 +143,11 @@ public void run() {
140143 } catch (final Throwable throwable ) {
141144 log .error ("Error thrown trying to obtain messages" , throwable );
142145 try {
143- backoff (RetrieverUtils .safelyGetBackoffTime (properties .getErrorBackoffTimeInMilliseconds (), DEFAULT_BACKOFF_TIME_IN_MS ));
146+ final long errorBackoffTimeInMilliseconds = getErrorBackoffTimeInMilliseconds ();
147+ log .error ("Error thrown while organising threads to process messages. Backing off for {}ms" , errorBackoffTimeInMilliseconds , throwable );
148+ backoff (errorBackoffTimeInMilliseconds );
144149 } catch (final InterruptedException interruptedException ) {
145- log .trace ("Thread interrupted during error backoff thread sleeping " );
150+ log .debug ("Thread interrupted during backoff period " );
146151 break ;
147152 }
148153 }
@@ -155,6 +160,33 @@ void waitForEnoughThreadsToRequestMessages(final long waitPeriodInMs) throws Int
155160 shouldObtainMessagesLock .wait (waitPeriodInMs );
156161 }
157162
163+ /**
164+ * Get the number of seconds that the thread should wait when there was an error trying to organise a thread to process.
165+ *
166+ * @return the backoff time in milliseconds
167+ * @see ConcurrentMessageBrokerProperties#getErrorBackoffTimeInMilliseconds() for more information
168+ */
169+ private long getErrorBackoffTimeInMilliseconds () {
170+ return PropertyUtils .safelyGetPositiveOrZeroLongValue (
171+ "errorBackoffTimeInMilliseconds" ,
172+ properties ::getErrorBackoffTimeInMilliseconds ,
173+ DEFAULT_BACKOFF_TIME_IN_MS
174+ );
175+ }
176+
177+ /**
178+ * Safely get the total number of threads requiring messages before it sends a batch request for messages.
179+ *
180+ * @return the total number of threads for the batching trigger
181+ */
182+ private int getNumberOfThreadsWaitingTrigger () {
183+ return PropertyUtils .safelyGetIntegerValue (
184+ "numberOfThreadsWaitingTrigger" ,
185+ properties ::getNumberOfThreadsWaitingTrigger ,
186+ DEFAULT_BATCHING_TRIGGER
187+ );
188+ }
189+
158190 @ VisibleForTesting
159191 void backoff (final long backoffTimeInMs ) throws InterruptedException {
160192 Thread .sleep (backoffTimeInMs );
@@ -171,12 +203,12 @@ private ReceiveMessageRequest buildReceiveMessageRequest(final int numberOfMessa
171203 .builder ()
172204 .queueUrl (queueProperties .getQueueUrl ())
173205 .maxNumberOfMessages (numberOfMessagesToObtain )
174- .waitTimeSeconds (RetrieverUtils .safelyGetWaitTimeInSeconds (properties . getMessageWaitTimeInSeconds () ));
206+ .waitTimeSeconds (RetrieverUtils .safelyGetWaitTimeInSeconds (properties :: getMessageWaitTimeInSeconds ));
175207
176208 final Integer visibilityTimeoutInSeconds = properties .getVisibilityTimeoutInSeconds ();
177209 if (visibilityTimeoutInSeconds != null ) {
178210 if (visibilityTimeoutInSeconds < 0 ) {
179- log .warn ("Non-positive visibilityTimeoutInSeconds provided: " , visibilityTimeoutInSeconds );
211+ log .warn ("Non-positive visibilityTimeoutInSeconds provided: {} " , visibilityTimeoutInSeconds );
180212 } else {
181213 requestBuilder .visibilityTimeout (visibilityTimeoutInSeconds );
182214 }
@@ -190,15 +222,13 @@ private ReceiveMessageRequest buildReceiveMessageRequest(final int numberOfMessa
190222 * could cause this retriever to block forever if the number of threads never reaches
191223 * {@link BatchingMessageRetrieverProperties#getNumberOfThreadsWaitingTrigger()}.
192224 *
193- * @param triggerValue the number of threads that would trigger the batch retrieval of messages
194225 * @return the polling period in ms
195226 */
196- private long getPollingPeriodInMs (final int triggerValue ) {
197- return Optional .ofNullable (properties .getMessageRetrievalPollingPeriodInMs ())
198- .orElseGet (() -> {
199- log .warn ("No polling period specifically set, defaulting to zero which will have the thread blocked until {} threads request messages" ,
200- triggerValue );
201- return 0L ;
202- });
227+ private long getPollingPeriodInMs () {
228+ return PropertyUtils .safelyGetLongValue (
229+ "messageRetrievalPollingPeriodInMs" ,
230+ properties ::getMessageRetrievalPollingPeriodInMs ,
231+ 0L
232+ );
203233 }
204234}
0 commit comments