Skip to content

Commit ac9cfef

Browse files
committed
TaskExecutorRegistration does not apply its default settings to a user-provided executor
Also, ChannelRegistration.setInterceptors is deprecated now: in favor of a fluently named interceptors(...) method which is documented to add the given interceptors to the channel's current list. Issue: SPR-15962 Issue: SPR-15976
1 parent 5bdcb89 commit ac9cfef

File tree

4 files changed

+85
-54
lines changed

4 files changed

+85
-54
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,13 +126,15 @@ public ApplicationContext getApplicationContext() {
126126
public AbstractSubscribableChannel clientInboundChannel() {
127127
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientInboundChannelExecutor());
128128
ChannelRegistration reg = getClientInboundChannelRegistration();
129-
channel.setInterceptors(reg.getInterceptors());
129+
if (reg.hasInterceptors()) {
130+
channel.setInterceptors(reg.getInterceptors());
131+
}
130132
return channel;
131133
}
132134

133135
@Bean
134136
public ThreadPoolTaskExecutor clientInboundChannelExecutor() {
135-
TaskExecutorRegistration reg = getClientInboundChannelRegistration().getOrCreateTaskExecRegistration();
137+
TaskExecutorRegistration reg = getClientInboundChannelRegistration().taskExecutor();
136138
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
137139
executor.setThreadNamePrefix("clientInboundChannel-");
138140
return executor;
@@ -142,7 +144,7 @@ protected final ChannelRegistration getClientInboundChannelRegistration() {
142144
if (this.clientInboundChannelRegistration == null) {
143145
ChannelRegistration registration = new ChannelRegistration();
144146
configureClientInboundChannel(registration);
145-
registration.setInterceptors(new ImmutableMessageChannelInterceptor());
147+
registration.interceptors(new ImmutableMessageChannelInterceptor());
146148
this.clientInboundChannelRegistration = registration;
147149
}
148150
return this.clientInboundChannelRegistration;
@@ -159,13 +161,15 @@ protected void configureClientInboundChannel(ChannelRegistration registration) {
159161
public AbstractSubscribableChannel clientOutboundChannel() {
160162
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(clientOutboundChannelExecutor());
161163
ChannelRegistration reg = getClientOutboundChannelRegistration();
162-
channel.setInterceptors(reg.getInterceptors());
164+
if (reg.hasInterceptors()) {
165+
channel.setInterceptors(reg.getInterceptors());
166+
}
163167
return channel;
164168
}
165169

166170
@Bean
167171
public ThreadPoolTaskExecutor clientOutboundChannelExecutor() {
168-
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().getOrCreateTaskExecRegistration();
172+
TaskExecutorRegistration reg = getClientOutboundChannelRegistration().taskExecutor();
169173
ThreadPoolTaskExecutor executor = reg.getTaskExecutor();
170174
executor.setThreadNamePrefix("clientOutboundChannel-");
171175
return executor;
@@ -175,7 +179,7 @@ protected final ChannelRegistration getClientOutboundChannelRegistration() {
175179
if (this.clientOutboundChannelRegistration == null) {
176180
ChannelRegistration registration = new ChannelRegistration();
177181
configureClientOutboundChannel(registration);
178-
registration.setInterceptors(new ImmutableMessageChannelInterceptor());
182+
registration.interceptors(new ImmutableMessageChannelInterceptor());
179183
this.clientOutboundChannelRegistration = registration;
180184
}
181185
return this.clientOutboundChannelRegistration;
@@ -191,9 +195,9 @@ protected void configureClientOutboundChannel(ChannelRegistration registration)
191195
@Bean
192196
public AbstractSubscribableChannel brokerChannel() {
193197
ChannelRegistration reg = getBrokerRegistry().getBrokerChannelRegistration();
194-
ExecutorSubscribableChannel channel = reg.hasTaskExecutor() ?
195-
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel();
196-
reg.setInterceptors(new ImmutableMessageChannelInterceptor());
198+
ExecutorSubscribableChannel channel = (reg.hasTaskExecutor() ?
199+
new ExecutorSubscribableChannel(brokerChannelExecutor()) : new ExecutorSubscribableChannel());
200+
reg.interceptors(new ImmutableMessageChannelInterceptor());
197201
channel.setInterceptors(reg.getInterceptors());
198202
return channel;
199203
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

Lines changed: 18 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -43,43 +43,43 @@ public class ChannelRegistration {
4343
* Configure the thread pool backing this message channel.
4444
*/
4545
public TaskExecutorRegistration taskExecutor() {
46-
if (this.registration == null) {
47-
this.registration = new TaskExecutorRegistration();
48-
}
49-
return this.registration;
46+
return taskExecutor(null);
5047
}
5148

5249
/**
5350
* Configure the thread pool backing this message channel using a custom
5451
* ThreadPoolTaskExecutor.
52+
* @param taskExecutor the executor to use (or {@code null} for a default executor)
5553
*/
56-
public TaskExecutorRegistration taskExecutor(ThreadPoolTaskExecutor taskExecutor) {
54+
public TaskExecutorRegistration taskExecutor(@Nullable ThreadPoolTaskExecutor taskExecutor) {
5755
if (this.registration == null) {
58-
this.registration = new TaskExecutorRegistration(taskExecutor);
56+
this.registration = (taskExecutor != null ? new TaskExecutorRegistration(taskExecutor) :
57+
new TaskExecutorRegistration());
5958
}
6059
return this.registration;
6160
}
6261

6362
/**
64-
* Configure interceptors for the message channel.
63+
* Configure the given interceptors for this message channel,
64+
* adding them to the channel's current list of interceptors.
65+
* @since 4.3.12
6566
*/
66-
public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) {
67+
public ChannelRegistration interceptors(ChannelInterceptor... interceptors) {
6768
this.interceptors.addAll(Arrays.asList(interceptors));
6869
return this;
6970
}
7071

71-
72-
protected boolean hasTaskExecutor() {
73-
return (this.registration != null);
72+
/**
73+
* @deprecated as of 4.3.12, in favor of {@link #interceptors(ChannelInterceptor...)}
74+
*/
75+
@Deprecated
76+
public ChannelRegistration setInterceptors(ChannelInterceptor... interceptors) {
77+
return interceptors(interceptors);
7478
}
7579

76-
@Nullable
77-
protected TaskExecutorRegistration getTaskExecRegistration() {
78-
return this.registration;
79-
}
8080

81-
protected TaskExecutorRegistration getOrCreateTaskExecRegistration() {
82-
return taskExecutor();
81+
protected boolean hasTaskExecutor() {
82+
return (this.registration != null);
8383
}
8484

8585
protected boolean hasInterceptors() {
@@ -89,4 +89,5 @@ protected boolean hasInterceptors() {
8989
protected List<ChannelInterceptor> getInterceptors() {
9090
return this.interceptors;
9191
}
92+
9293
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/TaskExecutorRegistration.java

Lines changed: 50 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,53 @@
1818

1919
import org.springframework.lang.Nullable;
2020
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
21+
import org.springframework.util.Assert;
2122

2223
/**
2324
* A registration class for customizing the properties of {@link ThreadPoolTaskExecutor}.
2425
*
2526
* @author Rossen Stoyanchev
27+
* @author Juergen Hoeller
2628
* @since 4.0
2729
*/
2830
public class TaskExecutorRegistration {
2931

30-
@Nullable
31-
private ThreadPoolTaskExecutor taskExecutor;
32+
private final ThreadPoolTaskExecutor taskExecutor;
3233

33-
private int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
34+
@Nullable
35+
private Integer corePoolSize;
3436

35-
private int maxPoolSize = Integer.MAX_VALUE;
37+
@Nullable
38+
private Integer maxPoolSize;
3639

37-
private int queueCapacity = Integer.MAX_VALUE;
40+
@Nullable
41+
private Integer keepAliveSeconds;
3842

39-
private int keepAliveSeconds = 60;
43+
@Nullable
44+
private Integer queueCapacity;
4045

4146

47+
/**
48+
* Create a new {@code TaskExecutorRegistration} for a default
49+
* {@link ThreadPoolTaskExecutor}.
50+
*/
4251
public TaskExecutorRegistration() {
52+
this.taskExecutor = new ThreadPoolTaskExecutor();
53+
this.taskExecutor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
54+
this.taskExecutor.setAllowCoreThreadTimeOut(true);
4355
}
4456

57+
/**
58+
* Create a new {@code TaskExecutorRegistration} for a given
59+
* {@link ThreadPoolTaskExecutor}.
60+
* @param taskExecutor the executor to use
61+
*/
4562
public TaskExecutorRegistration(ThreadPoolTaskExecutor taskExecutor) {
63+
Assert.notNull(taskExecutor, "ThreadPoolTaskExecutor must not be null");
4664
this.taskExecutor = taskExecutor;
4765
}
4866

67+
4968
/**
5069
* Set the core pool size of the ThreadPoolExecutor.
5170
* <p><strong>NOTE:</strong> The core pool size is effectively the max pool size
@@ -77,6 +96,18 @@ public TaskExecutorRegistration maxPoolSize(int maxPoolSize) {
7796
return this;
7897
}
7998

99+
/**
100+
* Set the time limit for which threads may remain idle before being terminated.
101+
* If there are more than the core number of threads currently in the pool,
102+
* after waiting this amount of time without processing a task, excess threads
103+
* will be terminated. This overrides any value set in the constructor.
104+
* <p>By default this is set to 60.
105+
*/
106+
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
107+
this.keepAliveSeconds = keepAliveSeconds;
108+
return this;
109+
}
110+
80111
/**
81112
* Set the queue capacity for the ThreadPoolExecutor.
82113
* <p><strong>NOTE:</strong> when an unbounded {@code queueCapacity} is configured
@@ -91,26 +122,21 @@ public TaskExecutorRegistration queueCapacity(int queueCapacity) {
91122
return this;
92123
}
93124

94-
/**
95-
* Set the time limit for which threads may remain idle before being terminated.
96-
* If there are more than the core number of threads currently in the pool,
97-
* after waiting this amount of time without processing a task, excess threads
98-
* will be terminated. This overrides any value set in the constructor.
99-
* <p>By default this is set to 60.
100-
*/
101-
public TaskExecutorRegistration keepAliveSeconds(int keepAliveSeconds) {
102-
this.keepAliveSeconds = keepAliveSeconds;
103-
return this;
104-
}
105125

106126
protected ThreadPoolTaskExecutor getTaskExecutor() {
107-
ThreadPoolTaskExecutor executor = (this.taskExecutor != null ? this.taskExecutor : new ThreadPoolTaskExecutor());
108-
executor.setCorePoolSize(this.corePoolSize);
109-
executor.setMaxPoolSize(this.maxPoolSize);
110-
executor.setKeepAliveSeconds(this.keepAliveSeconds);
111-
executor.setQueueCapacity(this.queueCapacity);
112-
executor.setAllowCoreThreadTimeOut(true);
113-
return executor;
127+
if (this.corePoolSize != null) {
128+
this.taskExecutor.setCorePoolSize(this.corePoolSize);
129+
}
130+
if (this.maxPoolSize != null) {
131+
this.taskExecutor.setMaxPoolSize(this.maxPoolSize);
132+
}
133+
if (this.keepAliveSeconds != null) {
134+
this.taskExecutor.setKeepAliveSeconds(this.keepAliveSeconds);
135+
}
136+
if (this.queueCapacity != null) {
137+
this.taskExecutor.setQueueCapacity(this.queueCapacity);
138+
}
139+
return this.taskExecutor;
114140
}
115141

116142
}

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2016 the original author or authors.
2+
* Copyright 2002-2017 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -512,14 +512,14 @@ static class CustomConfig extends BaseTestMessageBrokerConfig {
512512

513513
@Override
514514
protected void configureClientInboundChannel(ChannelRegistration registration) {
515-
registration.setInterceptors(this.interceptor);
515+
registration.interceptors(this.interceptor);
516516
registration.taskExecutor(new CustomThreadPoolTaskExecutor())
517517
.corePoolSize(11).maxPoolSize(12).keepAliveSeconds(13).queueCapacity(14);
518518
}
519519

520520
@Override
521521
protected void configureClientOutboundChannel(ChannelRegistration registration) {
522-
registration.setInterceptors(this.interceptor, this.interceptor);
522+
registration.interceptors(this.interceptor, this.interceptor);
523523
registration.taskExecutor().corePoolSize(21).maxPoolSize(22).keepAliveSeconds(23).queueCapacity(24);
524524
}
525525

@@ -535,7 +535,7 @@ protected void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> retu
535535

536536
@Override
537537
protected void configureMessageBroker(MessageBrokerRegistry registry) {
538-
registry.configureBrokerChannel().setInterceptors(this.interceptor, this.interceptor, this.interceptor);
538+
registry.configureBrokerChannel().interceptors(this.interceptor, this.interceptor, this.interceptor);
539539
registry.configureBrokerChannel().taskExecutor().corePoolSize(31).maxPoolSize(32).keepAliveSeconds(33).queueCapacity(34);
540540
registry.setPathMatcher(new AntPathMatcher(".")).enableSimpleBroker("/topic", "/queue");
541541
registry.setCacheLimit(8192);

0 commit comments

Comments
 (0)