Skip to content

Commit e6ec86c

Browse files
yilinweiartembilan
authored andcommitted
INT-3945: Add async to the @serviceactivator
JIRA: https://jira.spring.io/browse/INT-3945 Adding test for async annotated ServiceActivator fix whitespace Address comments Change copyright and author * Some code style polishing * Rename `Log4j2LevelAdjuster.level()` to more friendly `forLevel()` factory method name. * Add `@param level` to the `Log4j2LevelAdjuster.forLevel()` to fix JavaDoc warning
1 parent 1bb4f86 commit e6ec86c

File tree

6 files changed

+77
-23
lines changed

6 files changed

+77
-23
lines changed

spring-integration-core/src/main/java/org/springframework/integration/annotation/ServiceActivator.java

+12-4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,8 +41,9 @@
4141
* @author Mark Fisher
4242
* @author Gary Russell
4343
* @author Artem Bilan
44+
* @author Yilin Wei
4445
*/
45-
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
46+
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
4647
@Retention(RetentionPolicy.RUNTIME)
4748
@Inherited
4849
@Documented
@@ -77,7 +78,7 @@
7778
* Only the handler is advised, not the downstream flow.
7879
* @return the advice chain.
7980
*/
80-
String[] adviceChain() default {};
81+
String[] adviceChain() default { };
8182

8283
/**
8384
* Specify the maximum amount of time in milliseconds to wait when sending a reply
@@ -110,12 +111,19 @@
110111
*/
111112
String phase() default "";
112113

114+
/**
115+
* Specify whether the service method is async.
116+
* This value is {@code false} by default.
117+
* @return the async flag.
118+
*/
119+
String async() default "";
120+
113121
/**
114122
* @return the {@link Poller} options for a polled endpoint
115123
* ({@link org.springframework.integration.scheduling.PollerMetadata}).
116124
* This attribute is an {@code array} just to allow an empty default (no poller).
117125
* Only one {@link Poller} element is allowed.
118126
*/
119-
Poller[] poller() default {};
127+
Poller[] poller() default { };
120128

121129
}

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/AbstractMethodAnnotationPostProcessor.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ protected List<Advice> extractAdviceChain(String beanName, List<Annotation> anno
257257
* by setting an empty array on the custom annotation.
258258
*/
259259
if (adviceChainNames != null && adviceChainNames.length > 0) {
260-
adviceChain = new ArrayList<Advice>();
260+
adviceChain = new ArrayList<>();
261261
for (String adviceChainName : adviceChainNames) {
262262
Object adviceChainBean = this.beanFactory.getBean(adviceChainName);
263263
if (adviceChainBean instanceof Advice) {

spring-integration-core/src/main/java/org/springframework/integration/config/annotation/ServiceActivatorAnnotationPostProcessor.java

+11-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2017 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -38,12 +38,13 @@
3838
* @author Mark Fisher
3939
* @author Gary Russell
4040
* @author Artem Bilan
41+
* @author Yilin Wei
4142
*/
4243
public class ServiceActivatorAnnotationPostProcessor extends AbstractMethodAnnotationPostProcessor<ServiceActivator> {
4344

4445
public ServiceActivatorAnnotationPostProcessor(ConfigurableListableBeanFactory beanFactory) {
4546
super(beanFactory);
46-
this.messageHandlerAttributes.addAll(Arrays.<String>asList("outputChannel", "requiresReply", "adviceChain"));
47+
this.messageHandlerAttributes.addAll(Arrays.asList("outputChannel", "requiresReply", "adviceChain"));
4748
}
4849

4950

@@ -56,9 +57,9 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati
5657
if (serviceActivator == null) {
5758
if (target instanceof MessageHandler) {
5859
/*
59-
* Return a reply-producing message handler so that we still get 'produced no reply' messages
60-
* and the super class will inject the advice chain to advise the handler method if needed.
61-
*/
60+
* Return a reply-producing message handler so that we still get 'produced no reply' messages
61+
* and the super class will inject the advice chain to advise the handler method if needed.
62+
*/
6263
return new ReplyProducingMessageHandlerWrapper((MessageHandler) target);
6364
}
6465
else {
@@ -79,6 +80,11 @@ protected MessageHandler createHandler(Object bean, Method method, List<Annotati
7980
serviceActivator.setRequiresReply(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(requiresReply)));
8081
}
8182

83+
String isAsync = MessagingAnnotationUtils.resolveAttribute(annotations, "async", String.class);
84+
if (StringUtils.hasText(isAsync)) {
85+
serviceActivator.setAsync(Boolean.parseBoolean(this.beanFactory.resolveEmbeddedValue(isAsync)));
86+
}
87+
8288
this.setOutputChannelIfPresent(annotations, serviceActivator);
8389
return serviceActivator;
8490
}

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests-context.xml

+10
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,20 @@
1919
<beans:bean id="annotatedEndpoint2"
2020
class="org.springframework.integration.config.annotation.AnnotatedEndpointActivationTests.AnnotatedEndpoint2"/>
2121

22+
<beans:bean id="annotatedEndpoint3"
23+
class="org.springframework.integration.config.annotation.AnnotatedEndpointActivationTests.AnnotatedEndpoint3"/>
24+
2225
<channel id="input"/>
2326

2427
<channel id="output">
2528
<queue capacity="10"/>
2629
</channel>
2730

31+
32+
<channel id="inputAsync"/>
33+
34+
<channel id="outputAsync">
35+
<queue capacity="10"/>
36+
</channel>
37+
2838
</beans:beans>

spring-integration-core/src/test/java/org/springframework/integration/config/annotation/AnnotatedEndpointActivationTests.java

+38-9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -35,17 +35,18 @@
3535
import org.springframework.messaging.PollableChannel;
3636
import org.springframework.messaging.support.GenericMessage;
3737
import org.springframework.test.annotation.DirtiesContext;
38-
import org.springframework.test.context.ContextConfiguration;
39-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
38+
import org.springframework.test.context.junit4.SpringRunner;
39+
import org.springframework.util.concurrent.ListenableFuture;
40+
import org.springframework.util.concurrent.SettableListenableFuture;
4041

4142
/**
4243
* @author Dave Syer
4344
* @author Mark Fisher
4445
* @author Gary Russell
4546
* @author Artem Bilan
47+
* @author Yilin Wei
4648
*/
47-
@ContextConfiguration
48-
@RunWith(SpringJUnit4ClassRunner.class)
49+
@RunWith(SpringRunner.class)
4950
@DirtiesContext
5051
public class AnnotatedEndpointActivationTests {
5152

@@ -57,6 +58,14 @@ public class AnnotatedEndpointActivationTests {
5758
@Qualifier("output")
5859
private PollableChannel output;
5960

61+
@Autowired
62+
@Qualifier("inputAsync")
63+
private MessageChannel inputAsync;
64+
65+
@Autowired
66+
@Qualifier("outputAsync")
67+
private PollableChannel outputAsync;
68+
6069
@Autowired
6170
private AbstractApplicationContext applicationContext;
6271

@@ -72,7 +81,7 @@ public void resetCount() {
7281

7382
@Test
7483
public void sendAndReceive() {
75-
this.input.send(new GenericMessage<String>("foo"));
84+
this.input.send(new GenericMessage<>("foo"));
7685
Message<?> message = this.output.receive(100);
7786
assertNotNull(message);
7887
assertEquals("foo: 1", message.getPayload());
@@ -82,10 +91,19 @@ public void sendAndReceive() {
8291
assertTrue(this.applicationContext.containsBean("annotatedEndpoint2.process.serviceActivator"));
8392
}
8493

94+
@Test
95+
public void sendAndReceiveAsync() {
96+
this.inputAsync.send(new GenericMessage<>("foo"));
97+
Message<?> message = this.outputAsync.receive(100);
98+
assertNotNull(message);
99+
assertEquals("foo", message.getPayload());
100+
assertTrue(this.applicationContext.containsBean("annotatedEndpoint3.process.serviceActivator"));
101+
}
102+
85103
@Test
86104
public void sendAndReceiveImplicitInputChannel() {
87105
MessageChannel input = this.applicationContext.getBean("inputImplicit", MessageChannel.class);
88-
input.send(new GenericMessage<String>("foo"));
106+
input.send(new GenericMessage<>("foo"));
89107
Message<?> message = this.output.receive(100);
90108
assertNotNull(message);
91109
assertEquals("foo: 1", message.getPayload());
@@ -96,15 +114,15 @@ public void sendAndReceiveImplicitInputChannel() {
96114
@DirtiesContext
97115
public void stopContext() {
98116
applicationContext.stop();
99-
this.input.send(new GenericMessage<String>("foo"));
117+
this.input.send(new GenericMessage<>("foo"));
100118
}
101119

102120
@Test
103121
@DirtiesContext
104122
public void stopAndRestartContext() {
105123
applicationContext.stop();
106124
applicationContext.start();
107-
this.input.send(new GenericMessage<String>("foo"));
125+
this.input.send(new GenericMessage<>("foo"));
108126
Message<?> message = this.output.receive(100);
109127
assertNotNull(message);
110128
assertEquals("foo: 1", message.getPayload());
@@ -139,5 +157,16 @@ public String process(String message) {
139157

140158
}
141159

160+
@SuppressWarnings("unused")
161+
private static class AnnotatedEndpoint3 {
162+
163+
@ServiceActivator(inputChannel = "inputAsync", outputChannel = "outputAsync", async = "true")
164+
public ListenableFuture<String> process(String message) {
165+
SettableListenableFuture<String> future = new SettableListenableFuture<>();
166+
future.set(message);
167+
return future;
168+
}
169+
170+
}
142171

143172
}

spring-integration-test-support/src/main/java/org/springframework/integration/test/rule/Log4j2LevelAdjuster.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public Log4j2LevelAdjuster categories(String... categories) {
183183
* @return the Log4j2LevelAdjuster instance
184184
*/
185185
public static Log4j2LevelAdjuster trace() {
186-
return level(Level.TRACE);
186+
return forLevel(Level.TRACE);
187187
}
188188

189189
/**
@@ -192,7 +192,7 @@ public static Log4j2LevelAdjuster trace() {
192192
* @return the Log4j2LevelAdjuster instance
193193
*/
194194
public static Log4j2LevelAdjuster debug() {
195-
return level(Level.DEBUG);
195+
return forLevel(Level.DEBUG);
196196
}
197197

198198
/**
@@ -201,15 +201,16 @@ public static Log4j2LevelAdjuster debug() {
201201
* @return the Log4j2LevelAdjuster instance
202202
*/
203203
public static Log4j2LevelAdjuster info() {
204-
return level(Level.INFO);
204+
return forLevel(Level.INFO);
205205
}
206206

207207
/**
208208
* The factory to produce Log4j2LevelAdjuster instances for arbitrary logging {@link Level}
209209
* with the {@code org.springframework.integration} as default category.
210+
* @param level the {@link Level} to use for logging
210211
* @return the Log4j2LevelAdjuster instance
211212
*/
212-
public static Log4j2LevelAdjuster level(Level level) {
213+
public static Log4j2LevelAdjuster forLevel(Level level) {
213214
return new Log4j2LevelAdjuster(level);
214215
}
215216

0 commit comments

Comments
 (0)