Skip to content

Commit bab8505

Browse files
garyrussellartembilan
authored andcommitted
INT-4371: Add MessageSource Acknowlegment support
JIRA: https://jira.spring.io/browse/INT-4371 Add an abstract message source that populates the message with a header containing an abstraction that allows the application to acknowledge/reject/requeue the message. (In amqp, for example, this would map to channel.basicAck(tag), channel.basicReject(tag, false), and channel.basicReject(tag, true). With some brokers (such as kafka), acknowledge and reject might be equivalent since there is no automatic DLQ processing in kafka both would simply commit the offset. Also include a strategy interface for a factory to provide the acknowledgment header. Add MessageSourcePollableChannel Fix typo Polishing - PR Comments - remove marker interface; just use presence of the header for auto-ack - remove the MessageSourcePollableChannel - users can use a MethodInterceptor to enhance the message - other PR comments Save entire response to allow customization of the ack/nack actions. Docs, Copyrights; add MessageSourcePollingTemplate Polishing - PR Comments - support disabling auto-ack, when possible. More Utilities * Simple code style polishing
1 parent 5ad304a commit bab8505

File tree

17 files changed

+1296
-11
lines changed

17 files changed

+1296
-11
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/Amqp.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2014-2016 the original author or authors.
2+
* Copyright 2014-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,12 +22,14 @@
2222
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
2323
import org.springframework.amqp.rabbit.listener.DirectMessageListenerContainer;
2424
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
25+
import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory;
2526

2627
/**
2728
* Factory class for AMQP components.
2829
*
2930
* @author Artem Bilan
3031
* @author Gary Russell
32+
*
3133
* @since 5.0
3234
*/
3335
public final class Amqp {
@@ -53,6 +55,7 @@ public static AmqpInboundGatewaySMLCSpec inboundGateway(ConnectionFactory connec
5355
*/
5456
public static AmqpInboundGatewaySMLCSpec inboundGateway(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
5557
String... queueNames) {
58+
5659
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
5760
listenerContainer.setQueueNames(queueNames);
5861
return inboundGateway(listenerContainer, amqpTemplate);
@@ -80,6 +83,7 @@ public static AmqpInboundGatewaySMLCSpec inboundGateway(ConnectionFactory connec
8083
*/
8184
public static AmqpInboundGatewaySMLCSpec inboundGateway(ConnectionFactory connectionFactory, AmqpTemplate amqpTemplate,
8285
Queue... queues) {
86+
8387
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
8488
listenerContainer.setQueues(queues);
8589
return inboundGateway(listenerContainer, amqpTemplate);
@@ -112,6 +116,7 @@ public static AmqpInboundGatewaySMLCSpec inboundGateway(SimpleMessageListenerCon
112116
*/
113117
public static AmqpInboundGatewaySMLCSpec inboundGateway(SimpleMessageListenerContainer listenerContainer,
114118
AmqpTemplate amqpTemplate) {
119+
115120
return new AmqpInboundGatewaySMLCSpec(listenerContainer, amqpTemplate);
116121
}
117122

@@ -142,9 +147,37 @@ public static AmqpInboundGatewayDMLCSpec inboundGateway(DirectMessageListenerCon
142147
*/
143148
public static AmqpInboundGatewayDMLCSpec inboundGateway(DirectMessageListenerContainer listenerContainer,
144149
AmqpTemplate amqpTemplate) {
150+
145151
return new AmqpInboundGatewayDMLCSpec(listenerContainer, amqpTemplate);
146152
}
147153

154+
/**
155+
* Create an initial AmqpInboundPolledChannelAdapterSpec
156+
* @param connectionFactory the connectionFactory.
157+
* @param queue the queue.
158+
* @return the AmqpInboundPolledChannelAdapterSpec.
159+
* @since 5.0.1
160+
*/
161+
public static AmqpInboundPolledChannelAdapterSpec inboundPolledAdapter(ConnectionFactory connectionFactory,
162+
String queue) {
163+
164+
return new AmqpInboundPolledChannelAdapterSpec(connectionFactory, queue);
165+
}
166+
167+
/**
168+
* Create an initial AmqpInboundPolledChannelAdapterSpec
169+
* @param connectionFactory the connectionFactory.
170+
* @param ackCallbackFactory the ackCallbackFactory
171+
* @param queue the queue.
172+
* @return the AmqpInboundPolledChannelAdapterSpec.
173+
* @since 5.0.1
174+
*/
175+
public static AmqpInboundPolledChannelAdapterSpec inboundPolledAdapter(ConnectionFactory connectionFactory,
176+
AmqpAckCallbackFactory ackCallbackFactory, String queue) {
177+
178+
return new AmqpInboundPolledChannelAdapterSpec(connectionFactory, ackCallbackFactory, queue);
179+
}
180+
148181
/**
149182
* Create an initial AmqpInboundChannelAdapterSpec using a
150183
* {@link SimpleMessageListenerContainer}.
@@ -154,6 +187,7 @@ public static AmqpInboundGatewayDMLCSpec inboundGateway(DirectMessageListenerCon
154187
*/
155188
public static AmqpInboundChannelAdapterSMLCSpec inboundAdapter(ConnectionFactory connectionFactory,
156189
String... queueNames) {
190+
157191
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
158192
listenerContainer.setQueueNames(queueNames);
159193
return new AmqpInboundChannelAdapterSMLCSpec(listenerContainer);
@@ -166,7 +200,9 @@ public static AmqpInboundChannelAdapterSMLCSpec inboundAdapter(ConnectionFactory
166200
* @param queues the queues.
167201
* @return the AmqpInboundChannelAdapterSpec.
168202
*/
169-
public static AmqpInboundChannelAdapterSMLCSpec inboundAdapter(ConnectionFactory connectionFactory, Queue... queues) {
203+
public static AmqpInboundChannelAdapterSMLCSpec inboundAdapter(ConnectionFactory connectionFactory,
204+
Queue... queues) {
205+
170206
SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(connectionFactory);
171207
listenerContainer.setQueues(queues);
172208
return new AmqpInboundChannelAdapterSMLCSpec(listenerContainer);
@@ -235,6 +271,7 @@ public static AmqpAsyncOutboundGatewaySpec asyncOutboundGateway(AsyncRabbitTempl
235271
*/
236272
public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageChannelSpec<S> pollableChannel(
237273
ConnectionFactory connectionFactory) {
274+
238275
return pollableChannel(null, connectionFactory);
239276
}
240277

@@ -247,6 +284,7 @@ public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageC
247284
*/
248285
public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageChannelSpec<S> pollableChannel(
249286
String id, ConnectionFactory connectionFactory) {
287+
250288
return new AmqpPollableMessageChannelSpec<S>(connectionFactory).id(id);
251289
}
252290

@@ -258,6 +296,7 @@ public static <S extends AmqpPollableMessageChannelSpec<S>> AmqpPollableMessageC
258296
*/
259297
public static <S extends AmqpMessageChannelSpec<S>> AmqpMessageChannelSpec<S> channel(
260298
ConnectionFactory connectionFactory) {
299+
261300
return channel(null, connectionFactory);
262301
}
263302

@@ -270,6 +309,7 @@ public static <S extends AmqpMessageChannelSpec<S>> AmqpMessageChannelSpec<S> ch
270309
*/
271310
public static <S extends AmqpMessageChannelSpec<S>> AmqpMessageChannelSpec<S> channel(String id,
272311
ConnectionFactory connectionFactory) {
312+
273313
return new AmqpMessageChannelSpec<S>(connectionFactory).id(id);
274314
}
275315

@@ -290,6 +330,7 @@ public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(Con
290330
*/
291331
public static AmqpPublishSubscribeMessageChannelSpec publishSubscribeChannel(String id,
292332
ConnectionFactory connectionFactory) {
333+
293334
return new AmqpPublishSubscribeMessageChannelSpec(connectionFactory).id(id);
294335
}
295336

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/*
2+
* Copyright 2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.amqp.dsl;
18+
19+
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
20+
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
21+
import org.springframework.amqp.support.converter.MessageConverter;
22+
import org.springframework.integration.amqp.inbound.AmqpMessageSource;
23+
import org.springframework.integration.amqp.inbound.AmqpMessageSource.AmqpAckCallbackFactory;
24+
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
25+
import org.springframework.integration.dsl.MessageSourceSpec;
26+
27+
/**
28+
* Spec for a polled AMQP inbound channel adapter.
29+
*
30+
* @author Gary Russell
31+
*
32+
* @since 5.0.1
33+
*
34+
*/
35+
public class AmqpInboundPolledChannelAdapterSpec
36+
extends MessageSourceSpec<AmqpInboundPolledChannelAdapterSpec, AmqpMessageSource> {
37+
38+
AmqpInboundPolledChannelAdapterSpec(ConnectionFactory connectionFactory, String queue) {
39+
this.target = new AmqpMessageSource(connectionFactory, queue);
40+
}
41+
42+
AmqpInboundPolledChannelAdapterSpec(ConnectionFactory connectionFactory,
43+
AmqpAckCallbackFactory ackCallbackFactory, String queue) {
44+
45+
this.target = new AmqpMessageSource(connectionFactory, ackCallbackFactory, queue);
46+
}
47+
48+
public AmqpInboundPolledChannelAdapterSpec transacted(boolean transacted) {
49+
this.target.setTransacted(transacted);
50+
return this;
51+
}
52+
53+
public AmqpInboundPolledChannelAdapterSpec setPropertiesConverter(MessagePropertiesConverter propertiesConverter) {
54+
this.target.setPropertiesConverter(propertiesConverter);
55+
return this;
56+
}
57+
58+
public AmqpInboundPolledChannelAdapterSpec setHeaderMapper(AmqpHeaderMapper headerMapper) {
59+
this.target.setHeaderMapper(headerMapper);
60+
return this;
61+
}
62+
63+
public AmqpInboundPolledChannelAdapterSpec setMessageConverter(MessageConverter messageConverter) {
64+
this.target.setMessageConverter(messageConverter);
65+
return this;
66+
}
67+
68+
}

0 commit comments

Comments
 (0)