Skip to content

Commit 216b4af

Browse files
artembilanspring-builds
authored andcommitted
GH-10547: DefaultHeaderChannelRegistry: Fix race condition
Fixes: #10547 When stopping an application, the following error log is generated: ``` ERROR Unexpected error occurred in scheduled task org.springframework.core.task.TaskRejectedException: ExecutorService in shutdown state did not accept task: bean 'integrationHeaderChannelRegistry' at org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler.schedule(ThreadPoolTaskScheduler.java:430) Caused by: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@7bc9f638[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@3ecb6f85[Wrapped task = DelegatingErrorHandlingRunnable for bean 'integrationHeaderChannelRegistry']] rejected from org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler$1@57715118[Shutting down, pool size = 1, active threads = 1, queued tasks = 0, completed tasks = 1225] ``` * Check for `isRunning()` in the `DefaultHeaderChannelRegistry.run()` before scheduling a new reaper task * Increase latch wait timeout to 20 seconds in the `SftpSessionFactoryTests.sharedSessionConcurrentAccess()` (cherry picked from commit 5160117)
1 parent e73ab48 commit 216b4af

File tree

2 files changed

+14
-20
lines changed

2 files changed

+14
-20
lines changed

spring-integration-core/src/main/java/org/springframework/integration/channel/DefaultHeaderChannelRegistry.java

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,11 @@ public class DefaultHeaderChannelRegistry extends IntegrationObjectSupport
5656

5757
private static final int DEFAULT_REAPER_DELAY = 60000;
5858

59-
protected static final AtomicLong id = new AtomicLong(); // NOSONAR
59+
protected static final AtomicLong id = new AtomicLong();
6060

61-
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<>(); // NOSONAR
61+
protected final Map<String, MessageChannelWrapper> channels = new ConcurrentHashMap<>();
6262

63-
protected final String uuid = UUID.randomUUID() + ":"; // NOSONAR
63+
protected final String uuid = UUID.randomUUID() + ":";
6464

6565
private boolean removeOnGet;
6666

@@ -118,20 +118,13 @@ public final int size() {
118118
return this.channels.size();
119119
}
120120

121-
@Override
122-
protected void onInit() {
123-
super.onInit();
124-
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
125-
}
126-
127121
@Override
128122
public void start() {
129123
this.lock.lock();
130124
try {
131125
if (!this.running) {
132-
Assert.notNull(getTaskScheduler(), "a task scheduler is required");
133-
this.reaperScheduledFuture = getTaskScheduler()
134-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
126+
this.reaperScheduledFuture =
127+
getTaskScheduler().schedule(this, Instant.now().plusMillis(this.reaperDelay));
135128

136129
this.running = true;
137130
}
@@ -177,13 +170,12 @@ public Object channelToChannelName(@Nullable Object channel) {
177170
@Override
178171
@Nullable
179172
public Object channelToChannelName(@Nullable Object channel, long timeToLive) {
180-
if (!this.running && !this.explicitlyStopped && this.getTaskScheduler() != null) {
173+
if (!this.running && !this.explicitlyStopped) {
181174
start();
182175
}
183-
if (channel instanceof MessageChannel) {
176+
if (channel instanceof MessageChannel messageChannel) {
184177
String name = this.uuid + id.incrementAndGet();
185-
this.channels.put(name, new MessageChannelWrapper((MessageChannel) channel,
186-
System.currentTimeMillis() + timeToLive));
178+
this.channels.put(name, new MessageChannelWrapper(messageChannel, System.currentTimeMillis() + timeToLive));
187179
logger.debug(() -> "Registered " + channel + " as " + name);
188180
return name;
189181
}
@@ -246,8 +238,10 @@ public void run() {
246238
iterator.remove();
247239
}
248240
}
249-
this.reaperScheduledFuture = getTaskScheduler()
250-
.schedule(this, Instant.now().plusMillis(this.reaperDelay));
241+
if (isRunning()) {
242+
this.reaperScheduledFuture =
243+
getTaskScheduler().schedule(this, Instant.now().plusMillis(this.reaperDelay));
244+
}
251245

252246
logger.trace(() -> "Reaper completed; channels size=" + this.channels.size());
253247
}

spring-integration-sftp/src/test/java/org/springframework/integration/sftp/session/SftpSessionFactoryTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,15 +331,15 @@ protected SftpClient createSftpClient(ClientSession clientSession,
331331
});
332332
}
333333

334-
assertThat(executionLatch.await(10, TimeUnit.SECONDS)).isTrue();
334+
assertThat(executionLatch.await(20, TimeUnit.SECONDS)).isTrue();
335335
synchronized (errors) {
336336
assertThat(errors).isEmpty();
337337
}
338338

339339
assertThat(clientInstances).hasValue(1);
340340

341341
executorService.shutdown();
342-
assertThat(executorService.awaitTermination(10, TimeUnit.SECONDS)).isTrue();
342+
assertThat(executorService.awaitTermination(20, TimeUnit.SECONDS)).isTrue();
343343

344344
sftpSessionFactory.destroy();
345345
}

0 commit comments

Comments
 (0)