Skip to content

Commit a5db683

Browse files
authored
HADOOP-17749. Remove lock contention in SelectorPool of SocketIOWithTimeout (apache#3080)
1 parent f639fbc commit a5db683

File tree

2 files changed

+124
-58
lines changed

2 files changed

+124
-58
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/SocketIOWithTimeout.java

Lines changed: 45 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@
2828
import java.nio.channels.Selector;
2929
import java.nio.channels.SocketChannel;
3030
import java.nio.channels.spi.SelectorProvider;
31-
import java.util.Iterator;
32-
import java.util.LinkedList;
31+
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.ConcurrentLinkedDeque;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3334

3435
import org.apache.hadoop.util.Time;
3536
import org.slf4j.Logger;
@@ -48,8 +49,6 @@ abstract class SocketIOWithTimeout {
4849
private long timeout;
4950
private boolean closed = false;
5051

51-
private static SelectorPool selector = new SelectorPool();
52-
5352
/* A timeout value of 0 implies wait for ever.
5453
* We should have a value of timeout that implies zero wait.. i.e.
5554
* read or write returns immediately.
@@ -154,7 +153,7 @@ int doIO(ByteBuffer buf, int ops) throws IOException {
154153
//now wait for socket to be ready.
155154
int count = 0;
156155
try {
157-
count = selector.select(channel, ops, timeout);
156+
count = SelectorPool.select(channel, ops, timeout);
158157
} catch (IOException e) { //unexpected IOException.
159158
closed = true;
160159
throw e;
@@ -200,7 +199,7 @@ static void connect(SocketChannel channel,
200199
// we might have to call finishConnect() more than once
201200
// for some channels (with user level protocols)
202201

203-
int ret = selector.select((SelectableChannel)channel,
202+
int ret = SelectorPool.select(channel,
204203
SelectionKey.OP_CONNECT, timeoutLeft);
205204

206205
if (ret > 0 && channel.finishConnect()) {
@@ -242,7 +241,7 @@ static void connect(SocketChannel channel,
242241
*/
243242
void waitForIO(int ops) throws IOException {
244243

245-
if (selector.select(channel, ops, timeout) == 0) {
244+
if (SelectorPool.select(channel, ops, timeout) == 0) {
246245
throw new SocketTimeoutException(timeoutExceptionString(channel, timeout,
247246
ops));
248247
}
@@ -280,12 +279,17 @@ private static String timeoutExceptionString(SelectableChannel channel,
280279
* This maintains a pool of selectors. These selectors are closed
281280
* once they are idle (unused) for a few seconds.
282281
*/
283-
private static class SelectorPool {
282+
private static final class SelectorPool {
284283

285-
private static class SelectorInfo {
286-
Selector selector;
287-
long lastActivityTime;
288-
LinkedList<SelectorInfo> queue;
284+
private static final class SelectorInfo {
285+
private final SelectorProvider provider;
286+
private final Selector selector;
287+
private long lastActivityTime;
288+
289+
private SelectorInfo(SelectorProvider provider, Selector selector) {
290+
this.provider = provider;
291+
this.selector = selector;
292+
}
289293

290294
void close() {
291295
if (selector != null) {
@@ -298,16 +302,11 @@ void close() {
298302
}
299303
}
300304

301-
private static class ProviderInfo {
302-
SelectorProvider provider;
303-
LinkedList<SelectorInfo> queue; // lifo
304-
ProviderInfo next;
305-
}
305+
private static ConcurrentHashMap<SelectorProvider, ConcurrentLinkedDeque
306+
<SelectorInfo>> providerMap = new ConcurrentHashMap<>();
306307

307308
private static final long IDLE_TIMEOUT = 10 * 1000; // 10 seconds.
308309

309-
private ProviderInfo providerList = null;
310-
311310
/**
312311
* Waits on the channel with the given timeout using one of the
313312
* cached selectors. It also removes any cached selectors that are
@@ -319,7 +318,7 @@ private static class ProviderInfo {
319318
* @return
320319
* @throws IOException
321320
*/
322-
int select(SelectableChannel channel, int ops, long timeout)
321+
static int select(SelectableChannel channel, int ops, long timeout)
323322
throws IOException {
324323

325324
SelectorInfo info = get(channel);
@@ -385,35 +384,18 @@ int select(SelectableChannel channel, int ops, long timeout)
385384
* @return
386385
* @throws IOException
387386
*/
388-
private synchronized SelectorInfo get(SelectableChannel channel)
387+
private static SelectorInfo get(SelectableChannel channel)
389388
throws IOException {
390-
SelectorInfo selInfo = null;
391-
392389
SelectorProvider provider = channel.provider();
393-
394390
// pick the list : rarely there is more than one provider in use.
395-
ProviderInfo pList = providerList;
396-
while (pList != null && pList.provider != provider) {
397-
pList = pList.next;
398-
}
399-
if (pList == null) {
400-
//LOG.info("Creating new ProviderInfo : " + provider.toString());
401-
pList = new ProviderInfo();
402-
pList.provider = provider;
403-
pList.queue = new LinkedList<SelectorInfo>();
404-
pList.next = providerList;
405-
providerList = pList;
406-
}
407-
408-
LinkedList<SelectorInfo> queue = pList.queue;
409-
410-
if (queue.isEmpty()) {
391+
ConcurrentLinkedDeque<SelectorInfo> infoQ = providerMap.computeIfAbsent(
392+
provider, k -> new ConcurrentLinkedDeque<>());
393+
394+
SelectorInfo selInfo = infoQ.pollLast(); // last in first out
395+
if (selInfo == null) {
411396
Selector selector = provider.openSelector();
412-
selInfo = new SelectorInfo();
413-
selInfo.selector = selector;
414-
selInfo.queue = queue;
415-
} else {
416-
selInfo = queue.removeLast();
397+
// selInfo will be put into infoQ after `#release()`
398+
selInfo = new SelectorInfo(provider, selector);
417399
}
418400

419401
trimIdleSelectors(Time.now());
@@ -426,34 +408,39 @@ private synchronized SelectorInfo get(SelectableChannel channel)
426408
*
427409
* @param info
428410
*/
429-
private synchronized void release(SelectorInfo info) {
411+
private static void release(SelectorInfo info) {
430412
long now = Time.now();
431413
trimIdleSelectors(now);
432414
info.lastActivityTime = now;
433-
info.queue.addLast(info);
415+
// SelectorInfos in queue are sorted by lastActivityTime
416+
providerMap.get(info.provider).addLast(info);
434417
}
435418

419+
private static AtomicBoolean trimming = new AtomicBoolean(false);
420+
436421
/**
437422
* Closes selectors that are idle for IDLE_TIMEOUT (10 sec). It does not
438423
* traverse the whole list, just over the one that have crossed
439424
* the timeout.
440425
*/
441-
private void trimIdleSelectors(long now) {
426+
private static void trimIdleSelectors(long now) {
427+
if (!trimming.compareAndSet(false, true)) {
428+
return;
429+
}
430+
442431
long cutoff = now - IDLE_TIMEOUT;
443-
444-
for(ProviderInfo pList=providerList; pList != null; pList=pList.next) {
445-
if (pList.queue.isEmpty()) {
446-
continue;
447-
}
448-
for(Iterator<SelectorInfo> it = pList.queue.iterator(); it.hasNext();) {
449-
SelectorInfo info = it.next();
450-
if (info.lastActivityTime > cutoff) {
432+
for (ConcurrentLinkedDeque<SelectorInfo> infoQ : providerMap.values()) {
433+
SelectorInfo oldest;
434+
while ((oldest = infoQ.peekFirst()) != null) {
435+
if (oldest.lastActivityTime <= cutoff && infoQ.remove(oldest)) {
436+
oldest.close();
437+
} else {
451438
break;
452439
}
453-
it.remove();
454-
info.close();
455440
}
456441
}
442+
443+
trimming.set(false);
457444
}
458445
}
459446
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/TestSocketIOWithTimeout.java

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
import java.net.SocketTimeoutException;
2525
import java.nio.channels.Pipe;
2626
import java.util.Arrays;
27+
import java.util.concurrent.CountDownLatch;
28+
import java.util.concurrent.ExecutorService;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.TimeUnit;
31+
import java.util.concurrent.atomic.AtomicLong;
2732

2833
import org.apache.hadoop.test.GenericTestUtils;
2934
import org.apache.hadoop.test.MultithreadedTestUtil;
@@ -186,6 +191,46 @@ public void doWork() throws Exception {
186191
}
187192
}
188193

194+
@Test
195+
public void testSocketIOWithTimeoutByMultiThread() throws Exception {
196+
CountDownLatch latch = new CountDownLatch(1);
197+
Runnable ioTask = () -> {
198+
try {
199+
Pipe pipe = Pipe.open();
200+
try (Pipe.SourceChannel source = pipe.source();
201+
InputStream in = new SocketInputStream(source, TIMEOUT);
202+
Pipe.SinkChannel sink = pipe.sink();
203+
OutputStream out = new SocketOutputStream(sink, TIMEOUT)) {
204+
205+
byte[] writeBytes = TEST_STRING.getBytes();
206+
byte[] readBytes = new byte[writeBytes.length];
207+
latch.await();
208+
209+
out.write(writeBytes);
210+
doIO(null, out, TIMEOUT);
211+
212+
in.read(readBytes);
213+
assertArrayEquals(writeBytes, readBytes);
214+
doIO(in, null, TIMEOUT);
215+
}
216+
} catch (Exception e) {
217+
fail(e.getMessage());
218+
}
219+
};
220+
221+
int threadCnt = 64;
222+
ExecutorService threadPool = Executors.newFixedThreadPool(threadCnt);
223+
for (int i = 0; i < threadCnt; ++i) {
224+
threadPool.submit(ioTask);
225+
}
226+
227+
Thread.sleep(1000);
228+
latch.countDown();
229+
230+
threadPool.shutdown();
231+
assertTrue(threadPool.awaitTermination(3, TimeUnit.SECONDS));
232+
}
233+
189234
@Test
190235
public void testSocketIOWithTimeoutInterrupted() throws Exception {
191236
Pipe pipe = Pipe.open();
@@ -223,4 +268,38 @@ public void doWork() throws Exception {
223268
ctx.stop();
224269
}
225270
}
271+
272+
@Test
273+
public void testSocketIOWithTimeoutInterruptedByMultiThread()
274+
throws Exception {
275+
final int timeout = TIMEOUT * 10;
276+
AtomicLong readCount = new AtomicLong();
277+
AtomicLong exceptionCount = new AtomicLong();
278+
Runnable ioTask = () -> {
279+
try {
280+
Pipe pipe = Pipe.open();
281+
try (Pipe.SourceChannel source = pipe.source();
282+
InputStream in = new SocketInputStream(source, timeout)) {
283+
in.read();
284+
readCount.incrementAndGet();
285+
} catch (InterruptedIOException ste) {
286+
exceptionCount.incrementAndGet();
287+
}
288+
} catch (Exception e) {
289+
fail(e.getMessage());
290+
}
291+
};
292+
293+
int threadCnt = 64;
294+
ExecutorService threadPool = Executors.newFixedThreadPool(threadCnt);
295+
for (int i = 0; i < threadCnt; ++i) {
296+
threadPool.submit(ioTask);
297+
}
298+
Thread.sleep(1000);
299+
threadPool.shutdownNow();
300+
threadPool.awaitTermination(1, TimeUnit.SECONDS);
301+
302+
assertEquals(0, readCount.get());
303+
assertEquals(threadCnt, exceptionCount.get());
304+
}
226305
}

0 commit comments

Comments
 (0)