Skip to content

Commit

Permalink
AMQP-824: Name for deferredCloseExec thread pool
Browse files Browse the repository at this point in the history
JIRA https://jira.spring.io/browse/AMQP-824

Taking the comments into account

Fix build

* Polishing for code style

**Cherry-pick to 2.0.x & 2.1.x**

# Conflicts:
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/AbstractConnectionFactory.java
#	spring-rabbit/src/main/java/org/springframework/amqp/rabbit/connection/CachingConnectionFactory.java
  • Loading branch information
Will Droste authored and artembilan committed Jul 13, 2018
1 parent bac9aa2 commit b896c89
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 13 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2017 the original author or authors.
* Copyright 2002-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -58,6 +58,7 @@
* @author Gary Russell
* @author Steve Powell
* @author Artem Bilan
* @author Will Droste
*
*/
public abstract class AbstractConnectionFactory implements ConnectionFactory, DisposableBean, BeanNameAware,
Expand Down Expand Up @@ -428,7 +429,14 @@ public void setBeanName(String name) {
}
}

public boolean hasPublisherConnectionFactory() {
/**
* Return a bean name of the component or null if not a bean.
* @return the bean name or null.
* @since 1.7.9
*/
protected String getBeanName() {
return this.beanName;
} public boolean hasPublisherConnectionFactory() {
return this.publisherConnectionFactory != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -55,6 +56,7 @@
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
Expand Down Expand Up @@ -92,13 +94,21 @@
* @author Gary Russell
* @author Artem Bilan
* @author Steve Powell
* @author Will Droste
*/
@ManagedResource
public class CachingConnectionFactory extends AbstractConnectionFactory
implements InitializingBean, ShutdownListener {

private static final int DEFAULT_CHANNEL_CACHE_SIZE = 25;

private static final String DEFAULT_DEFERRED_POOL_PREFIX = "spring-rabbit-deferred-pool-";

/**
* Create a unique ID for the pool.
*/
private static final AtomicInteger threadPoolId = new AtomicInteger();

private static final Set<String> txStarts = new HashSet<>(Arrays.asList("basicPublish", "basicAck",
"basicNack", "basicReject"));

Expand Down Expand Up @@ -148,9 +158,6 @@ public enum CacheMode {
/** Synchronization monitor for the shared Connection. */
private final Object connectionMonitor = new Object();

/** Executor used for deferred close if no explicit executor set. */
private final ExecutorService deferredCloseExecutor = Executors.newCachedThreadPool();

private long channelCheckoutTimeout = 0;

private CacheMode cacheMode = CacheMode.CHANNEL;
Expand All @@ -172,6 +179,10 @@ public enum CacheMode {
private volatile boolean active = true;

private volatile boolean initialized;
/**
* Executor used for deferred close if no explicit executor set.
*/
private ExecutorService deferredCloseExecutor;

private volatile boolean stopped;

Expand Down Expand Up @@ -764,7 +775,9 @@ public final void destroy() {
resetConnection();
if (getContextStopped()) {
this.stopped = true;
this.deferredCloseExecutor.shutdownNow();
if (this.deferredCloseExecutor != null) {
this.deferredCloseExecutor.shutdownNow();
}
}
}

Expand Down Expand Up @@ -910,6 +923,28 @@ private int countOpenConnections() {
return n;
}

/**
* Determine the executor service used to close connections.
* @return specified executor service otherwise the default one is created and returned.
* @since 1.7.9
*/
protected ExecutorService getDeferredCloseExecutor() {
if (getExecutorService() != null) {
return getExecutorService();
}
synchronized (this.connectionMonitor) {
if (this.deferredCloseExecutor == null) {
final String threadPrefix =
getBeanName() == null
? DEFAULT_DEFERRED_POOL_PREFIX + threadPoolId.incrementAndGet()
: getBeanName();
ThreadFactory threadPoolFactory = new CustomizableThreadFactory(threadPrefix);
this.deferredCloseExecutor = Executors.newCachedThreadPool(threadPoolFactory);
}
}
return this.deferredCloseExecutor;
}

@Override
public String toString() {
return "CachingConnectionFactory [channelCacheSize=" + this.channelCacheSize + ", host=" + getHost()
Expand Down Expand Up @@ -1187,9 +1222,7 @@ private void physicalClose() throws Exception {
}

private void asyncClose() {
ExecutorService executorService = (getExecutorService() != null
? getExecutorService()
: CachingConnectionFactory.this.deferredCloseExecutor);
ExecutorService executorService = getDeferredCloseExecutor();
final Channel channel = CachedChannelInvocationHandler.this.target;
executorService.execute(() -> {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2010-2016 the original author or authors.
* Copyright 2010-2018 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@
import static org.junit.Assert.assertTrue;

import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;

import org.junit.After;
Expand All @@ -42,6 +43,7 @@
/**
* @author Dave Syer
* @author Gary Russell
* @author Will Droste
*/
public final class ListenerContainerPlaceholderParserTests {

Expand All @@ -58,14 +60,18 @@ public void closeBeanFactory() throws Exception {
if (this.context != null) {
CachingConnectionFactory cf = this.context.getBean(CachingConnectionFactory.class);
this.context.close();
assertTrue(TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class)
.isTerminated());
ExecutorService es = TestUtils.getPropertyValue(cf, "deferredCloseExecutor", ThreadPoolExecutor.class);
if (es != null) {
// if it gets started make sure its terminated..
assertTrue(es.isTerminated());
}
}
}

@Test
public void testParseWithQueueNames() throws Exception {
SimpleMessageListenerContainer container = this.context.getBean("testListener", SimpleMessageListenerContainer.class);
SimpleMessageListenerContainer container =
this.context.getBean("testListener", SimpleMessageListenerContainer.class);
assertEquals(AcknowledgeMode.MANUAL, container.getAcknowledgeMode());
assertEquals(this.context.getBean(ConnectionFactory.class), container.getConnectionFactory());
assertEquals(MessageListenerAdapter.class, container.getMessageListener().getClass());
Expand Down

0 comments on commit b896c89

Please sign in to comment.