Skip to content

Shutdown test executors in -core #2334

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -77,7 +77,8 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
/*
* Runs "reap" when group 'bar' is in completion
*/
Executors.newSingleThreadExecutor().execute(() -> {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
try {
waitReapStartLatch.await(10, TimeUnit.SECONDS);
}
Expand Down Expand Up @@ -145,6 +146,7 @@ protected void afterRelease(MessageGroup group, Collection<Message<?>> completed
assertEquals(1, ((MessageGroup) outputMessages.get(1).getPayload()).size()); // 'qux'

assertNull(discards.receive(0));
exec.shutdownNow();
}

@Test // INT-2833
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ public void testShouldNotSendPartialResultOnTimeoutByDefault() throws Interrupte
Message<?> message = createMessage(3, "ABC", 2, 1, replyChannel, null);
this.aggregator.handleMessage(message);
this.store.expireMessageGroups(-10000);
Message<?> reply = replyChannel.receive(1000);
Message<?> reply = replyChannel.receive(0);
assertNull("No message should have been sent normally", reply);
Message<?> discardedMessage = discardChannel.receive(1000);
assertNotNull("A message should have been discarded", discardedMessage);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2016 the original author or authors.
* Copyright 2015-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -132,6 +132,7 @@ public void testRequestBeforeReply() throws Exception {
assertEquals("bar", result.get(1));
assertEquals(0, suspensions.size());
assertEquals(0, inProcess.size());
exec.shutdownNow();
}

@Test
Expand All @@ -141,8 +142,8 @@ public void testReplyBeforeRequest() throws Exception {
handler.setOutputChannel(outputChannel);
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();
Executors.newSingleThreadExecutor()
.execute(() -> handler.trigger(MessageBuilder.withPayload("bar").setCorrelationId("foo").build()));
ExecutorService exec = Executors.newSingleThreadExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the #2329 I went the way to expose a SimpleAsyncTaskExecutor as a class property do not instantiate per test and don't care about shutdown, but I think it really doesn't matter from test perspective.

So, merging...

exec.execute(() -> handler.trigger(MessageBuilder.withPayload("bar").setCorrelationId("foo").build()));
Map<?, ?> suspensions = TestUtils.getPropertyValue(handler, "suspensions", Map.class);
int n = 0;
while (n++ < 100 && suspensions.size() == 0) {
Expand All @@ -156,6 +157,7 @@ public void testReplyBeforeRequest() throws Exception {
assertEquals("foo", result.get(0));
assertEquals("bar", result.get(1));
assertEquals(0, suspensions.size());
exec.shutdownNow();
}

@Test
Expand All @@ -169,7 +171,8 @@ public void testLateReply() throws Exception {
handler.setBeanFactory(mock(BeanFactory.class));
handler.afterPropertiesSet();
final CountDownLatch latch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(() -> {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
latch.countDown();
});
Expand All @@ -190,6 +193,7 @@ public void testLateReply() throws Exception {
assertSame(discard, triggerMessage);
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
assertEquals(0, suspensions.size());
exec.shutdownNow();
}

@Test
Expand Down Expand Up @@ -218,7 +222,8 @@ public void testExceptionReply() throws Exception {
handler.afterPropertiesSet();
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
final CountDownLatch latch = new CountDownLatch(1);
Executors.newSingleThreadExecutor().execute(() -> {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.execute(() -> {
try {
handler.handleMessage(MessageBuilder.withPayload("foo").setCorrelationId("foo").build());
}
Expand All @@ -238,6 +243,7 @@ public void testExceptionReply() throws Exception {
assertTrue(latch.await(10, TimeUnit.SECONDS));
assertSame(exc, exception.get().getCause());
assertEquals(0, suspensions.size());
exec.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,6 +27,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

Expand Down Expand Up @@ -89,8 +90,9 @@ public void shouldNotDropMessageOrBlockSendingThread() {
barrier.setReleaseStrategy(trackingReleaseStrategy);
final CountDownLatch start = new CountDownLatch(1);
final CountDownLatch sent = new CountDownLatch(200);
ExecutorService exec = Executors.newSingleThreadExecutor();
for (int i = 0; i < 200; i++) {
sendAsynchronously(barrier, testMessage(), start, sent);
sendAsynchronously(barrier, testMessage(), start, sent, exec);
}
start.countDown();

Expand All @@ -106,10 +108,12 @@ public void shouldNotDropMessageOrBlockSendingThread() {
trackingReleaseStrategy.release("foo");
assertThat((barrier.receive()), is(notNullValue()));
}
exec.shutdownNow();
}

private void sendAsynchronously(final MessageHandler handler, final Message<Object> message, final CountDownLatch start, final CountDownLatch sent) {
Executors.newSingleThreadExecutor().execute(() -> {
private void sendAsynchronously(final MessageHandler handler, final Message<Object> message,
final CountDownLatch start, final CountDownLatch sent, ExecutorService exec) {
exec.execute(() -> {
try {
start.await();
}
Expand All @@ -119,7 +123,6 @@ private void sendAsynchronously(final MessageHandler handler, final Message<Obje
handler.handleMessage(message);
sent.countDown();
});

}

private Message<Object> testMessage() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -146,7 +147,8 @@ public void shouldNotPruneWhileCompleting() throws Exception {
handler.handleMessage(message1);
bothMessagesHandled.countDown();
storedMessages.add(message1);
Executors.newSingleThreadExecutor().submit(() -> {
ExecutorService exec = Executors.newSingleThreadExecutor();
exec.submit(() -> {
handler.handleMessage(message2);
storedMessages.add(message2);
bothMessagesHandled.countDown();
Expand All @@ -155,6 +157,7 @@ public void shouldNotPruneWhileCompleting() throws Exception {
assertTrue(bothMessagesHandled.await(10, TimeUnit.SECONDS));

assertEquals(0, store.expireMessageGroups(10000));
exec.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void validateSequenceSizeHasNoAffectCustomCorrelator() throws Exception {
});
}

assertTrue("Sends failed to complete: " + latch.getCount() + " remain", latch.await(60, TimeUnit.SECONDS));
assertTrue("Sends failed to complete: " + latch.getCount() + " remain", latch.await(120, TimeUnit.SECONDS));

Message<?> message = resultChannel.receive(1000);
int counter = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017 the original author or authors.
* Copyright 2017-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,8 +20,7 @@
import static org.junit.Assert.assertTrue;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -36,6 +35,7 @@
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

Expand Down Expand Up @@ -128,8 +128,8 @@ public ProxyFactoryBean publishSubscribeChannel() {
}

@Bean
public ExecutorService executor() {
return Executors.newCachedThreadPool();
public Executor executor() {
return new ThreadPoolTaskExecutor();
}

private ProxyFactoryBean createProxyFactory(MessageChannel channel) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -76,8 +77,9 @@ public void verifyDifferentThread() throws Exception {
@Test
public void roundRobinLoadBalancing() throws Exception {
int numberOfMessages = 11;
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
ScheduledExecutorService exec = Executors
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
ExecutorChannel channel = new ExecutorChannel(
taskExecutor, new RoundRobinLoadBalancingStrategy());
CountDownLatch latch = new CountDownLatch(numberOfMessages);
Expand All @@ -104,13 +106,15 @@ public void roundRobinLoadBalancing() throws Exception {
assertEquals(4, handler1.count.get());
assertEquals(4, handler2.count.get());
assertEquals(3, handler3.count.get());
exec.shutdownNow();
}

@Test
public void verifyFailoverWithLoadBalancing() throws Exception {
int numberOfMessages = 11;
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
ScheduledExecutorService exec = Executors
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
ExecutorChannel channel = new ExecutorChannel(
taskExecutor, new RoundRobinLoadBalancingStrategy());
CountDownLatch latch = new CountDownLatch(numberOfMessages);
Expand Down Expand Up @@ -138,13 +142,15 @@ public void verifyFailoverWithLoadBalancing() throws Exception {
assertEquals(0, handler2.count.get());
assertEquals(4, handler1.count.get());
assertEquals(7, handler3.count.get());
exec.shutdownNow();
}

@Test
public void verifyFailoverWithoutLoadBalancing() throws Exception {
int numberOfMessages = 11;
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(
Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-")));
ScheduledExecutorService exec = Executors
.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("test-"));
ConcurrentTaskExecutor taskExecutor = new ConcurrentTaskExecutor(exec);
ExecutorChannel channel = new ExecutorChannel(taskExecutor, null);
CountDownLatch latch = new CountDownLatch(numberOfMessages);
TestHandler handler1 = new TestHandler(latch);
Expand All @@ -169,6 +175,7 @@ public void verifyFailoverWithoutLoadBalancing() throws Exception {
assertEquals(0, handler1.count.get());
assertEquals(0, handler3.count.get());
assertEquals(numberOfMessages, handler2.count.get());
exec.shutdownNow();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -40,7 +41,7 @@
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.dispatcher.RoundRobinLoadBalancingStrategy;
Expand Down Expand Up @@ -69,7 +70,7 @@ public class MixedDispatcherConfigurationScenarioTests {
@Mock
private List<Exception> exceptionRegistry;

private ApplicationContext ac;
private ConfigurableApplicationContext ac;

@Mock
private MessageHandler handlerA;
Expand Down Expand Up @@ -99,6 +100,12 @@ public void initialize() throws Exception {
failed = new AtomicBoolean(false);
}

@After
public void tearDown() {
this.executor.shutdownNow();
this.ac.close();
}

@Test
public void noFailoverNoLoadBalancing() {
DirectChannel channel = (DirectChannel) ac.getBean("noLoadBalancerNoFailover");
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2016 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,7 +24,6 @@

import java.util.Comparator;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
Expand All @@ -38,6 +37,7 @@

/**
* @author Mark Fisher
* @author Gary Russell
*/
public class PriorityChannelTests {

Expand Down Expand Up @@ -246,7 +246,7 @@ public void testTimeoutDoesNotElapse() throws InterruptedException {
final PriorityChannel channel = new PriorityChannel(1);
final AtomicBoolean sentSecondMessage = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
Executor executor = Executors.newSingleThreadScheduledExecutor();
ExecutorService executor = Executors.newSingleThreadScheduledExecutor();
channel.send(new GenericMessage<String>("test-1"));
executor.execute(() -> {
sentSecondMessage.set(channel.send(new GenericMessage<String>("test-2"), 3000));
Expand All @@ -262,6 +262,7 @@ public void testTimeoutDoesNotElapse() throws InterruptedException {
Message<?> message2 = channel.receive();
assertNotNull(message2);
assertEquals("test-2", message2.getPayload());
executor.shutdownNow();
}

@Test
Expand Down
Loading