Skip to content

Commit da26d94

Browse files
artembilanliujiong1982
authored andcommitted
INT-3402-3: Channels Late Resolution spring-projects#3
JIRA: https://jira.spring.io/browse/INT-3402 * Add late resolution of channel names for the `MessagingGatewaySupport` * Implement delegate logic for internal implementations like `ContentEnricher.Gateway`
1 parent 4b00e73 commit da26d94

File tree

6 files changed

+237
-102
lines changed

6 files changed

+237
-102
lines changed

spring-integration-core/src/main/java/org/springframework/integration/gateway/MessagingGatewaySupport.java

Lines changed: 140 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.integration.gateway;
1818

19+
import org.springframework.beans.BeansException;
1920
import org.springframework.integration.core.MessagingTemplate;
2021
import org.springframework.integration.endpoint.AbstractEndpoint;
2122
import org.springframework.integration.endpoint.EventDrivenConsumer;
@@ -33,6 +34,7 @@
3334
import org.springframework.messaging.MessagingException;
3435
import org.springframework.messaging.PollableChannel;
3536
import org.springframework.messaging.SubscribableChannel;
37+
import org.springframework.messaging.core.DestinationResolutionException;
3638
import org.springframework.messaging.support.ErrorMessage;
3739
import org.springframework.util.Assert;
3840

@@ -53,10 +55,16 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint implement
5355

5456
private volatile MessageChannel requestChannel;
5557

58+
private volatile String requestChannelName;
59+
5660
private volatile MessageChannel replyChannel;
5761

62+
private volatile String replyChannelName;
63+
5864
private volatile MessageChannel errorChannel;
5965

66+
private volatile String errorChannelName;
67+
6068
private volatile long replyTimeout = DEFAULT_TIMEOUT;
6169

6270
@SuppressWarnings("rawtypes")
@@ -66,7 +74,8 @@ public abstract class MessagingGatewaySupport extends AbstractEndpoint implement
6674

6775
private final MessagingTemplate messagingTemplate;
6876

69-
private final HistoryWritingMessagePostProcessor historyWritingPostProcessor = new HistoryWritingMessagePostProcessor();
77+
private final HistoryWritingMessagePostProcessor historyWritingPostProcessor =
78+
new HistoryWritingMessagePostProcessor();
7079

7180
private volatile boolean initialized;
7281

@@ -86,38 +95,67 @@ public MessagingGatewaySupport() {
8695

8796
/**
8897
* Set the request channel.
89-
*
9098
* @param requestChannel the channel to which request messages will be sent
9199
*/
92100
public void setRequestChannel(MessageChannel requestChannel) {
93101
this.requestChannel = requestChannel;
94102
}
95103

104+
/**
105+
* Set the request channel name.
106+
* @param requestChannelName the channel bean name to which request messages will be sent
107+
* @since 4.1
108+
*/
109+
public void setRequestChannelName(String requestChannelName) {
110+
Assert.hasText(requestChannelName, "'requestChannelName' must not be empty");
111+
this.requestChannelName = requestChannelName;
112+
}
113+
96114
/**
97115
* Set the reply channel. If no reply channel is provided, this gateway will
98116
* always use an anonymous, temporary channel for handling replies.
99-
*
100117
* @param replyChannel the channel from which reply messages will be received
101118
*/
102119
public void setReplyChannel(MessageChannel replyChannel) {
103120
this.replyChannel = replyChannel;
104121
}
105122

123+
/**
124+
* Set the reply channel name. If no reply channel is provided, this gateway will
125+
* always use an anonymous, temporary channel for handling replies.
126+
* @param replyChannelName the channel bean name from which reply messages will be received
127+
* @since 4.1
128+
*/
129+
public void setReplyChannelName(String replyChannelName) {
130+
Assert.hasText(replyChannelName, "'replyChannelName' must not be empty");
131+
this.replyChannelName = replyChannelName;
132+
}
133+
106134
/**
107135
* Set the error channel. If no error channel is provided, this gateway will
108136
* propagate Exceptions to the caller. To completely suppress Exceptions, provide
109137
* a reference to the "nullChannel" here.
110-
*
111138
* @param errorChannel The error channel.
112139
*/
113140
public void setErrorChannel(MessageChannel errorChannel) {
114141
this.errorChannel = errorChannel;
115142
}
116143

144+
/**
145+
* Set the error channel name. If no error channel is provided, this gateway will
146+
* propagate Exceptions to the caller. To completely suppress Exceptions, provide
147+
* a reference to the "nullChannel" here.
148+
* @param errorChannelName The error channel bean name.
149+
* @since 4.1
150+
*/
151+
public void setErrorChannelName(String errorChannelName) {
152+
Assert.hasText(errorChannelName, "'errorChannelName' must not be empty");
153+
this.errorChannelName = errorChannelName;
154+
}
155+
117156
/**
118157
* Set the timeout value for sending request messages. If not
119158
* explicitly configured, the default is one second.
120-
*
121159
* @param requestTimeout the timeout value in milliseconds
122160
*/
123161
public void setRequestTimeout(long requestTimeout) {
@@ -127,7 +165,6 @@ public void setRequestTimeout(long requestTimeout) {
127165
/**
128166
* Set the timeout value for receiving reply messages. If not
129167
* explicitly configured, the default is one second.
130-
*
131168
* @param replyTimeout the timeout value in milliseconds
132169
*/
133170
public void setReplyTimeout(long replyTimeout) {
@@ -138,7 +175,6 @@ public void setReplyTimeout(long replyTimeout) {
138175
/**
139176
* Provide an {@link InboundMessageMapper} for creating request Messages
140177
* from any object passed in a send or sendAndReceive operation.
141-
*
142178
* @param requestMapper The request mapper.
143179
*/
144180
public void setRequestMapper(InboundMessageMapper<?> requestMapper) {
@@ -150,7 +186,6 @@ public void setRequestMapper(InboundMessageMapper<?> requestMapper) {
150186
/**
151187
* Provide an {@link OutboundMessageMapper} for mapping to objects from
152188
* any reply Messages received in receive or sendAndReceive operations.
153-
*
154189
* @param replyMapper The reply mapper.
155190
*/
156191
public void setReplyMapper(OutboundMessageMapper<?> replyMapper) {
@@ -173,6 +208,12 @@ public String getComponentType() {
173208

174209
@Override
175210
protected void onInit() throws Exception {
211+
Assert.state(!(this.requestChannelName != null && this.requestChannel != null),
212+
"'requestChannelName' and 'requestChannel' are mutually exclusive.");
213+
Assert.state(!(this.replyChannelName != null && this.replyChannel != null),
214+
"'replyChannelName' and 'replyChannel' are mutually exclusive.");
215+
Assert.state(!(this.errorChannelName != null && this.errorChannel != null),
216+
"'errorChannelName' and 'errorChannel' are mutually exclusive.");
176217
this.historyWritingPostProcessor.setTrackableComponent(this);
177218
this.historyWritingPostProcessor.setMessageBuilderFactory(this.getMessageBuilderFactory());
178219
if (this.getBeanFactory() != null) {
@@ -191,17 +232,79 @@ private void initializeIfNecessary() {
191232
}
192233
}
193234

235+
private MessageChannel getRequestChannel() {
236+
if (this.requestChannelName != null) {
237+
synchronized (this) {
238+
if (this.requestChannelName != null) {
239+
try {
240+
Assert.state(getBeanFactory() != null,
241+
"A bean factory is required to resolve the requestChannel at runtime.");
242+
this.requestChannel = getBeanFactory().getBean(this.requestChannelName, MessageChannel.class);
243+
this.requestChannelName = null;
244+
}
245+
catch (BeansException e) {
246+
throw new DestinationResolutionException("Failed to look up MessageChannel with name '"
247+
+ this.requestChannelName + "' in the BeanFactory.");
248+
}
249+
}
250+
}
251+
}
252+
return this.requestChannel;
253+
}
254+
255+
private MessageChannel getReplyChannel() {
256+
if (this.replyChannelName != null) {
257+
synchronized (this) {
258+
if (this.replyChannelName != null) {
259+
try {
260+
Assert.state(getBeanFactory() != null,
261+
"A bean factory is required to resolve the replyChannel at runtime.");
262+
this.replyChannel = getBeanFactory().getBean(this.replyChannelName, MessageChannel.class);
263+
this.replyChannelName = null;
264+
}
265+
catch (BeansException e) {
266+
throw new DestinationResolutionException("Failed to look up MessageChannel with name '"
267+
+ this.replyChannelName + "' in the BeanFactory.");
268+
}
269+
}
270+
}
271+
}
272+
return this.replyChannel;
273+
}
274+
275+
private MessageChannel getErrorChannel() {
276+
if (this.errorChannelName != null) {
277+
synchronized (this) {
278+
if (this.errorChannelName != null) {
279+
try {
280+
Assert.state(getBeanFactory() != null,
281+
"A bean factory is required to resolve the errorChannel at runtime.");
282+
this.errorChannel = getBeanFactory().getBean(this.errorChannelName, MessageChannel.class);
283+
this.errorChannelName = null;
284+
}
285+
catch (BeansException e) {
286+
throw new DestinationResolutionException("Failed to look up MessageChannel with name '"
287+
+ this.errorChannelName + "' in the BeanFactory.");
288+
}
289+
}
290+
}
291+
}
292+
return this.errorChannel;
293+
}
294+
194295
protected void send(Object object) {
195296
this.initializeIfNecessary();
196297
Assert.notNull(object, "request must not be null");
197-
Assert.state(this.requestChannel != null,
298+
MessageChannel requestChannel = getRequestChannel();
299+
Assert.state(requestChannel != null,
198300
"send is not supported, because no request channel has been configured");
199301
try {
200-
this.messagingTemplate.convertAndSend(this.requestChannel, object, this.historyWritingPostProcessor);
302+
this.messagingTemplate.convertAndSend(requestChannel, object, this.historyWritingPostProcessor);
201303
}
202304
catch (Exception e) {
203-
if (this.errorChannel != null) {
204-
this.messagingTemplate.send(this.errorChannel, new ErrorMessage(e));
305+
MessageChannel errorChannel = getErrorChannel();
306+
if (errorChannel != null) {
307+
this.messagingTemplate.send(errorChannel, new ErrorMessage(e));
205308
}
206309
else {
207310
this.rethrow(e, "failed to send message");
@@ -211,9 +314,10 @@ protected void send(Object object) {
211314

212315
protected Object receive() {
213316
this.initializeIfNecessary();
214-
Assert.state(this.replyChannel != null && (this.replyChannel instanceof PollableChannel),
317+
MessageChannel replyChannel = getReplyChannel();
318+
Assert.state(replyChannel != null && (replyChannel instanceof PollableChannel),
215319
"receive is not supported, because no pollable reply channel has been configured");
216-
return this.messagingTemplate.receiveAndConvert((PollableChannel) this.replyChannel, null);
320+
return this.messagingTemplate.receiveAndConvert(replyChannel, null);
217321
}
218322

219323
protected Object sendAndReceive(Object object) {
@@ -228,17 +332,20 @@ protected Message<?> sendAndReceiveMessage(Object object) {
228332
private Object doSendAndReceive(Object object, boolean shouldConvert) {
229333
this.initializeIfNecessary();
230334
Assert.notNull(object, "request must not be null");
231-
if (this.requestChannel == null) {
335+
MessageChannel requestChannel = getRequestChannel();
336+
if (requestChannel == null) {
232337
throw new MessagingException("No request channel available. Cannot send request message.");
233338
}
234-
if (this.replyChannel != null && this.replyMessageCorrelator == null) {
339+
MessageChannel replyChannel = getReplyChannel();
340+
if (replyChannel != null && this.replyMessageCorrelator == null) {
235341
this.registerReplyMessageCorrelator();
236342
}
237343
Object reply = null;
238344
Throwable error = null;
239345
try {
240346
if (shouldConvert) {
241-
reply = this.messagingTemplate.convertSendAndReceive(this.requestChannel, object, null, this.historyWritingPostProcessor);
347+
reply = this.messagingTemplate.convertSendAndReceive(requestChannel, object, null,
348+
this.historyWritingPostProcessor);
242349
if (reply instanceof Throwable) {
243350
error = (Throwable) reply;
244351
}
@@ -247,7 +354,7 @@ private Object doSendAndReceive(Object object, boolean shouldConvert) {
247354
Message<?> requestMessage = (object instanceof Message<?>)
248355
? (Message<?>) object : this.requestMapper.toMessage(object);
249356
requestMessage = this.historyWritingPostProcessor.postProcessMessage(requestMessage);
250-
reply = this.messagingTemplate.sendAndReceive(this.requestChannel, requestMessage);
357+
reply = this.messagingTemplate.sendAndReceive(requestChannel, requestMessage);
251358
if (reply instanceof ErrorMessage) {
252359
error = ((ErrorMessage) reply).getPayload();
253360
}
@@ -261,14 +368,16 @@ private Object doSendAndReceive(Object object, boolean shouldConvert) {
261368
}
262369

263370
if (error != null) {
264-
if (this.errorChannel != null) {
371+
MessageChannel errorChannel = getErrorChannel();
372+
if (errorChannel != null) {
265373
Message<?> errorMessage = new ErrorMessage(error);
266374
Message<?> errorFlowReply = null;
267375
try {
268-
errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage);
376+
errorFlowReply = this.messagingTemplate.sendAndReceive(errorChannel, errorMessage);
269377
}
270378
catch (Exception errorFlowFailure) {
271-
throw new MessagingException(errorMessage, "failure occurred in error-handling flow", errorFlowFailure);
379+
throw new MessagingException(errorMessage, "failure occurred in error-handling flow",
380+
errorFlowFailure);
272381
}
273382
if (shouldConvert) {
274383
Object result = (errorFlowReply != null) ? errorFlowReply.getPayload() : null;
@@ -307,18 +416,21 @@ private void registerReplyMessageCorrelator() {
307416
handler.setBeanFactory(this.getBeanFactory());
308417
}
309418
handler.afterPropertiesSet();
310-
if (this.replyChannel instanceof SubscribableChannel) {
311-
correlator = new EventDrivenConsumer(
312-
(SubscribableChannel) this.replyChannel, handler);
419+
MessageChannel replyChannel = getReplyChannel();
420+
if (replyChannel instanceof SubscribableChannel) {
421+
correlator = new EventDrivenConsumer((SubscribableChannel) replyChannel, handler);
313422
}
314-
else if (this.replyChannel instanceof PollableChannel) {
315-
PollingConsumer endpoint = new PollingConsumer(
316-
(PollableChannel) this.replyChannel, handler);
423+
else if (replyChannel instanceof PollableChannel) {
424+
PollingConsumer endpoint = new PollingConsumer((PollableChannel) replyChannel, handler);
317425
endpoint.setBeanFactory(this.getBeanFactory());
318426
endpoint.setReceiveTimeout(this.replyTimeout);
319427
endpoint.afterPropertiesSet();
320428
correlator = endpoint;
321429
}
430+
else {
431+
throw new MessagingException("Unsupported 'replyChannel' type [" + replyChannel.getClass() + "]."
432+
+ "SubscribableChannel or PollableChannel type are supported.");
433+
}
322434
if (this.isRunning()) {
323435
correlator.start();
324436
}
@@ -356,6 +468,7 @@ public Message<?> toMessage(Object object) throws Exception {
356468
}
357469
return (object != null) ? this.messageBuilderFactory.withPayload(object).build() : null;
358470
}
471+
359472
}
360473

361474
}

0 commit comments

Comments
 (0)