Skip to content

Commit d8b0e4c

Browse files
artembilangaryrussell
authored andcommitted
INT-4471: PubSubChannel: Add errorHandler warn (#2459)
* INT-4471: PubSubChannel: Add errorHandler warn JIRA: https://jira.spring.io/browse/INT-4471 * When an `Executor` is not provided, log warn that the provided `ErrorHandler` is ignored. **Cherry-pick to 5.0.x and 4.3.x** * * Polish warn message
1 parent 8daca72 commit d8b0e4c

File tree

4 files changed

+44
-50
lines changed

4 files changed

+44
-50
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/PublishSubscribeChannel.java

+13-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -36,19 +36,18 @@
3636
*/
3737
public class PublishSubscribeChannel extends AbstractExecutorChannel {
3838

39-
private volatile ErrorHandler errorHandler;
39+
private ErrorHandler errorHandler;
4040

41-
private volatile boolean ignoreFailures;
41+
private boolean ignoreFailures;
4242

43-
private volatile boolean applySequence;
43+
private boolean applySequence;
4444

45-
private volatile int minSubscribers;
45+
private int minSubscribers;
4646

4747
/**
4848
* Create a PublishSubscribeChannel that will use an {@link Executor}
4949
* to invoke the handlers. If this is null, each invocation will occur in
5050
* the message sender's thread.
51-
*
5251
* @param executor The executor.
5352
*/
5453
public PublishSubscribeChannel(Executor executor) {
@@ -80,9 +79,7 @@ public String getComponentType() {
8079
* a {@link MessagePublishingErrorHandler} that sends error messages to
8180
* the failed request Message's error channel header if available or to
8281
* the default 'errorChannel' otherwise.
83-
*
8482
* @param errorHandler The error handler.
85-
*
8683
* @see #PublishSubscribeChannel(Executor)
8784
*/
8885
public void setErrorHandler(ErrorHandler errorHandler) {
@@ -147,6 +144,14 @@ public final void onInit() throws Exception {
147144
getDispatcher().setApplySequence(this.applySequence);
148145
getDispatcher().setMinSubscribers(this.minSubscribers);
149146
}
147+
else if (this.errorHandler != null) {
148+
if (this.logger.isWarnEnabled()) {
149+
this.logger.warn("The 'errorHandler' is ignored for the '" + getComponentName() +
150+
"' (an 'executor' is not provided) and exceptions will be thrown " +
151+
"directly within the sending Thread");
152+
}
153+
}
154+
150155
if (this.maxSubscribers == null) {
151156
Integer maxSubscribers =
152157
getIntegrationProperty(IntegrationProperties.CHANNELS_MAX_BROADCAST_SUBSCRIBERS, Integer.class);
+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
<publish-subscribe-channel id="channelWithApplySequenceEnabledAndTaskExecutor" apply-sequence="true" task-executor="pool"/>
2121

22-
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler"/>
22+
<publish-subscribe-channel id="channelWithErrorHandler" error-handler="testErrorHandler" task-executor="pool"/>
2323

2424
<task:executor id="pool" pool-size="1"/>
2525

Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2015 the original author or authors.
2+
* Copyright 2002-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,9 +26,11 @@
2626
import java.util.concurrent.Executor;
2727

2828
import org.junit.Test;
29+
import org.junit.runner.RunWith;
2930

3031
import org.springframework.beans.DirectFieldAccessor;
31-
import org.springframework.context.support.ClassPathXmlApplicationContext;
32+
import org.springframework.beans.factory.annotation.Autowired;
33+
import org.springframework.context.ApplicationContext;
3234
import org.springframework.integration.channel.PublishSubscribeChannel;
3335
import org.springframework.integration.dispatcher.BroadcastingDispatcher;
3436
import org.springframework.integration.support.utils.IntegrationUtils;
@@ -37,21 +39,23 @@
3739
import org.springframework.messaging.MessageHandler;
3840
import org.springframework.messaging.MessagingException;
3941
import org.springframework.messaging.support.GenericMessage;
42+
import org.springframework.test.context.junit4.SpringRunner;
4043
import org.springframework.util.ErrorHandler;
4144

4245
/**
4346
* @author Mark Fisher
4447
* @author Gary Russell
4548
* @author Artem Bilan
4649
*/
50+
@RunWith(SpringRunner.class)
4751
public class PublishSubscribeChannelParserTests {
4852

53+
@Autowired
54+
private ApplicationContext context;
55+
4956
@Test
5057
public void defaultChannel() {
51-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
52-
"publishSubscribeChannelParserTests.xml", this.getClass());
53-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
54-
context.getBean("defaultChannel");
58+
PublishSubscribeChannel channel = this.context.getBean("defaultChannel", PublishSubscribeChannel.class);
5559
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
5660
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
5761
accessor.getPropertyValue("dispatcher");
@@ -69,43 +73,34 @@ public void handleMessage(Message<?> message) throws MessagingException {
6973
assertNull(dispatcherAccessor.getPropertyValue("executor"));
7074
assertFalse((Boolean) dispatcherAccessor.getPropertyValue("ignoreFailures"));
7175
assertTrue((Boolean) dispatcherAccessor.getPropertyValue("applySequence"));
72-
Object mbf = context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
76+
Object mbf = this.context.getBean(IntegrationUtils.INTEGRATION_MESSAGE_BUILDER_FACTORY_BEAN_NAME);
7377
assertSame(mbf, dispatcherAccessor.getPropertyValue("messageBuilderFactory"));
74-
context.close();
7578
}
7679

7780
@Test
7881
public void ignoreFailures() {
79-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
80-
"publishSubscribeChannelParserTests.xml", this.getClass());
81-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
82-
context.getBean("channelWithIgnoreFailures");
82+
PublishSubscribeChannel channel =
83+
this.context.getBean("channelWithIgnoreFailures", PublishSubscribeChannel.class);
8384
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
8485
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
8586
accessor.getPropertyValue("dispatcher");
8687
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("ignoreFailures"));
87-
context.close();
8888
}
8989

9090
@Test
9191
public void applySequenceEnabled() {
92-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
93-
"publishSubscribeChannelParserTests.xml", this.getClass());
94-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
95-
context.getBean("channelWithApplySequenceEnabled");
92+
PublishSubscribeChannel channel =
93+
this.context.getBean("channelWithApplySequenceEnabled", PublishSubscribeChannel.class);
9694
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
9795
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
9896
accessor.getPropertyValue("dispatcher");
9997
assertTrue((Boolean) new DirectFieldAccessor(dispatcher).getPropertyValue("applySequence"));
100-
context.close();
10198
}
10299

103100
@Test
104101
public void channelWithTaskExecutor() {
105-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
106-
"publishSubscribeChannelParserTests.xml", this.getClass());
107-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
108-
context.getBean("channelWithTaskExecutor");
102+
PublishSubscribeChannel channel =
103+
this.context.getBean("channelWithTaskExecutor", PublishSubscribeChannel.class);
109104
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
110105
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
111106
accessor.getPropertyValue("dispatcher");
@@ -116,15 +111,12 @@ public void channelWithTaskExecutor() {
116111
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
117112
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
118113
assertEquals(context.getBean("pool"), innerExecutor);
119-
context.close();
120114
}
121115

122116
@Test
123117
public void ignoreFailuresWithTaskExecutor() {
124-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
125-
"publishSubscribeChannelParserTests.xml", this.getClass());
126-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
127-
context.getBean("channelWithIgnoreFailuresAndTaskExecutor");
118+
PublishSubscribeChannel channel =
119+
this.context.getBean("channelWithIgnoreFailuresAndTaskExecutor", PublishSubscribeChannel.class);
128120
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
129121
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
130122
accessor.getPropertyValue("dispatcher");
@@ -135,16 +127,13 @@ public void ignoreFailuresWithTaskExecutor() {
135127
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
136128
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
137129
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
138-
assertEquals(context.getBean("pool"), innerExecutor);
139-
context.close();
130+
assertEquals(this.context.getBean("pool"), innerExecutor);
140131
}
141132

142133
@Test
143134
public void applySequenceEnabledWithTaskExecutor() {
144-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
145-
"publishSubscribeChannelParserTests.xml", this.getClass());
146-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
147-
context.getBean("channelWithApplySequenceEnabledAndTaskExecutor");
135+
PublishSubscribeChannel channel =
136+
this.context.getBean("channelWithApplySequenceEnabledAndTaskExecutor", PublishSubscribeChannel.class);
148137
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
149138
BroadcastingDispatcher dispatcher = (BroadcastingDispatcher)
150139
accessor.getPropertyValue("dispatcher");
@@ -155,21 +144,17 @@ public void applySequenceEnabledWithTaskExecutor() {
155144
assertEquals(ErrorHandlingTaskExecutor.class, executor.getClass());
156145
DirectFieldAccessor executorAccessor = new DirectFieldAccessor(executor);
157146
Executor innerExecutor = (Executor) executorAccessor.getPropertyValue("executor");
158-
assertEquals(context.getBean("pool"), innerExecutor);
159-
context.close();
147+
assertEquals(this.context.getBean("pool"), innerExecutor);
160148
}
161149

162150
@Test
163151
public void channelWithErrorHandler() {
164-
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
165-
"publishSubscribeChannelParserTests.xml", this.getClass());
166-
PublishSubscribeChannel channel = (PublishSubscribeChannel)
167-
context.getBean("channelWithErrorHandler");
152+
PublishSubscribeChannel channel =
153+
this.context.getBean("channelWithErrorHandler", PublishSubscribeChannel.class);
168154
DirectFieldAccessor accessor = new DirectFieldAccessor(channel);
169155
ErrorHandler errorHandler = (ErrorHandler) accessor.getPropertyValue("errorHandler");
170156
assertNotNull(errorHandler);
171-
assertEquals(context.getBean("testErrorHandler"), errorHandler);
172-
context.close();
157+
assertEquals(this.context.getBean("testErrorHandler"), errorHandler);
173158
}
174159

175160
}

src/reference/asciidoc/channel.adoc

+4
Original file line numberDiff line numberDiff line change
@@ -565,6 +565,10 @@ When using this element, you can also specify the `task-executor` used for publi
565565
<int:publish-subscribe-channel id="pubsubChannel" task-executor="someExecutor"/>
566566
----
567567

568+
Alongside with the `Executor`, an `ErrorHandler` can be configured as well.
569+
By default the `PublishSubscribeChannel` uses a `MessagePublishingErrorHandler` implementation to send error to the `MessageChannel` from the `errorChannel` header or a global `errorChannel` instance.
570+
If an `Executor` is not configured, the `ErrorHandler` is ignored and exceptions are thrown directly to the caller's Thread.
571+
568572
If you are providing a _Resequencer_ or _Aggregator_ downstream from a `PublishSubscribeChannel`, then you can set the 'apply-sequence' property on the channel to `true`.
569573
That will indicate that the channel should set the sequence-size and sequence-number Message headers as well as the correlation id prior to passing the Messages along.
570574
For example, if there are 5 subscribers, the sequence-size would be set to 5, and the Messages would have sequence-number header values ranging from 1 to 5.

0 commit comments

Comments
 (0)