Skip to content

Commit e6fd514

Browse files
committed
Transport Pools should clean up old connections instead of leaving them idle.
PooledHttpTransportFactory.create() is now fully synchronized, so creating a host-level transport pool and borrowing from it happen atomically. Without this, the cleanup thread could remove a host-level pool from the factory between pool creation and borrowing, which would leak a resource. A PooledHttpTransportFactory.cleanPools() method was added to walk the host-level transport pools and ask each one to clean itself; if a host-level transport pool no longer has any pooled transports after this cleaning, the pool is removed. PooledTransportManager now starts a daemon thread that periodically asks the registered PooledHttpTransportFactory objects to clean their host-level transport pools. TransportPool now logs pooled transport validation failures. A TransportPool.removeOldConnections() method was added to remove any connections that have sat idle longer than the job's configured maximum idle transport timeout; it returns the number of transports remaining in the pool. Added the `es.net.transport.pooling.expiration.timeout` property (default 5 minutes) for defining how often pooled connections should be cleaned. Fixes elastic#849
1 parent f1e94a6 commit e6fd514

File tree

10 files changed

+217
-49
lines changed

10 files changed

+217
-49
lines changed

mr/src/itest/java/org/elasticsearch/hadoop/integration/rest/ConnectionExhaustionSuite.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import org.elasticsearch.hadoop.LocalEs;
66
import org.elasticsearch.hadoop.cfg.Settings;
77
import org.elasticsearch.hadoop.rest.RestClient;
8+
import org.elasticsearch.hadoop.util.SettingsUtils;
89
import org.elasticsearch.hadoop.util.TestSettings;
910
import org.junit.ClassRule;
1011
import org.junit.Ignore;
@@ -18,8 +19,6 @@
1819
import java.util.UUID;
1920
import java.util.concurrent.TimeUnit;
2021

21-
import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY;
22-
2322
@Ignore
2423
@RunWith(Suite.class)
2524
@Suite.SuiteClasses({ ConnectionExhaustionSuite.ConnectionExhaustionTest.class })
@@ -65,7 +64,7 @@ public void exhaustConnections() throws InterruptedException {
6564
for (String jobKey : JOB_KEYS) {
6665
final Settings workerSettings = SETTINGS.copy();
6766
if (POOLED) {
68-
workerSettings.setProperty(INTERNAL_TRANSPORT_POOLING_KEY, jobKey);
67+
SettingsUtils.setJobTransportPoolingKey(workerSettings, jobKey);
6968
}
7069
Thread worker = new Thread(new Exhauster(++workerNum, workerSettings));
7170
worker.start();

mr/src/main/java/org/elasticsearch/hadoop/cfg/ConfigurationOptions.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,9 @@ public interface ConfigurationOptions {
241241
String ES_OUTPUT_JSON_DEFAULT = "no";
242242

243243
/** Network options */
244+
String ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT = "es.net.transport.pooling.expiration.timeout";
245+
String ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT_DEFAULT = "5m";
246+
244247
// SSL
245248
String ES_NET_USE_SSL = "es.net.ssl";
246249
String ES_NET_USE_SSL_DEFAULT = "false";

mr/src/main/java/org/elasticsearch/hadoop/cfg/Settings.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,10 @@ public TimeValue getHeartBeatLead() {
345345
return TimeValue.parseTimeValue(getProperty(ES_HEART_BEAT_LEAD, ES_HEART_BEAT_LEAD_DEFAULT));
346346
}
347347

348+
public TimeValue getTransportPoolingExpirationTimeout() {
349+
return TimeValue.parseTimeValue(getProperty(ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT, ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT_DEFAULT));
350+
}
351+
348352
// SSL
349353
public boolean getNetworkSSLEnabled() {
350354
return Booleans.parseBoolean(getProperty(ES_NET_USE_SSL, ES_NET_USE_SSL_DEFAULT));

mr/src/main/java/org/elasticsearch/hadoop/rest/NetworkClient.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.hadoop.EsHadoopException;
3333
import org.elasticsearch.hadoop.EsHadoopIllegalStateException;
3434
import org.elasticsearch.hadoop.cfg.Settings;
35-
import org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransport;
3635
import org.elasticsearch.hadoop.rest.commonshttp.CommonsHttpTransportFactory;
3736
import org.elasticsearch.hadoop.rest.pooling.PooledTransportManager;
3837
import org.elasticsearch.hadoop.rest.stats.Stats;
@@ -41,8 +40,6 @@
4140
import org.elasticsearch.hadoop.util.ByteSequence;
4241
import org.elasticsearch.hadoop.util.SettingsUtils;
4342

44-
import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY;
45-
4643

4744
public class NetworkClient implements StatsAware, Closeable {
4845
private static Log log = LogFactory.getLog(NetworkClient.class);
@@ -59,7 +56,7 @@ public class NetworkClient implements StatsAware, Closeable {
5956
private final Stats stats = new Stats();
6057

6158
/**
 * Creates a client, choosing the transport factory from the settings: the job's pooled factory
 * when a job transport pooling key is present, otherwise a plain {@link CommonsHttpTransportFactory}.
 *
 * @param settings connection settings; may or may not carry a job transport pooling key
 */
public NetworkClient(Settings settings) {
    // The pooled factory may only be requested when the key IS present:
    // PooledTransportManager.getTransportFactory() calls ensureJobTransportPoolingKey() on the
    // settings (presumably rejecting settings without a key - verify in SettingsUtils). The
    // previous condition was inverted relative to the original
    // `getProperty(INTERNAL_TRANSPORT_POOLING_KEY) == null` check, sending un-keyed settings to
    // the pooled path and keyed settings to the unpooled one.
    this(settings, (SettingsUtils.hasJobTransportPoolingKey(settings)
            ? PooledTransportManager.getTransportFactory(settings)
            : new CommonsHttpTransportFactory()));
}
6461

6562
public NetworkClient(Settings settings, TransportFactory transportFactory) {

mr/src/main/java/org/elasticsearch/hadoop/rest/TransportFactory.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,10 @@
66
* Creates {@link Transport} Objects
77
*/
88
public interface TransportFactory {
9+
/**
10+
* Creates a {@link Transport} object for the given host.
11+
* @param settings Specifies the Transport's properties
12+
* @param hostInfo Host to connect to
13+
* @return A {@link Transport} for the given host
*/
914
Transport create(Settings settings, String hostInfo);
1015
}

mr/src/main/java/org/elasticsearch/hadoop/rest/pooling/PooledHttpTransportFactory.java

Lines changed: 79 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,56 +7,112 @@
77
import org.elasticsearch.hadoop.cfg.Settings;
88
import org.elasticsearch.hadoop.rest.Transport;
99
import org.elasticsearch.hadoop.rest.TransportFactory;
10+
import org.elasticsearch.hadoop.util.SettingsUtils;
1011

11-
import java.util.concurrent.ConcurrentHashMap;
12-
import java.util.concurrent.ConcurrentMap;
13-
14-
import static org.elasticsearch.hadoop.cfg.InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY;
12+
import java.util.ArrayList;
13+
import java.util.HashMap;
14+
import java.util.List;
15+
import java.util.Map;
1516

1617
/**
1718
* Creates HTTP Transports that are backed by a pool of Transport objects for reuse.
1819
*/
19-
class PooledHttpTransportFactory implements TransportFactory {
20+
final class PooledHttpTransportFactory implements TransportFactory {
2021

2122
private final Log log = LogFactory.getLog(this.getClass());
22-
private final ConcurrentMap<String, TransportPool> hostPools = new ConcurrentHashMap<String, TransportPool>();
23+
private final Map<String, TransportPool> hostPools = new HashMap<String, TransportPool>();
2324
private final String jobKey;
2425

2526
PooledHttpTransportFactory(String jobKey) {
2627
this.jobKey = jobKey;
2728
}
2829

30+
/**
31+
* {@inheritDoc}
32+
*/
2933
@Override
30-
public Transport create(Settings settings, String hostInfo) {
34+
public synchronized Transport create(Settings settings, String hostInfo) {
3135
// Make sure that the caller's Settings has the correct job pool key.
32-
if (!jobKey.equals(settings.getProperty(INTERNAL_TRANSPORT_POOLING_KEY))) {
33-
throw new EsHadoopIllegalArgumentException("Settings object passed does not have the same `"+INTERNAL_TRANSPORT_POOLING_KEY+"` property as when this pool was created. This could be a different job incorrectly polluting the TransportPool. Bailing out...");
36+
assertCorrectJobId(settings);
37+
return borrowFrom(getOrCreateTransportPool(hostInfo, settings), hostInfo);
38+
}
39+
40+
/**
41+
* Checks to ensure that the caller is using a settings object with the same job id
42+
* that this pool is responsible for.
43+
* @param settings To be checked
44+
*/
45+
private void assertCorrectJobId(Settings settings) {
46+
SettingsUtils.ensureJobTransportPoolingKey(settings);
47+
String requestingJobKey = SettingsUtils.getJobTransportPoolingKey(settings);
48+
if (!jobKey.equals(requestingJobKey)) {
49+
throw new EsHadoopIllegalArgumentException("Settings object passed does not have the same job " +
50+
"pooling key property as when this pool was created. Job key requested was [" +
51+
requestingJobKey + "] but this pool services job [" + jobKey + "]. This could be a " +
52+
"different job incorrectly polluting the TransportPool. Bailing out...");
3453
}
54+
}
3555

36-
TransportPool pool = hostPools.get(hostInfo);
56+
/**
57+
* Gets the transport pool for the given host info, or creates one if it is absent.
58+
* @param hostInfo To get a pool for
59+
* @param settings For creating the pool if it does not exist
60+
* @return A transport pool for the given host
61+
*/
62+
private TransportPool getOrCreateTransportPool(String hostInfo, Settings settings) {
63+
TransportPool pool;
64+
pool = hostPools.get(hostInfo); // Check again in case it was added while waiting for the lock
3765
if (pool == null) {
38-
synchronized (this) {
39-
pool = hostPools.get(hostInfo); // Check again in case it was added while waiting for the lock
40-
if (pool == null) {
41-
pool = new TransportPool(jobKey, hostInfo, settings);
42-
hostPools.put(hostInfo, pool);
43-
if (log.isDebugEnabled()) {
44-
log.debug("Creating new TransportPool for job ["+jobKey+"] for host ["+hostInfo+"]");
45-
}
46-
}
66+
pool = new TransportPool(jobKey, hostInfo, settings);
67+
hostPools.put(hostInfo, pool);
68+
if (log.isDebugEnabled()) {
69+
log.debug("Creating new TransportPool for job ["+jobKey+"] for host ["+hostInfo+"]");
4770
}
4871
}
72+
return pool;
73+
}
4974

75+
/**
76+
* Creates a Transport using the given TransportPool.
77+
* @param pool Transport is borrowed from
78+
* @param hostInfo For logging purposes
79+
* @return A Transport backed by a pooled resource
80+
*/
81+
private Transport borrowFrom(TransportPool pool, String hostInfo) {
5082
if (!pool.getJobPoolingKey().equals(jobKey)) {
5183
throw new EsHadoopIllegalArgumentException("PooledTransportFactory found a pool with a different owner than this job. This could be a different job incorrectly polluting the TransportPool. Bailing out...");
5284
}
53-
54-
Transport borrowed;
5585
try {
56-
borrowed = pool.borrowTransport();
86+
return pool.borrowTransport();
5787
} catch (Exception e) {
5888
throw new EsHadoopException(String.format("Could not get a Transport from the Transport Pool for host [%s]", hostInfo));
5989
}
60-
return borrowed;
90+
}
91+
92+
/**
93+
* Iterates over the available host pools and asks each one to purge transports older than a certain age.
94+
* @return Total number of pooled connections still alive in this factory.
95+
*/
96+
synchronized int cleanPools() {
97+
int totalConnectionsRemaining = 0;
98+
List<String> hostsToRemove = new ArrayList<String>();
99+
for (Map.Entry<String, TransportPool> hostPool : hostPools.entrySet()) {
100+
String host = hostPool.getKey();
101+
TransportPool pool = hostPool.getValue();
102+
103+
int connectionsRemaining = pool.removeOldConnections();
104+
if (connectionsRemaining == 0) {
105+
hostsToRemove.add(host);
106+
} else {
107+
totalConnectionsRemaining += connectionsRemaining;
108+
}
109+
}
110+
111+
// Remove old pools that now have no connections.
112+
for (String hostToRemove : hostsToRemove) {
113+
hostPools.remove(hostToRemove);
114+
}
115+
116+
return totalConnectionsRemaining;
61117
}
62118
}

mr/src/main/java/org/elasticsearch/hadoop/rest/pooling/PooledTransportManager.java

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,14 @@
22

33
import org.apache.commons.logging.Log;
44
import org.apache.commons.logging.LogFactory;
5-
import org.elasticsearch.hadoop.cfg.InternalConfigurationOptions;
65
import org.elasticsearch.hadoop.cfg.Settings;
76
import org.elasticsearch.hadoop.rest.TransportFactory;
7+
import org.elasticsearch.hadoop.util.SettingsUtils;
88

9+
import java.util.Map;
910
import java.util.concurrent.ConcurrentHashMap;
1011
import java.util.concurrent.ConcurrentMap;
12+
import java.util.concurrent.TimeUnit;
1113

1214
/**
1315
* Central location to request {@link PooledHttpTransportFactory} instances so that the instances can be persisted
@@ -25,7 +27,8 @@ private PooledTransportManager() {
2527
= new ConcurrentHashMap<String, PooledHttpTransportFactory>();
2628

2729
public static TransportFactory getTransportFactory(Settings jobSettings) {
28-
String jobKey = jobSettings.getProperty(InternalConfigurationOptions.INTERNAL_TRANSPORT_POOLING_KEY);
30+
SettingsUtils.ensureJobTransportPoolingKey(jobSettings);
31+
String jobKey = SettingsUtils.getJobTransportPoolingKey(jobSettings);
2932

3033
PooledHttpTransportFactory factoryForJob = poolRegistry.get(jobKey);
3134
if (factoryForJob == null) {
@@ -42,4 +45,32 @@ public static TransportFactory getTransportFactory(Settings jobSettings) {
4245
}
4346
return factoryForJob;
4447
}
48+
49+
static {
    // Start the background cleaner once, when this class is initialized. It is a daemon thread so
    // it never keeps the JVM alive, and it is named so it can be identified in thread dumps
    // instead of showing up as an anonymous "Thread-N".
    Thread cleanup = new Thread(new PoolCleaner(), "es-hadoop-transport-pool-cleaner");
    cleanup.setDaemon(true);
    cleanup.start();
}
54+
55+
private static class PoolCleaner implements Runnable {
56+
private final Log log = LogFactory.getLog(getClass());
57+
private final long cleaningInterval = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);
58+
59+
@Override
60+
public void run() {
61+
log.trace("Started PoolCleaner...");
62+
try {
63+
while (true) {
64+
log.trace("Waiting");
65+
Thread.sleep(cleaningInterval);
66+
log.trace("Cleaning...");
67+
for (Map.Entry<String, PooledHttpTransportFactory> entry : poolRegistry.entrySet()) {
68+
entry.getValue().cleanPools();
69+
}
70+
}
71+
} catch (InterruptedException e) {
72+
Thread.currentThread().interrupt();
73+
}
74+
}
75+
}
4576
}

0 commit comments

Comments
 (0)