Skip to content

Commit 7a789f4

Browse files
Merge pull request #113 from JaidenAshmore/issue/111_allow_local_sqs_client_dlq_name
refs #111: allow the naming of the DLQ queue names
2 parents 37d6bb2 + 07d7b47 commit 7a789f4

File tree

4 files changed

+232
-10
lines changed

4 files changed

+232
-10
lines changed

util/local-amazon-sqs/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,13 @@
3232
<groupId>org.slf4j</groupId>
3333
<artifactId>slf4j-api</artifactId>
3434
</dependency>
35+
36+
<dependency>
37+
<groupId>org.elasticmq</groupId>
38+
<artifactId>elasticmq-rest-sqs_2.11</artifactId>
39+
<version>0.13.9</version>
40+
<scope>test</scope>
41+
</dependency>
3542
</dependencies>
3643

3744
</project>

util/local-amazon-sqs/src/main/java/com/jashmore/sqs/util/LocalSqsAsyncClient.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.jashmore.sqs.util;
22

33
import static com.jashmore.sqs.util.SqsQueuesConfig.DEFAULT_SQS_SERVER_URL;
4+
import static com.jashmore.sqs.util.SqsQueuesConfig.QueueConfig.DEFAULT_MAX_RECEIVE_COUNT;
45

56
import com.google.common.collect.ImmutableMap;
67
import com.google.common.collect.Maps;
@@ -126,11 +127,14 @@ public void buildQueues() {
126127
attributesBuilder.put(QueueAttributeName.VISIBILITY_TIMEOUT, String.valueOf(queueConfig.getVisibilityTimeout()));
127128
}
128129

129-
if (queueConfig.getMaxReceiveCount() != null) {
130-
final String deadLetterQueueArn = createDeadLetterQueue(queueConfig.getQueueName());
130+
if (queueConfig.getMaxReceiveCount() != null || queueConfig.getDeadLetterQueueName() != null) {
131+
final String deadLetterQueueName = Optional.ofNullable(queueConfig.getDeadLetterQueueName()).orElse(queueConfig.getQueueName() + "-dlq");
132+
final int maxReceiveCount = Optional.ofNullable(queueConfig.getMaxReceiveCount())
133+
.orElse(DEFAULT_MAX_RECEIVE_COUNT);
134+
final String deadLetterQueueArn = createDeadLetterQueue(deadLetterQueueName);
131135
attributesBuilder.put(
132136
QueueAttributeName.REDRIVE_POLICY,
133-
String.format("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}", deadLetterQueueArn, queueConfig.getMaxReceiveCount())
137+
String.format("{\"deadLetterTargetArn\":\"%s\",\"maxReceiveCount\":\"%d\"}", deadLetterQueueArn, maxReceiveCount)
134138
);
135139
}
136140

@@ -148,15 +152,13 @@ public void buildQueues() {
148152
/**
149153
* Create a Dead Letter Queue that should be used for the queue with the provided name.
150154
*
151-
* <p>This will create a queue with "-dlq" appended to the original queue's name.
152-
*
153155
* @param queueName the name of the queue that this dead letter queue is for
154156
* @return the queue ARN of the dead letter queue created
155157
*/
156158
private String createDeadLetterQueue(final String queueName) {
157159
try {
158-
log.debug("Creating local queue: {}-dlq", queueName);
159-
return delegate.createQueue((builder -> builder.queueName(queueName + "-dlq")))
160+
log.debug("Creating dead letter queue: {}", queueName);
161+
return delegate.createQueue((builder -> builder.queueName(queueName)))
160162
.thenCompose(createQueueResponse -> delegate.getQueueAttributes(builder -> builder
161163
.queueUrl(createQueueResponse.queueUrl())
162164
.attributeNames(QueueAttributeName.QUEUE_ARN))

util/local-amazon-sqs/src/main/java/com/jashmore/sqs/util/SqsQueuesConfig.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,22 +34,31 @@ public class SqsQueuesConfig {
3434
@Value
3535
@Builder
3636
public static class QueueConfig {
37+
public static final int DEFAULT_MAX_RECEIVE_COUNT = 3;
3738

3839
/**
3940
* The name of the queue.
4041
*/
4142
private final String queueName;
4243

44+
/**
45+
* The name of the dead letter queue that should be created and linked to the queue.
46+
*
47+
* <p>If this value is null a dead letter queue will not be created. However if {@link #maxReceiveCount} is not null than a dead letter queue will
48+
* be created with a default name of "{queueName}-dlq".
49+
*/
50+
private final String deadLetterQueueName;
51+
4352
/**
4453
* The amount of time messages should be visible before putting back onto the queue.
4554
*/
4655
private final Integer visibilityTimeout;
4756

4857
/**
49-
* The amount of times that a message can be retrieved before it should be plcaed into the Dead Letter Queue.
58+
* The amount of times that a message can be retrieved before it should be placed into the Dead Letter Queue.
5059
*
51-
* <p>If this value is non null a dead letter queue will be created and a redrive policy linked to it with this max receive count. If this value is null
52-
* no dead letter queue will be created and attached.
60+
* <p>If this value is non null a dead letter queue with a name of {@link #deadLetterQueueName} or "{queueName}-dlq" will be created and a
61+
* re-drive policy linked to it with this max receive count.
5362
*/
5463
private final Integer maxReceiveCount;
5564
}
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
package com.jashmore.sqs.util;
2+
3+
import static org.assertj.core.api.Assertions.assertThat;
4+
5+
import akka.http.scaladsl.Http;
6+
import org.elasticmq.rest.sqs.SQSRestServer;
7+
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
8+
import org.junit.Before;
9+
import org.junit.Rule;
10+
import org.junit.Test;
11+
import org.mockito.junit.MockitoJUnit;
12+
import org.mockito.junit.MockitoRule;
13+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
14+
import software.amazon.awssdk.services.sqs.model.ListQueuesResponse;
15+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
16+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
public class LocalSqsAsyncClientTest {
21+
@Rule
22+
public MockitoRule mockitoRule = MockitoJUnit.rule();
23+
24+
private String queueServerUrl;
25+
26+
@Before
27+
public void setUp() {
28+
final SQSRestServer sqsRestServer = SQSRestServerBuilder
29+
.withInterface("localhost")
30+
.withDynamicPort()
31+
.start();
32+
33+
final Http.ServerBinding serverBinding = sqsRestServer.waitUntilStarted();
34+
queueServerUrl = "http://localhost:" + serverBinding.localAddress().getPort();
35+
}
36+
37+
@Test
38+
public void whenBuildingLocalSqsAsyncClientQueuesAndDlqsWillBeCreatedAutomatically() throws Exception {
39+
// arrange
40+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
41+
.sqsServerUrl(queueServerUrl)
42+
.queue(SqsQueuesConfig.QueueConfig.builder()
43+
.queueName("queueName")
44+
.deadLetterQueueName("queueNameDlq")
45+
.build())
46+
.queue(SqsQueuesConfig.QueueConfig.builder()
47+
.queueName("queueName2")
48+
.deadLetterQueueName("queueName2Dlq")
49+
.build())
50+
.build();
51+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
52+
53+
// act
54+
sqsAsyncClient.buildQueues();
55+
56+
// assert
57+
final ListQueuesResponse listQueuesResponse = sqsAsyncClient.listQueues().get();
58+
assertThat(listQueuesResponse.queueUrls()).hasSize(4);
59+
}
60+
61+
@Test
62+
public void whenDeadLetterQueueIsBuiltItIsLinkedToCorrespondingQueue() throws Exception {
63+
// arrange
64+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
65+
.sqsServerUrl(queueServerUrl)
66+
.queue(SqsQueuesConfig.QueueConfig.builder()
67+
.queueName("queueName")
68+
.deadLetterQueueName("queueNameDlq")
69+
.build())
70+
.build();
71+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
72+
73+
// act
74+
sqsAsyncClient.buildQueues();
75+
76+
// assert
77+
final String reDrivePolicy = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
78+
.queueUrl(queueServerUrl + "/queue/queueName")
79+
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
80+
.build())
81+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.REDRIVE_POLICY))
82+
.get();
83+
assertThat(reDrivePolicy).contains("queueNameDlq");
84+
}
85+
86+
@Test
87+
public void whenMaxReceiveCountUsedAndNoDeadLetterQueueNameIsIncludedTheDefaultNameIsused() throws Exception {
88+
// arrange
89+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
90+
.sqsServerUrl(queueServerUrl)
91+
.queue(SqsQueuesConfig.QueueConfig.builder()
92+
.queueName("queueName")
93+
.maxReceiveCount(3)
94+
.build())
95+
.build();
96+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
97+
98+
// act
99+
sqsAsyncClient.buildQueues();
100+
101+
// assert
102+
final ListQueuesResponse listQueuesResponse = sqsAsyncClient.listQueues().get();
103+
assertThat(listQueuesResponse.queueUrls()).hasSize(2);
104+
assertThat(listQueuesResponse.queueUrls()).contains(queueServerUrl + "/queue/queueName-dlq");
105+
}
106+
107+
@Test
108+
public void maxReceiveCountIsIncludedInQueue() throws Exception {
109+
// arrange
110+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
111+
.sqsServerUrl(queueServerUrl)
112+
.queue(SqsQueuesConfig.QueueConfig.builder()
113+
.queueName("queueName")
114+
.maxReceiveCount(3)
115+
.build())
116+
.build();
117+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
118+
119+
// act
120+
sqsAsyncClient.buildQueues();
121+
122+
// assert
123+
final String reDrivePolicy = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
124+
.queueUrl(queueServerUrl + "/queue/queueName")
125+
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
126+
.build())
127+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.REDRIVE_POLICY))
128+
.get();
129+
assertThat(reDrivePolicy).contains("\"maxReceiveCount\":3");
130+
}
131+
132+
@Test
133+
public void visibilityTimeoutIsIncludedInQueueWhenBuilt() throws Exception {
134+
// arrange
135+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
136+
.sqsServerUrl(queueServerUrl)
137+
.queue(SqsQueuesConfig.QueueConfig.builder()
138+
.queueName("queueName")
139+
.visibilityTimeout(60)
140+
.build())
141+
.build();
142+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
143+
144+
// act
145+
sqsAsyncClient.buildQueues();
146+
147+
// assert
148+
final String visibilityTimeout = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
149+
.queueUrl(queueServerUrl + "/queue/queueName")
150+
.attributeNames(QueueAttributeName.VISIBILITY_TIMEOUT)
151+
.build())
152+
.thenApply(getQueueAttributesResponse -> getQueueAttributesResponse.attributes().get(QueueAttributeName.VISIBILITY_TIMEOUT))
153+
.get();
154+
assertThat(visibilityTimeout).contains("60");
155+
}
156+
157+
@Test
158+
public void sendingMessagesToLocalQueueViaNameWillSendMessagesToCorrectQueue() throws Exception {
159+
// arrange
160+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
161+
.sqsServerUrl(queueServerUrl)
162+
.queue(SqsQueuesConfig.QueueConfig.builder()
163+
.queueName("queueName")
164+
.visibilityTimeout(60)
165+
.build())
166+
.build();
167+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
168+
sqsAsyncClient.buildQueues();
169+
170+
// act
171+
sqsAsyncClient.sendMessageToLocalQueue("queueName", SendMessageRequest.builder().messageBody("payload").build()).get(30, TimeUnit.SECONDS);
172+
sqsAsyncClient.sendMessageToLocalQueue("queueName", "payload2").get(30, TimeUnit.SECONDS);
173+
sqsAsyncClient.sendMessageToLocalQueue("queueName", (builder) -> builder.messageBody("payload3")).get(30, TimeUnit.SECONDS);
174+
175+
// assert
176+
final String approximateNumberOfMessages = sqsAsyncClient.getQueueAttributes(GetQueueAttributesRequest.builder()
177+
.queueUrl(queueServerUrl + "/queue/queueName")
178+
.attributeNames(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES)
179+
.build())
180+
.thenApply(response -> response.attributes().get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES))
181+
.get(30, TimeUnit.SECONDS);
182+
assertThat(approximateNumberOfMessages).isEqualTo("3");
183+
}
184+
185+
@Test
186+
public void queueNameCanBeUsedToGetQueueUrl() {
187+
// arrange
188+
final SqsQueuesConfig queuesConfig = SqsQueuesConfig.builder()
189+
.sqsServerUrl(queueServerUrl)
190+
.queue(SqsQueuesConfig.QueueConfig.builder()
191+
.queueName("queueName")
192+
.visibilityTimeout(60)
193+
.build())
194+
.build();
195+
final LocalSqsAsyncClient sqsAsyncClient = new LocalSqsAsyncClient(queuesConfig);
196+
sqsAsyncClient.buildQueues();
197+
198+
// act
199+
final String queueUrl = sqsAsyncClient.getQueueUrl("queueName");
200+
201+
// assert
202+
assertThat(queueUrl).isEqualTo(queueServerUrl + "/queue/queueName");
203+
}
204+
}

0 commit comments

Comments
 (0)