Skip to content

Commit bdb54f6

Browse files
Merge pull request #86 from JaidenAshmore/issue/85_queue_listener_methods_can_return_completable_futures
refs #85: added the ability for queue listener methods to return a completable future
2 parents 5a4052c + c0a5ead commit bdb54f6

File tree

4 files changed

+185
-62
lines changed

4 files changed

+185
-62
lines changed

java-dynamic-sqs-listener-api/src/main/java/com/jashmore/sqs/processor/MessageProcessor.java

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,28 @@
33
import com.jashmore.sqs.argument.ArgumentResolverService;
44
import com.jashmore.sqs.broker.MessageBroker;
55
import com.jashmore.sqs.processor.argument.Acknowledge;
6+
import com.jashmore.sqs.resolver.MessageResolver;
67
import software.amazon.awssdk.services.sqs.model.Message;
78

9+
import java.util.concurrent.CompletableFuture;
810
import javax.annotation.concurrent.ThreadSafe;
911

1012
/**
1113
* Processor that has the responsibility of taking a message and processing it via the required message consumer (Java method).
1214
*
13-
* <p>This would therefore need to know what message consumer (method) should be executed for this message and how to execute it.
14-
* During the message processing it may need to extract data out of the message to populate the parameters of the method with arguments
15-
* fulfilling what is required. For example, an argument may require a parameter to contain the message id of the message
15+
* <p>This would therefore need to know what message consumer (method) should be executed for this message and how to execute it. The default
16+
* behaviour of the processor should be if the method processing the method completes without an exception, the message should be resolved by
17+
* a {@link MessageResolver}. However, if the method throws an exception the message processing will be considered a failure and will not be resolved.
18+
*
19+
* <p>Instead of the approach above of using an exception to consider failures you can have the Java method return a {@link CompletableFuture} and
20+
* if that is resolved the message should be considered completed successfully and should be resolved. However, if the {@link CompletableFuture}
21+
* is rejected the message should not be resolved, just the same as if an exception was thrown.
22+
*
23+
* <p>However, if an {@link Acknowledge} parameter is included in the method signature, neither scenarios above should resolve the message and only
24+
* calls to {@link Acknowledge#acknowledgeSuccessful()} should resolve the message.
25+
*
26+
* <p>The parameters of the function will need to extract data out of the message and the implementations of this {@link MessageProcessor} should use
27+
* the {@link ArgumentResolverService}. For example, an argument may require a parameter to contain the message id of the message
1628
* and therefore this would handle populating the argument for this parameter with this value.
1729
*
1830
* <p>Most arguments would be able to be resolved via the {@link ArgumentResolverService}, however, the following arguments must be resolved via

java-dynamic-sqs-listener-core/src/main/java/com/jashmore/sqs/processor/DefaultMessageProcessor.java

Lines changed: 61 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212

1313
import java.lang.reflect.Method;
1414
import java.lang.reflect.Parameter;
15-
import java.util.concurrent.atomic.AtomicBoolean;
15+
import java.util.Arrays;
16+
import java.util.concurrent.CompletableFuture;
17+
import java.util.concurrent.ExecutionException;
1618
import java.util.stream.IntStream;
1719
import javax.annotation.concurrent.ThreadSafe;
1820

@@ -31,16 +33,67 @@ public class DefaultMessageProcessor implements MessageProcessor {
3133

3234
@Override
3335
public void processMessage(final Message message) throws MessageProcessingException {
34-
final Parameter[] parameters = messageConsumerMethod.getParameters();
35-
36-
final AtomicBoolean hasAcknowledgeField = new AtomicBoolean();
3736
final Acknowledge acknowledge = () -> messageResolver.resolveMessage(message);
38-
final Object[] arguments = IntStream.range(0, parameters.length)
37+
final Object[] arguments = getArguments(acknowledge, message);
38+
39+
final Object result;
40+
try {
41+
result = messageConsumerMethod.invoke(messageConsumerBean, arguments);
42+
} catch (final Throwable throwable) {
43+
throw new MessageProcessingException("Error processing message", throwable);
44+
}
45+
46+
if (hasAcknowledgeParameter()) {
47+
// If the method has the Acknowledge parameter it is up to them to resolve the message
48+
return;
49+
}
50+
51+
final Class<?> returnType = messageConsumerMethod.getReturnType();
52+
if (CompletableFuture.class.isAssignableFrom(returnType)) {
53+
final CompletableFuture<?> resultCompletableFuture = (CompletableFuture) result;
54+
55+
if (resultCompletableFuture == null) {
56+
throw new MessageProcessingException("Method returns CompletableFuture but null was returned");
57+
}
58+
59+
try {
60+
resultCompletableFuture
61+
.thenAccept((ignored) -> acknowledge.acknowledgeSuccessful())
62+
.get();
63+
} catch (final InterruptedException interruptedException) {
64+
Thread.currentThread().interrupt();
65+
throw new MessageProcessingException("Thread interrupted while processing message");
66+
} catch (final ExecutionException executionException) {
67+
throw new MessageProcessingException("Error processing message", executionException.getCause());
68+
}
69+
} else {
70+
acknowledge.acknowledgeSuccessful();
71+
}
72+
}
73+
74+
private boolean hasAcknowledgeParameter() {
75+
return Arrays.stream(messageConsumerMethod.getParameters())
76+
.anyMatch(DefaultMessageProcessor::isAcknowledgeParameter);
77+
}
78+
79+
private static boolean isAcknowledgeParameter(final Parameter parameter) {
80+
return Acknowledge.class.isAssignableFrom(parameter.getType());
81+
}
82+
83+
/**
84+
* Get the arguments for the method for the message that is being processed.
85+
*
86+
* @param acknowledge the acknowledge object that should be used if a parameter is an {@link Acknowledge}
87+
* @param message the message to populate the arguments from
88+
* @return the array of arguments to call the method with
89+
*/
90+
private Object[] getArguments(final Acknowledge acknowledge, final Message message) {
91+
final Parameter[] parameters = messageConsumerMethod.getParameters();
92+
return IntStream.range(0, parameters.length)
3993
.mapToObj(parameterIndex -> {
40-
final Parameter parameter = messageConsumerMethod.getParameters()[parameterIndex];
94+
final Parameter parameter = parameters[parameterIndex];
4195

42-
if (Acknowledge.class.isAssignableFrom(parameter.getType())) {
43-
hasAcknowledgeField.set(true);
96+
if (isAcknowledgeParameter(parameter)) {
4497
return acknowledge;
4598
}
4699

@@ -58,15 +111,5 @@ public void processMessage(final Message message) throws MessageProcessingExcept
58111
})
59112
.toArray(Object[]::new);
60113

61-
try {
62-
messageConsumerMethod.invoke(messageConsumerBean, arguments);
63-
} catch (final Throwable throwable) {
64-
throw new MessageProcessingException("Error processing message", throwable);
65-
}
66-
67-
// If the method doesn't consume the Acknowledge field, it will acknowledge the method here on success
68-
if (!hasAcknowledgeField.get()) {
69-
acknowledge.acknowledgeSuccessful();
70-
}
71114
}
72115
}

java-dynamic-sqs-listener-core/src/test/java/com/jashmore/sqs/processor/DefaultMessageProcessorTest.java

Lines changed: 107 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import static org.mockito.ArgumentMatchers.eq;
77
import static org.mockito.Mockito.mock;
88
import static org.mockito.Mockito.never;
9+
import static org.mockito.Mockito.timeout;
910
import static org.mockito.Mockito.times;
1011
import static org.mockito.Mockito.verify;
1112
import static org.mockito.Mockito.when;
@@ -19,21 +20,32 @@
1920
import com.jashmore.sqs.resolver.MessageResolver;
2021
import org.junit.Rule;
2122
import org.junit.Test;
23+
import org.junit.rules.ExpectedException;
2224
import org.mockito.ArgumentMatchers;
2325
import org.mockito.Mock;
2426
import org.mockito.junit.MockitoJUnit;
2527
import org.mockito.junit.MockitoRule;
2628
import software.amazon.awssdk.services.sqs.model.Message;
2729

2830
import java.lang.reflect.Method;
31+
import java.util.concurrent.CompletableFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicBoolean;
2934

35+
@SuppressWarnings( {"WeakerAccess", "unused"})
3036
public class DefaultMessageProcessorTest {
31-
private static final String QUEUE_URL = "queueUrl";
37+
private static final QueueProperties QUEUE_PROPERTIES = QueueProperties
38+
.builder()
39+
.queueUrl("queueUrl")
40+
.build();
3241
private static final DefaultMessageProcessorTest BEAN = new DefaultMessageProcessorTest();
3342

3443
@Rule
3544
public MockitoRule mockitoRule = MockitoJUnit.rule();
3645

46+
@Rule
47+
public ExpectedException expectedException = ExpectedException.none();
48+
3749
@Mock
3850
private ArgumentResolverService argumentResolverService;
3951

@@ -44,34 +56,26 @@ public class DefaultMessageProcessorTest {
4456
public void forEachParameterInMethodTheArgumentIsResolved() {
4557
// arrange
4658
final Method method = getMethodWithAcknowledge();
47-
final QueueProperties queueProperties = QueueProperties
48-
.builder()
49-
.queueUrl(QUEUE_URL)
50-
.build();
5159
final Message message = Message.builder().build();
52-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, BEAN);
53-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
60+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
61+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
5462
.thenReturn("payload")
5563
.thenReturn("payload2");
5664

5765
// act
5866
processor.processMessage(message);
5967

6068
// assert
61-
verify(argumentResolverService, times(2)).resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message));
69+
verify(argumentResolverService, times(2)).resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message));
6270
}
6371

6472
@Test
6573
public void anyParameterUnableToBeResolvedWillThrowAnError() {
6674
// arrange
6775
final Method method = getMethodWithAcknowledge();
68-
final QueueProperties queueProperties = QueueProperties
69-
.builder()
70-
.queueUrl(QUEUE_URL)
71-
.build();
7276
final Message message = Message.builder().build();
73-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, BEAN);
74-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
77+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
78+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
7579
.thenThrow(new ArgumentResolutionException("Error resolving"));
7680

7781
// act
@@ -89,13 +93,9 @@ public void methodWillBeInvokedWithArgumentsResolved() {
8993
// arrange
9094
final Method method = getMethodWithAcknowledge();
9195
final DefaultMessageProcessorTest mockProcessor = mock(DefaultMessageProcessorTest.class);
92-
final QueueProperties queueProperties = QueueProperties
93-
.builder()
94-
.queueUrl(QUEUE_URL)
95-
.build();
9696
final Message message = Message.builder().build();
97-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, mockProcessor);
98-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
97+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, mockProcessor);
98+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
9999
.thenReturn("payload")
100100
.thenReturn("payload2");
101101

@@ -110,13 +110,9 @@ public void methodWillBeInvokedWithArgumentsResolved() {
110110
public void methodWithAcknowledgeParameterWillNotDeleteMessageOnSuccess() {
111111
// arrange
112112
final Method method = getMethodWithAcknowledge();
113-
final QueueProperties queueProperties = QueueProperties
114-
.builder()
115-
.queueUrl(QUEUE_URL)
116-
.build();
117113
final Message message = Message.builder().build();
118-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, BEAN);
119-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
114+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
115+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
120116
.thenReturn("payload");
121117

122118
// act
@@ -130,13 +126,9 @@ public void methodWithAcknowledgeParameterWillNotDeleteMessageOnSuccess() {
130126
public void methodWithoutAcknowledgeParameterWillDeleteMessageOnSuccess() {
131127
// arrange
132128
final Method method = getMethodWithNoAcknowledge();
133-
final QueueProperties queueProperties = QueueProperties
134-
.builder()
135-
.queueUrl(QUEUE_URL)
136-
.build();
137129
final Message message = Message.builder().receiptHandle("handle").build();
138-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, BEAN);
139-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
130+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
131+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
140132
.thenReturn("payload");
141133

142134
// act
@@ -151,13 +143,9 @@ public void methodWithoutAcknowledgeParameterWillDeleteMessageOnSuccess() {
151143
public void methodWithoutAcknowledgeThatThrowsExceptionDoesNotDeleteMessage() {
152144
// arrange
153145
final Method method = getMethodThatThrowsException();
154-
final QueueProperties queueProperties = QueueProperties
155-
.builder()
156-
.queueUrl(QUEUE_URL)
157-
.build();
158146
final Message message = Message.builder().receiptHandle("handle").build();
159-
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, queueProperties, messageResolver, method, BEAN);
160-
when(argumentResolverService.resolveArgument(eq(queueProperties), any(MethodParameter.class), eq(message)))
147+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
148+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
161149
.thenReturn("payload");
162150

163151
// act
@@ -170,17 +158,95 @@ public void methodWithoutAcknowledgeThatThrowsExceptionDoesNotDeleteMessage() {
170158
}
171159
}
172160

173-
@SuppressWarnings("unused")
161+
@Test
162+
public void methodReturningCompletableFutureWillResolveMessageWhenFutureResolved() throws Exception {
163+
// arrange
164+
final Method method = DefaultMessageProcessorTest.class.getMethod("methodReturningCompletableFuture", CompletableFuture.class);
165+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
166+
final Message message = Message.builder().receiptHandle("handle").build();
167+
final CompletableFuture<Object> future = new CompletableFuture<>();
168+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
169+
.thenReturn(future);
170+
171+
// act
172+
CompletableFuture.runAsync(() -> processor.processMessage(message));
173+
174+
// assert
175+
verify(messageResolver, never()).resolveMessage(message);
176+
future.complete("value");
177+
verify(messageResolver, timeout(100)).resolveMessage(message);
178+
}
179+
180+
@Test
181+
public void methodReturningCompletableFutureWillNotResolveMessageWhenFutureRejected() throws Exception {
182+
// arrange
183+
final Method method = DefaultMessageProcessorTest.class.getMethod("methodReturningCompletableFuture", CompletableFuture.class);
184+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
185+
final Message message = Message.builder().receiptHandle("handle").build();
186+
final CompletableFuture<Object> future = new CompletableFuture<>();
187+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
188+
.thenReturn(future);
189+
190+
// act
191+
CompletableFuture.runAsync(() -> processor.processMessage(message));
192+
future.completeExceptionally(new RuntimeException("Excepted Exception"));
193+
Thread.sleep(100); // make sure it doesn't resolve
194+
195+
// assert
196+
verify(messageResolver, never()).resolveMessage(message);
197+
}
198+
199+
@Test
200+
public void methodReturningCompletableFutureThatReturnsNullWillThrowMessageProcessingException() throws Exception {
201+
// arrange
202+
final Method method = DefaultMessageProcessorTest.class.getMethod("methodReturningCompletableFuture", CompletableFuture.class);
203+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
204+
final Message message = Message.builder().receiptHandle("handle").build();
205+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
206+
.thenReturn(null);
207+
expectedException.expect(MessageProcessingException.class);
208+
209+
// act
210+
processor.processMessage(message);
211+
}
212+
213+
@Test
214+
public void threadInterruptedWhileGettingMessageWillThrowException() throws Exception {
215+
// arrange
216+
final Method method = DefaultMessageProcessorTest.class.getMethod("methodReturningCompletableFuture", CompletableFuture.class);
217+
final MessageProcessor processor = new DefaultMessageProcessor(argumentResolverService, QUEUE_PROPERTIES, messageResolver, method, BEAN);
218+
final Message message = Message.builder().receiptHandle("handle").build();
219+
final CompletableFuture<Object> future = new CompletableFuture<>();
220+
when(argumentResolverService.resolveArgument(eq(QUEUE_PROPERTIES), any(MethodParameter.class), eq(message)))
221+
.thenReturn(future);
222+
223+
// act
224+
final AtomicBoolean exceptionThrown = new AtomicBoolean(false);
225+
CompletableFuture.runAsync(() -> {
226+
try {
227+
Thread.currentThread().interrupt(); // interrupt the thread so the InterruptedException will be thrown while blocking on the future
228+
processor.processMessage(message);
229+
} catch (final MessageProcessingException messageProcessingException) {
230+
exceptionThrown.set(true);
231+
}
232+
}).get(1, TimeUnit.SECONDS);
233+
234+
// assert
235+
assertThat(exceptionThrown).isTrue();
236+
}
237+
174238
public void methodWithNoAcknowledge(@Payload String payload, @Payload String payloadTwo) {
175239

176240
}
177241

178-
@SuppressWarnings( {"unused", "WeakerAccess"})
179242
public void methodWithAcknowledge(Acknowledge acknowledge, @Payload String payload, @Payload String payloadTwo) {
180243

181244
}
182245

183-
@SuppressWarnings("unused")
246+
public CompletableFuture<?> methodReturningCompletableFuture(CompletableFuture<?> futureToReturn) {
247+
return futureToReturn;
248+
}
249+
184250
public void methodThatThrowsException(@Payload String payload) {
185251
throw new RuntimeException("error");
186252
}

0 commit comments

Comments
 (0)