Skip to content

Commit 3374637

Browse files
none: move around utilities, bump some dependencies and improve documentation (#147)
1 parent 02b817b commit 3374637

File tree

37 files changed

+318
-133
lines changed

37 files changed

+318
-133
lines changed

README.md

Lines changed: 38 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,40 @@ or other configuration properties.
1313
To keep the README minimal and easy to digest, the rest of the documentation is kept in the [doc](./doc/documentation.md) folder which provides a more
1414
thorough overview of how to use the library.
1515

16+
## Spring Quick Guide
17+
The following provides some examples using the Spring Starter for this library. *Note that this library is not Spring specific as the main implementations are
18+
kept in the [core module](./java-dynamic-sqs-listener-core) which is framework agnostic.*
19+
20+
### Using the Spring Starter
21+
This guide will give a quick guide to getting started for Spring Boot using the Spring Stater.
22+
23+
Include the maven dependency in your Spring Boot pom.xml:
24+
```xml
25+
<dependency>
26+
<groupId>com.jashmore</groupId>
27+
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
28+
<version>${sqs.listener.version}</version>
29+
</dependency>
30+
```
31+
32+
In one of your beans, attach a
33+
[@QueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
34+
annotation to a method indicating that it should process messages from a queue.
35+
36+
```java
37+
@Service
38+
public class MyMessageListener {
39+
@QueueListener("${insert.queue.url.here}") // The queue here can point to your SQS server, e.g. a local SQS server or one on AWS
40+
public void processMessage(@Payload final String payload) {
41+
// process the message payload here
42+
}
43+
}
44+
```
45+
46+
This will use any user configured `SqsAsyncClient` in the application context for connecting to the queue, otherwise if none is defined, a default
47+
is provided that will look for AWS credentials/region from multiple areas, like the environment variables. See
48+
[How to connect to AWS SQS Queues](./doc/how-to-guides/how-to-connect-to-aws-sqs-queue.md) for information about connecting to an actual queue in SQS.
49+
1650
## Core Infrastructure
1751
This library has been divided into four main components each with distinct responsibilities. The following is a diagram describing a simple flow of a
1852
single SQS message flowing through each of the components to eventually be executed by some code.
@@ -49,41 +83,7 @@ for compatibility.
4983
- [Jackson Databind](https://github.com/FasterXML/jackson-databind): 2.9.10.3
5084
- [Spring Framework](java-dynamic-sqs-listener-spring)
5185
- All of the core dependencies
52-
- [Spring Boot](https://github.com/spring-projects/spring-boot): 2.2.5.RELEAS
53-
54-
## Spring Quick Guide
55-
The following provides some examples using the Spring Starter for this library. *Note that this library is not Spring specific as the main implementations are
56-
kept in the [core module](./java-dynamic-sqs-listener-core) which is framework agnostic.*
57-
58-
### Using the Spring Starter
59-
This guide will give a quick guide to getting started for Spring Boot using the Spring Stater.
60-
61-
Include the maven dependency in your Spring Boot pom.xml:
62-
```xml
63-
<dependency>
64-
<groupId>com.jashmore</groupId>
65-
<artifactId>java-dynamic-sqs-listener-spring-starter</artifactId>
66-
<version>${sqs.listener.version}</version>
67-
</dependency>
68-
```
69-
70-
In one of your beans, attach a
71-
[@QueueListener](./java-dynamic-sqs-listener-spring/java-dynamic-sqs-listener-spring-core/src/main/java/com/jashmore/sqs/spring/container/basic/QueueListener.java)
72-
annotation to a method indicating that it should process messages from a queue.
73-
74-
```java
75-
@Service
76-
public class MyMessageListener {
77-
@QueueListener("${insert.queue.url.here}") // The queue here can point to your SQS server, e.g. a local SQS server or one on AWS
78-
public void processMessage(@Payload final String payload) {
79-
// process the message payload here
80-
}
81-
}
82-
```
83-
84-
This will use any user configured `SqsAsyncClient` in the application context for connecting to the queue, otherwise if none is defined, a default
85-
is provided that will look for AWS credentials/region from multiple areas, like the environment variables. See
86-
[How to connect to AWS SQS Queues](./doc/how-to-guides/how-to-connect-to-aws-sqs-queue.md) for information about connecting to an actual queue in SQS.
86+
- [Spring Boot](https://github.com/spring-projects/spring-boot): 2.2.5.RELEASE
8787

8888
### How to Mark the message as successfully processed
8989
When the method executing the message finishes without throwing an exception, the
@@ -168,11 +168,11 @@ do the logic for converting the message payload to uppercase.
168168
}
169169
}
170170
```
171-
You may be curious why a custom `AnnotationUtils.findParameterAnnotation` function is used instead of getting the annotation directly from the parameter.
171+
You may be curious why a custom `AnnotationUtils.findParameterAnnotation` function is being used instead of getting the annotation directly from the parameter.
172172
The reason for this is due to potential proxying of beans in the application, such as by applying Aspects around your code via CGLIB. As libraries, like
173173
CGLIB, won't copy the annotations to the proxied classes the resolver needs to look through the class hierarchy to find the original class to get the
174174
annotations. For more information about this, take a look at the JavaDoc provided in
175-
[AnnotationUtils](./java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/util/annotation/AnnotationUtils.java). You can also see an example of
175+
[AnnotationUtils](./util/common-utils/src/main/java/com/jashmore/sqs/util/annotation/AnnotationUtils.java). You can also see an example of
176176
this problem being tested in
177177
[PayloadArgumentResolver_ProxyClassTest.java](./java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/argument/payload/PayloadArgumentResolver_ProxyClassTest.java).
178178
1. Include the custom [ArgumentResolver](./java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/argument/ArgumentResolver.java) in the application
@@ -248,7 +248,7 @@ If you want to see the difference between this library and others like the
248248
module. This allows you to test the performance and usage of each library for different scenarios, such as heavy IO message processing, etc.
249249

250250
### Other examples
251-
See [examples](./examples) for all of the other available examples.
251+
See [examples](./examples) for all the other available examples.
252252

253253
## Bugs and Feedback
254254
For bugs, questions and discussions please use [Github Issues](https://github.com/JaidenAshmore/java-dynamic-sqs-listener/issues).
@@ -279,4 +279,3 @@ See [CONTRIBUTING.md](./CONTRIBUTING.md) for more details.
279279
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
280280
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
281281
SOFTWARE.
282-

examples/spring-aws-example/pom.xml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,8 @@
99

1010
<artifactId>java-dynamic-sqs-listener-spring-aws-example</artifactId>
1111

12-
1312
<name>Java Dynamic SQS Listener - Spring Starter - AWS Example</name>
14-
<description>Contains examples for </description>
13+
<description>Contains examples for connecting to an actual AWS SQS Queue</description>
1514

1615
<properties>
1716
<spotbugs.config.location>../../configuration/spotbugs/bugsExcludeFilter.xml</spotbugs.config.location>

examples/spring-integration-test-example/pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
<dependency>
2121
<groupId>org.springframework.boot</groupId>
2222
<artifactId>spring-boot-starter</artifactId>
23-
<version>${spring.boot.version}</version>
2423
</dependency>
2524

2625
<dependency>
@@ -80,5 +79,12 @@
8079
<version>${project.version}</version>
8180
<scope>test</scope>
8281
</dependency>
82+
83+
<dependency>
84+
<groupId>com.jashmore</groupId>
85+
<artifactId>expected-test-exception</artifactId>
86+
<version>${project.version}</version>
87+
<scope>test</scope>
88+
</dependency>
8389
</dependencies>
8490
</project>

examples/spring-integration-test-example/src/test/java/it/com/jashmore/sqs/examples/integrationtests/SqsListenerExampleJunit4IntegrationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import com.jashmore.sqs.examples.integrationtests.IntegrationTestExampleApplication;
1313
import com.jashmore.sqs.test.LocalSqsRule;
1414
import com.jashmore.sqs.test.PurgeQueuesRule;
15+
import com.jashmore.sqs.util.ExpectedTestException;
1516
import com.jashmore.sqs.util.LocalSqsAsyncClient;
1617
import com.jashmore.sqs.util.SqsQueuesConfig;
1718
import org.junit.ClassRule;
@@ -92,7 +93,7 @@ public void messageFailingToProcessWillBeProcessedAgain() throws Exception {
9293
final AtomicBoolean processedMessageOnce = new AtomicBoolean();
9394
doAnswer(invocationOnMock -> {
9495
if (!processedMessageOnce.getAndSet(true)) {
95-
throw new RuntimeException("Expected Test Exception");
96+
throw new ExpectedTestException();
9697
}
9798
messageReceivedCountDownLatch.countDown();
9899
return null;
@@ -113,7 +114,7 @@ public void messageThatContinuesToFailWillBePlacedIntoDlq() throws Exception {
113114
final String queueUrl = localSqsAsyncClient.getQueueUrl(QUEUE_NAME);
114115
doAnswer(invocationOnMock -> {
115116
messageReceivedCountDownLatch.countDown();
116-
throw new RuntimeException("Expected Test Exception");
117+
throw new ExpectedTestException();
117118
}).when(mockSomeService).run(anyString());
118119

119120
// act

examples/spring-integration-test-example/src/test/java/it/com/jashmore/sqs/examples/integrationtests/SqsListenerExampleJunit5IntegrationTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import com.jashmore.sqs.examples.integrationtests.IntegrationTestExampleApplication;
1313
import com.jashmore.sqs.test.LocalSqsExtension;
14+
import com.jashmore.sqs.util.ExpectedTestException;
1415
import com.jashmore.sqs.util.LocalSqsAsyncClient;
1516
import com.jashmore.sqs.util.SqsQueuesConfig;
1617
import org.junit.jupiter.api.Test;
@@ -87,7 +88,7 @@ void messageFailingToProcessWillBeProcessedAgain() throws Exception {
8788
final AtomicBoolean processedMessageOnce = new AtomicBoolean();
8889
doAnswer(invocationOnMock -> {
8990
if (!processedMessageOnce.getAndSet(true)) {
90-
throw new RuntimeException("Expected Test Exception");
91+
throw new ExpectedTestException();
9192
}
9293
messageReceivedCountDownLatch.countDown();
9394
return null;
@@ -108,7 +109,7 @@ void messageThatContinuesToFailWillBePlacedIntoDlq() throws Exception {
108109
final String queueUrl = localSqsAsyncClient.getQueueUrl(QUEUE_NAME);
109110
doAnswer(invocationOnMock -> {
110111
messageReceivedCountDownLatch.countDown();
111-
throw new RuntimeException("Expected Test Exception");
112+
throw new ExpectedTestException();
112113
}).when(mockSomeService).run(anyString());
113114

114115
// act

java-dynamic-sqs-listener-api/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,10 @@
3232
<groupId>javax.validation</groupId>
3333
<artifactId>validation-api</artifactId>
3434
</dependency>
35+
36+
<dependency>
37+
<groupId>com.google.guava</groupId>
38+
<artifactId>guava</artifactId>
39+
</dependency>
3540
</dependencies>
3641
</project>

java-dynamic-sqs-listener-core/pom.xml

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
<version>${project.version}</version>
2727
</dependency>
2828

29+
<dependency>
30+
<groupId>com.jashmore</groupId>
31+
<artifactId>common-utils</artifactId>
32+
<version>${project.version}</version>
33+
</dependency>
34+
2935
<dependency>
3036
<groupId>software.amazon.awssdk</groupId>
3137
<artifactId>sqs</artifactId>
@@ -72,9 +78,16 @@
7278
</dependency>
7379

7480
<dependency>
75-
<groupId>cglib</groupId>
76-
<artifactId>cglib</artifactId>
77-
<version>3.2.10</version>
81+
<groupId>com.jashmore</groupId>
82+
<artifactId>proxy-method-interceptor</artifactId>
83+
<version>${project.version}</version>
84+
<scope>test</scope>
85+
</dependency>
86+
87+
<dependency>
88+
<groupId>com.jashmore</groupId>
89+
<artifactId>expected-test-exception</artifactId>
90+
<version>${project.version}</version>
7891
<scope>test</scope>
7992
</dependency>
8093
</dependencies>
Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
package com.jashmore.sqs.retriever.prefetch.util;
1+
package com.jashmore.sqs.retriever.prefetch;
22

3-
import javafx.util.Pair;
43
import lombok.extern.slf4j.Slf4j;
54
import software.amazon.awssdk.services.sqs.model.Message;
65

@@ -31,7 +30,7 @@
3130
*/
3231
@Slf4j
3332
@ThreadSafe
34-
public class PrefetchingMessageFutureConsumerQueue {
33+
class PrefetchingMessageFutureConsumerQueue {
3534
private final Queue<CompletableFuture<Message>> futureQueue;
3635
private final Queue<Message> messageQueue;
3736
private final Integer messageCapacity;
@@ -45,7 +44,7 @@ public class PrefetchingMessageFutureConsumerQueue {
4544
* @param messageCapacity the maximum number of messages to batch before {@link #pushMessage(Message)} blocks until
4645
* {@link #pushCompletableFuture(CompletableFuture)} is called
4746
*/
48-
public PrefetchingMessageFutureConsumerQueue(final Integer messageCapacity) {
47+
PrefetchingMessageFutureConsumerQueue(final Integer messageCapacity) {
4948
this.futureQueue = new LinkedList<>();
5049
this.messageQueue = new LinkedList<>();
5150
this.messageCapacity = messageCapacity;
@@ -57,7 +56,7 @@ public PrefetchingMessageFutureConsumerQueue(final Integer messageCapacity) {
5756
*
5857
* @param completableFuture the future to include in the queue
5958
*/
60-
public void pushCompletableFuture(@Nonnull CompletableFuture<Message> completableFuture) {
59+
void pushCompletableFuture(@Nonnull CompletableFuture<Message> completableFuture) {
6160
final Message message;
6261
lock.lock();
6362
try {
@@ -89,7 +88,7 @@ public void pushCompletableFuture(@Nonnull CompletableFuture<Message> completabl
8988
* @param message the message to add
9089
* @throws InterruptedException if the thread was interrupted while waiting for the lock or adding a message onto the internal message queue
9190
*/
92-
public void pushMessage(@Nonnull final Message message) throws InterruptedException {
91+
void pushMessage(@Nonnull final Message message) throws InterruptedException {
9392
CompletableFuture<Message> completableFuture;
9493
lock.lockInterruptibly();
9594
try {
@@ -116,7 +115,7 @@ public void pushMessage(@Nonnull final Message message) throws InterruptedExcept
116115
*
117116
* @throws InterruptedException if the thread was interrupted while waiting for a slot
118117
*/
119-
public void blockUntilFreeSlotForMessage() throws InterruptedException {
118+
void blockUntilFreeSlotForMessage() throws InterruptedException {
120119
lock.lockInterruptibly();
121120
try {
122121
while (messageQueue.size() == messageCapacity) {
@@ -132,29 +131,26 @@ public void blockUntilFreeSlotForMessage() throws InterruptedException {
132131
*
133132
* @return the total messages batched
134133
*/
135-
public int getNumberOfBatchedMessages() {
134+
int getNumberOfBatchedMessages() {
136135
return messageQueue.size();
137136
}
138137

139138
/**
140-
* Drain the queues (thus emptying) and return them as a {@link Pair}.
141-
*
142-
* <p>Note that only a single queue should contain entries. I wanted this to return an "Either" but Java does not provide this type, I did't want
143-
* to import another library to provide this and I didn't want to implement it.
139+
* Drain the queues (thus emptying) and return the messages to be resolved as well as the messages that have not been used yet.
144140
*
145141
* @return the queues of futures and messages that were in this queue
146142
*/
147-
public Pair<Queue<CompletableFuture<Message>>, Queue<Message>> drain() {
143+
QueueDrain drain() {
148144
lock.lock();
149145
try {
150-
final LinkedList<CompletableFuture<Message>> copiedFutureList = new LinkedList<>(futureQueue);
151-
final LinkedList<Message> copiedMessageList = new LinkedList<>(messageQueue);
146+
final LinkedList<CompletableFuture<Message>> futuresWaitingForMessages = new LinkedList<>(futureQueue);
147+
final LinkedList<Message> messagesAvailableForProcessing = new LinkedList<>(messageQueue);
152148
futureQueue.clear();
153149
messageQueue.clear();
154-
return new Pair<>(
155-
copiedFutureList,
156-
copiedMessageList
157-
);
150+
return QueueDrain.builder()
151+
.futuresWaitingForMessages(futuresWaitingForMessages)
152+
.messagesAvailableForProcessing(messagesAvailableForProcessing)
153+
.build();
158154
} finally {
159155
lock.unlock();
160156
}

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/retriever/prefetch/PrefetchingMessageRetriever.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import com.jashmore.sqs.QueueProperties;
1010
import com.jashmore.sqs.aws.AwsConstants;
1111
import com.jashmore.sqs.retriever.MessageRetriever;
12-
import com.jashmore.sqs.retriever.prefetch.util.PrefetchingMessageFutureConsumerQueue;
1312
import com.jashmore.sqs.util.properties.PropertyUtils;
14-
import javafx.util.Pair;
1513
import lombok.extern.slf4j.Slf4j;
1614
import software.amazon.awssdk.core.exception.SdkClientException;
1715
import software.amazon.awssdk.core.exception.SdkInterruptedException;
@@ -24,7 +22,6 @@
2422
import java.util.LinkedList;
2523
import java.util.List;
2624
import java.util.ListIterator;
27-
import java.util.Queue;
2825
import java.util.concurrent.CompletableFuture;
2926
import java.util.concurrent.ExecutionException;
3027

@@ -144,11 +141,10 @@ public List<Message> run() {
144141
}
145142
}
146143

147-
final Pair<Queue<CompletableFuture<Message>>, Queue<Message>> pairQueue = pairConsumerQueue.drain();
148-
final Queue<CompletableFuture<Message>> extraThreads = pairQueue.getKey();
149-
extraThreads.forEach(future -> future.cancel(true));
144+
final QueueDrain pairQueue = pairConsumerQueue.drain();
145+
pairQueue.getFuturesWaitingForMessages().forEach(future -> future.cancel(true));
150146
return ImmutableList.<Message>builder()
151-
.addAll(pairQueue.getValue())
147+
.addAll(pairQueue.getMessagesAvailableForProcessing())
152148
.addAll(listsNotPublished)
153149
.build();
154150
}

0 commit comments

Comments
 (0)