Skip to content

Commit 4ee4984

Browse files
committed
And Javadocs to AMQP 1.0 Java DSL classes
* Remove from `AmqpClientInboundGateway` wrong delegate for the `taskScheduler` since gateway does not do batches * Remove `taskScheduler` property from the `AmqpClientInboundGatewaySpec` * Correct `AmqpClientMessageHandler` Javadoc about `SmartMessageConverter`
1 parent 05cfc04 commit 4ee4984

File tree

6 files changed

+290
-15
lines changed

6 files changed

+290
-15
lines changed

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public final class AmqpClient {
3030

3131
/**
32-
* Create an initial {@link AmqpInboundGatewaySpec}.
32+
* Create an initial {@link AmqpClientInboundGatewaySpec}.
3333
* @param connectionFactory the connectionFactory.
3434
* @param queueNames the queueNames.
3535
* @return the AmqpInboundGatewaySpec.

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpClientInboundChannelAdapterSpec.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,77 +40,169 @@
4040
public class AmqpClientInboundChannelAdapterSpec
4141
extends MessageProducerSpec<AmqpClientInboundChannelAdapterSpec, AmqpClientMessageProducer> {
4242

43+
/**
44+
* Create an instance based on a {@link AmqpConnectionFactory} and queues to consume from.
45+
* @param connectionFactory the {@link AmqpConnectionFactory} to connect
46+
* @param queueNames queues to consume from
47+
*/
4348
public AmqpClientInboundChannelAdapterSpec(AmqpConnectionFactory connectionFactory, String... queueNames) {
4449
super(new AmqpClientMessageProducer(connectionFactory, queueNames));
4550
}
4651

52+
/**
53+
* The initial number credits to grant to the AMQP receiver.
54+
* The default is {@code 100}.
55+
* @param initialCredits number of initial credits
56+
* @return the spec
57+
*/
4758
public AmqpClientInboundChannelAdapterSpec initialCredits(int initialCredits) {
4859
this.target.setInitialCredits(initialCredits);
4960
return this;
5061
}
5162

63+
/**
64+
* The consumer priority.
65+
* @param priority consumer priority
66+
* @return the spec
67+
*/
5268
public AmqpClientInboundChannelAdapterSpec priority(int priority) {
5369
this.target.setPriority(priority);
5470
return this;
5571
}
5672

73+
/**
74+
* Add {@link Resource.StateListener} instances to the consumer.
75+
* @param stateListeners listeners to add
76+
* @return the spec
77+
*/
5778
public AmqpClientInboundChannelAdapterSpec stateListeners(Resource.StateListener... stateListeners) {
5879
this.target.setStateListeners(stateListeners);
5980
return this;
6081
}
6182

83+
/**
84+
* Add {@link MessagePostProcessor} instances to apply on just received messages.
85+
* @param afterReceivePostProcessors listeners to add
86+
* @return the spec
87+
*/
6288
public AmqpClientInboundChannelAdapterSpec afterReceivePostProcessors(
6389
MessagePostProcessor... afterReceivePostProcessors) {
6490

6591
this.target.setAfterReceivePostProcessors(afterReceivePostProcessors);
6692
return this;
6793
}
6894

95+
/**
96+
* Set a number of AMPQ messages to gather before producing as a single message downstream.
97+
* Default 1 - no batching.
98+
* @param batchSize the batch size to use.
99+
* @return the spec
100+
* @see #batchReceiveTimeout(long)
101+
*/
69102
public AmqpClientInboundChannelAdapterSpec batchSize(int batchSize) {
70103
this.target.setBatchSize(batchSize);
71104
return this;
72105
}
73106

107+
/**
108+
* Set a timeout in milliseconds for how long a batch gathering process should go.
109+
* Therefore, the batch is released as a single message whatever first happens:
110+
* this timeout or {@link #batchSize(int)}.
111+
* Default 30 seconds.
112+
* @param batchReceiveTimeout the timeout for gathering a batch.
113+
* @return the spec
114+
*/
74115
public AmqpClientInboundChannelAdapterSpec batchReceiveTimeout(long batchReceiveTimeout) {
75116
this.target.setBatchReceiveTimeout(batchReceiveTimeout);
76117
return this;
77118
}
78119

120+
/**
121+
* Set a {@link TaskScheduler} for monitoring batch releases.
122+
* @param taskScheduler the taskScheduler to use
123+
* @return the spec
124+
*/
79125
public AmqpClientInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
80126
this.target.setTaskScheduler(taskScheduler);
81127
return this;
82128
}
83129

130+
/**
131+
* Set {@link Advice} instances to proxy message listener.
132+
* @param advices the taskScheduler to add
133+
* @return the spec
134+
*/
84135
public AmqpClientInboundChannelAdapterSpec adviceChain(Advice... advices) {
85136
this.target.setAdviceChain(advices);
86137
return this;
87138
}
88139

140+
/**
141+
* Set to {@code false} to propagate an acknowledgement callback into message headers
142+
* for downstream flow manual settlement.
143+
* @param autoSettle {@code true} to acknowledge messages automatically.
144+
* @return the spec
145+
*/
89146
public AmqpClientInboundChannelAdapterSpec autoSettle(boolean autoSettle) {
90147
this.target.setAutoSettle(autoSettle);
91148
return this;
92149
}
93150

151+
/**
152+
* Set the default behavior when a message processing has failed.
153+
* When true, messages will be requeued, when false, they will be discarded.
154+
* When true, the default can be overridden by the listener throwing an
155+
* {@link org.springframework.amqp.AmqpRejectAndDontRequeueException}.
156+
* Default true.
157+
* @param defaultRequeue true to requeue by default.
158+
* @return the spec
159+
*/
94160
public AmqpClientInboundChannelAdapterSpec defaultRequeue(boolean defaultRequeue) {
95161
this.target.setDefaultRequeue(defaultRequeue);
96162
return this;
97163
}
98164

165+
/**
166+
* Set a duration for how long to wait for all the consumers to shoutdown successfully on listener container stop.
167+
* Default 30 seconds.
168+
* @param gracefulShutdownPeriod the timeout to wait on stop.
169+
* @return the spec
170+
*/
99171
public AmqpClientInboundChannelAdapterSpec gracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
100172
this.target.setGracefulShutdownPeriod(gracefulShutdownPeriod);
101173
return this;
102174
}
103175

176+
/**
177+
* Each queue runs in its own consumer; set this property to create multiple
178+
* consumers for each queue.
179+
* Can be treated as {@code concurrency}, but per queue.
180+
* @param consumersPerQueue the consumers per queue.
181+
* @return the spec
182+
*/
104183
public AmqpClientInboundChannelAdapterSpec consumersPerQueue(int consumersPerQueue) {
105184
this.target.setConsumersPerQueue(consumersPerQueue);
106185
return this;
107186
}
108187

188+
/**
189+
* Set a {@link MessageConverter} to replace the default
190+
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter}.
191+
* If set to null, an AMQP message is sent as is into a message payload.
192+
* And a reply message has to return an AMQP message as its payload.
193+
* @param messageConverter the {@link MessageConverter} to use or null.
194+
* @return the spec
195+
*/
109196
public AmqpClientInboundChannelAdapterSpec messageConverter(@Nullable MessageConverter messageConverter) {
110197
this.target.setMessageConverter(messageConverter);
111198
return this;
112199
}
113200

201+
/**
202+
* Set an {@link AmqpHeaderMapper} to map request headers.
203+
* @param headerMapper the {@link AmqpHeaderMapper} to use.
204+
* @return the spec
205+
*/
114206
public AmqpClientInboundChannelAdapterSpec headerMapper(AmqpHeaderMapper headerMapper) {
115207
this.target.setHeaderMapper(headerMapper);
116208
return this;

spring-integration-amqp/src/main/java/org/springframework/integration/amqp/dsl/AmqpClientInboundGatewaySpec.java

Lines changed: 101 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.springframework.integration.amqp.inbound.AmqpClientInboundGateway;
3030
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
3131
import org.springframework.integration.dsl.MessagingGatewaySpec;
32-
import org.springframework.scheduling.TaskScheduler;
3332

3433
/**
3534
* Spec for an {@link AmqpClientInboundGateway}.
@@ -41,87 +40,183 @@
4140
public class AmqpClientInboundGatewaySpec
4241
extends MessagingGatewaySpec<AmqpClientInboundGatewaySpec, AmqpClientInboundGateway> {
4342

43+
/**
44+
* Create an instance based on a {@link AmqpConnectionFactory} and queues to consume from.
45+
* @param connectionFactory the {@link AmqpConnectionFactory} to connect
46+
* @param queueNames queues to consume from
47+
*/
4448
public AmqpClientInboundGatewaySpec(AmqpConnectionFactory connectionFactory, String... queueNames) {
4549
super(new AmqpClientInboundGateway(connectionFactory, queueNames));
4650
}
4751

52+
/**
53+
* The initial number credits to grant to the AMQP receiver.
54+
* The default is {@code 100}.
55+
* @param initialCredits number of initial credits
56+
* @return the spec
57+
*/
4858
public AmqpClientInboundGatewaySpec initialCredits(int initialCredits) {
4959
this.target.setInitialCredits(initialCredits);
5060
return this;
5161
}
5262

63+
/**
64+
* The consumer priority.
65+
* @param priority consumer priority
66+
* @return the spec
67+
*/
5368
public AmqpClientInboundGatewaySpec priority(int priority) {
5469
this.target.setPriority(priority);
5570
return this;
5671
}
5772

73+
/**
74+
* Add {@link Resource.StateListener} instances to the consumer.
75+
* @param stateListeners listeners to add
76+
* @return the spec
77+
*/
5878
public AmqpClientInboundGatewaySpec stateListeners(Resource.StateListener... stateListeners) {
5979
this.target.setStateListeners(stateListeners);
6080
return this;
6181
}
6282

83+
/**
84+
* Add {@link MessagePostProcessor} instances to apply on just received messages.
85+
* @param afterReceivePostProcessors listeners to add
86+
* @return the spec
87+
*/
6388
public AmqpClientInboundGatewaySpec afterReceivePostProcessors(
6489
MessagePostProcessor... afterReceivePostProcessors) {
6590

6691
this.target.setAfterReceivePostProcessors(afterReceivePostProcessors);
6792
return this;
6893
}
6994

70-
public AmqpClientInboundGatewaySpec taskScheduler(TaskScheduler taskScheduler) {
71-
this.target.setTaskScheduler(taskScheduler);
72-
return this;
73-
}
74-
95+
/**
96+
* Set {@link Advice} instances to proxy message listener.
97+
* @param advices the taskScheduler to add
98+
* @return the spec
99+
*/
75100
public AmqpClientInboundGatewaySpec adviceChain(Advice... advices) {
76101
this.target.setAdviceChain(advices);
77102
return this;
78103
}
79104

105+
/**
106+
* Set to {@code false} to propagate an acknowledgement callback into message headers
107+
* for downstream flow manual settlement.
108+
* @param autoSettle {@code true} to acknowledge messages automatically.
109+
* @return the spec
110+
*/
80111
public AmqpClientInboundGatewaySpec autoSettle(boolean autoSettle) {
81112
this.target.setAutoSettle(autoSettle);
82113
return this;
83114
}
84115

116+
/**
117+
* Set the default behavior when a message processing has failed.
118+
* When true, messages will be requeued, when false, they will be discarded.
119+
* When true, the default can be overridden by the listener throwing an
120+
* {@link org.springframework.amqp.AmqpRejectAndDontRequeueException}.
121+
* Default true.
122+
* @param defaultRequeue true to requeue by default.
123+
* @return the spec
124+
*/
85125
public AmqpClientInboundGatewaySpec defaultRequeue(boolean defaultRequeue) {
86126
this.target.setDefaultRequeue(defaultRequeue);
87127
return this;
88128
}
89129

130+
/**
131+
* Set a duration for how long to wait for all the consumers to shoutdown successfully on listener container stop.
132+
* Default 30 seconds.
133+
* @param gracefulShutdownPeriod the timeout to wait on stop.
134+
* @return the spec
135+
*/
90136
public AmqpClientInboundGatewaySpec gracefulShutdownPeriod(Duration gracefulShutdownPeriod) {
91137
this.target.setGracefulShutdownPeriod(gracefulShutdownPeriod);
92138
return this;
93139
}
94140

141+
/**
142+
* Each queue runs in its own consumer; set this property to create multiple
143+
* consumers for each queue.
144+
* Can be treated as {@code concurrency}, but per queue.
145+
* @param consumersPerQueue the consumers per queue.
146+
* @return the spec
147+
*/
95148
public AmqpClientInboundGatewaySpec consumersPerQueue(int consumersPerQueue) {
96149
this.target.setConsumersPerQueue(consumersPerQueue);
97150
return this;
98151
}
99152

153+
/**
154+
* Set a {@link MessageConverter} to replace the default
155+
* {@link org.springframework.amqp.support.converter.SimpleMessageConverter}.
156+
* If set to null, an AMQP message is sent as is into a message payload.
157+
* And a reply message has to return an AMQP message as its payload.
158+
* @param messageConverter the {@link MessageConverter} to use or null.
159+
* @return the spec
160+
*/
100161
public AmqpClientInboundGatewaySpec messageConverter(@Nullable MessageConverter messageConverter) {
101162
this.target.setMessageConverter(messageConverter);
102163
return this;
103164
}
104165

166+
/**
167+
* Set an {@link AmqpHeaderMapper} to map request and reply headers.
168+
* @param headerMapper the {@link AmqpHeaderMapper} to use.
169+
* @return the spec
170+
*/
105171
public AmqpClientInboundGatewaySpec headerMapper(AmqpHeaderMapper headerMapper) {
106172
this.target.setHeaderMapper(headerMapper);
107173
return this;
108174
}
109175

176+
/**
177+
* Set an {@link ReplyPostProcessor} to modify reply AMQP message before producing.
178+
* @param replyPostProcessor the {@link ReplyPostProcessor} to use.
179+
* @return the spec
180+
*/
110181
public AmqpClientInboundGatewaySpec replyPostProcessor(ReplyPostProcessor replyPostProcessor) {
111182
this.target.setReplyPostProcessor(replyPostProcessor);
112183
return this;
113184
}
114185

186+
/**
187+
* Set an exchange for publishing reply.
188+
* Mutually exclusive with {@link #replyQueue(String)}.
189+
* If neither is set, the {@code replyTo} property from the request message
190+
* is used to determine where to produce a reply.
191+
* @param exchange the exchange to send a reply.
192+
* @return the spec
193+
*/
115194
public AmqpClientInboundGatewaySpec replyExchange(String exchange) {
116195
this.target.setReplyExchange(exchange);
117196
return this;
118197
}
119198

199+
/**
200+
* Set a routing key for publishing reply.
201+
* Used only together with {@link #replyExchange(String)}.
202+
* If neither is set, the {@code replyTo} property from the request message
203+
* is used to determine where to produce a reply.
204+
* @param routingKey the routing key to send a reply.
205+
* @return the spec
206+
*/
120207
public AmqpClientInboundGatewaySpec replyRoutingKey(String routingKey) {
121208
this.target.setReplyRoutingKey(routingKey);
122209
return this;
123210
}
124211

212+
/**
213+
* Set a queue for publishing reply.
214+
* Mutually exclusive with {@link #replyExchange(String)}.
215+
* If neither is set, the {@code replyTo} property from the request message
216+
* is used to determine where to produce a reply.
217+
* @param queue the queue to send a reply.
218+
* @return the spec
219+
*/
125220
public AmqpClientInboundGatewaySpec replyQueue(String queue) {
126221
this.target.setReplyQueue(queue);
127222
return this;

0 commit comments

Comments
 (0)