Skip to content

Commit b7ea331

Browse files
committed
Fixing concurrent modification exceptions in the pooling logic.
fixes elastic#894
1 parent 839bcfd commit b7ea331

File tree

3 files changed

+149
-9
lines changed

3 files changed

+149
-9
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.hadoop.rest.pooling;
21+
22+
import org.elasticsearch.hadoop.cfg.Settings;
23+
import org.elasticsearch.hadoop.rest.Transport;
24+
import org.elasticsearch.hadoop.util.SettingsUtils;
25+
import org.elasticsearch.hadoop.util.TestSettings;
26+
import org.junit.Test;
27+
28+
import java.util.UUID;
29+
30+
import static org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT;
31+
32+
public class AbstractTransportPoolTest {
33+
34+
@Test
35+
public void removeOldConnections() throws Exception {
36+
Settings settings = new TestSettings();
37+
settings.setProperty(ES_NET_TRANSPORT_POOLING_EXPIRATION_TIMEOUT, "2s");
38+
39+
String host = SettingsUtils.discoveredOrDeclaredNodes(settings).get(0);
40+
41+
TransportPool pool = new TransportPool(UUID.randomUUID().toString(), host, settings);
42+
43+
Transport transport1 = null;
44+
Transport transport2 = null;
45+
Transport transport3 = null;
46+
47+
try {
48+
// Checkout three transports all at once, this should create three pooled transports.
49+
transport1 = pool.borrowTransport();
50+
transport2 = pool.borrowTransport();
51+
transport3 = pool.borrowTransport();
52+
53+
// Close two of them.
54+
transport1.close();
55+
transport2.close();
56+
57+
// Wait the amount of time to close.
58+
Thread.sleep(settings.getTransportPoolingExpirationTimeout().millis() + 1000L);
59+
60+
// Will need to remove 2 connections at this point
61+
pool.removeOldConnections();
62+
63+
} finally {
64+
// Close everything
65+
if (transport1 != null) {
66+
transport1.close();
67+
}
68+
69+
if (transport2 != null) {
70+
transport2.close();
71+
}
72+
73+
if (transport3 != null) {
74+
transport3.close();
75+
}
76+
}
77+
}
78+
79+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.hadoop.rest.pooling;
21+
22+
import org.elasticsearch.hadoop.LocalEs;
23+
import org.junit.ClassRule;
24+
import org.junit.rules.ExternalResource;
25+
import org.junit.runner.RunWith;
26+
import org.junit.runners.Suite;
27+
28+
@RunWith(Suite.class)
29+
@Suite.SuiteClasses({ AbstractTransportPoolTest.class })
30+
public class PoolingSuite {
31+
32+
@ClassRule
33+
public static ExternalResource resource = new LocalEs();
34+
35+
}

mr/src/main/java/org/elasticsearch/hadoop/rest/pooling/TransportPool.java

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
import org.elasticsearch.hadoop.util.unit.TimeValue;
3232

3333
import java.io.IOException;
34+
import java.util.ArrayList;
3435
import java.util.HashMap;
36+
import java.util.List;
3537
import java.util.Map;
3638

3739
import static org.elasticsearch.hadoop.rest.Request.Method.GET;
@@ -115,20 +117,37 @@ private void release(PooledTransport transport) {
115117
*/
116118
synchronized Transport borrowTransport() {
117119
long now = System.currentTimeMillis();
120+
121+
List<PooledTransport> garbageTransports = new ArrayList<PooledTransport>();
122+
PooledTransport candidate = null;
123+
124+
// Grab a transport
118125
for (Map.Entry<PooledTransport, Long> entry : idle.entrySet()) {
119126
PooledTransport transport = entry.getKey();
120127
if (validate(transport)) {
121-
idle.remove(transport);
122-
leased.put(transport, now);
123-
return new LeasedTransport(transport, this);
128+
candidate = transport;
129+
break;
124130
} else {
125-
idle.remove(transport);
126-
release(transport);
131+
garbageTransports.add(transport);
127132
}
128133
}
129-
PooledTransport transport = create();
130-
leased.put(transport, now);
131-
return new LeasedTransport(transport, this);
134+
135+
// Remove any dead connections found
136+
for (PooledTransport transport : garbageTransports) {
137+
idle.remove(transport);
138+
release(transport);
139+
}
140+
141+
// Create the connection if we didn't find any, remove it from the pool if we did.
142+
if (candidate == null) {
143+
candidate = create();
144+
} else {
145+
idle.remove(candidate);
146+
}
147+
148+
// Lease.
149+
leased.put(candidate, now);
150+
return new LeasedTransport(candidate, this);
132151
}
133152

134153
/**
@@ -165,6 +184,8 @@ private synchronized void returnTransport(Transport returning) {
165184
synchronized int removeOldConnections() {
166185
long now = System.currentTimeMillis();
167186
long expirationTime = now - idleTransportTimeout.millis();
187+
188+
List<PooledTransport> removeFromIdle = new ArrayList<PooledTransport>();
168189
for (Map.Entry<PooledTransport, Long> idleEntry : idle.entrySet()) {
169190
long lastUsed = idleEntry.getValue();
170191
if (lastUsed < expirationTime) {
@@ -175,9 +196,14 @@ synchronized int removeOldConnections() {
175196
+ idleTransportTimeout + "] ago.");
176197
}
177198
release(removed);
178-
idle.remove(removed);
199+
removeFromIdle.add(removed);
179200
}
180201
}
202+
203+
for (PooledTransport toRemove : removeFromIdle) {
204+
idle.remove(toRemove);
205+
}
206+
181207
return idle.size() + leased.size();
182208
}
183209

0 commit comments

Comments
 (0)