Skip to content

Commit 08790b1

Browse files
committed
MAPREDUCE-7329: HadoopPipes task may fail when linux kernel version change from 3.x to 4.x
1 parent 9b2f812 commit 08790b1

File tree

2 files changed

+135
-0
lines changed
  • hadoop-mapreduce-project/hadoop-mapreduce-client

2 files changed

+135
-0
lines changed

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Application.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@
3030

3131
import javax.crypto.SecretKey;
3232

33+
import org.apache.hadoop.fs.CommonConfigurationKeys;
3334
import org.apache.hadoop.fs.FSDataOutputStream;
3435
import org.apache.hadoop.fs.FileSystem;
3536
import org.apache.hadoop.fs.FileUtil;
3637
import org.apache.hadoop.fs.Path;
3738
import org.apache.hadoop.fs.permission.FsPermission;
3839
import org.apache.hadoop.io.FloatWritable;
40+
import org.apache.hadoop.io.IOUtils;
3941
import org.apache.hadoop.io.NullWritable;
4042
import org.apache.hadoop.io.Writable;
4143
import org.apache.hadoop.io.WritableComparable;
@@ -52,6 +54,7 @@
5254
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
5355
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
5456
import org.apache.hadoop.security.token.Token;
57+
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
5558
import org.apache.hadoop.util.ReflectionUtils;
5659
import org.apache.hadoop.util.StringUtils;
5760
import org.slf4j.Logger;
@@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
6669
private static final Logger LOG =
6770
LoggerFactory.getLogger(Application.class.getName());
6871
private ServerSocket serverSocket;
72+
private PingSocketCleaner socketCleaner;
6973
private Process process;
7074
private Socket clientSocket;
7175
private OutputHandler<K2, V2> handler;
@@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
133137

134138
process = runClient(cmd, env);
135139
clientSocket = serverSocket.accept();
140+
// start ping socket cleaner
141+
int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
142+
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
143+
socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
144+
soTimeout);
145+
socketCleaner.setDaemon(true);
146+
socketCleaner.start();
136147

137148
String challenge = getSecurityChallenge();
138149
String digestToSend = createDigest(password, challenge);
@@ -237,6 +248,7 @@ void cleanup() throws IOException {
237248
serverSocket.close();
238249
try {
239250
downlink.close();
251+
socketCleaner.interrupt();
240252
} catch (InterruptedException ie) {
241253
Thread.currentThread().interrupt();
242254
}
@@ -266,4 +278,44 @@ public static String createDigest(byte[] password, String data)
266278
return SecureShuffleUtils.hashFromString(data, key);
267279
}
268280

281+
@VisibleForTesting
282+
public static class PingSocketCleaner extends Thread {
283+
private final ServerSocket serverSocket;
284+
private final int soTimeout;
285+
286+
PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
287+
super(name);
288+
this.serverSocket = serverSocket;
289+
this.soTimeout = soTimeout;
290+
}
291+
292+
@Override
293+
public void run() {
294+
LOG.info("PingSocketCleaner started...");
295+
while (!Thread.currentThread().isInterrupted()) {
296+
Socket clientSocket = null;
297+
try {
298+
clientSocket = serverSocket.accept();
299+
clientSocket.setSoTimeout(soTimeout);
300+
LOG.debug("Connection received from {}",
301+
clientSocket.getInetAddress());
302+
int readData = 0;
303+
while (readData != -1) {
304+
readData = clientSocket.getInputStream().read();
305+
}
306+
LOG.debug("close socket cause client has closed.");
307+
closeSocketInternal(clientSocket);
308+
} catch (IOException exception) {
309+
LOG.error("PingSocketCleaner exception", exception);
310+
} finally {
311+
closeSocketInternal(clientSocket);
312+
}
313+
}
314+
}
315+
316+
@VisibleForTesting
317+
protected void closeSocketInternal(Socket clientSocket) {
318+
IOUtils.closeSocket(clientSocket);
319+
}
320+
}
269321
}

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,15 @@
2828
import java.io.InputStream;
2929
import java.io.OutputStream;
3030
import java.io.PrintStream;
31+
import java.net.ServerSocket;
32+
import java.net.Socket;
3133
import java.util.ArrayList;
3234
import java.util.HashMap;
3335
import java.util.List;
3436
import java.util.Map;
3537
import java.util.Map.Entry;
3638

39+
import org.apache.hadoop.fs.CommonConfigurationKeys;
3740
import org.apache.hadoop.fs.FileSystem;
3841
import org.apache.hadoop.fs.FileUtil;
3942
import org.apache.hadoop.fs.FsConstants;
@@ -59,7 +62,9 @@
5962
import org.apache.hadoop.mapred.Reporter;
6063
import org.apache.hadoop.mapred.TaskAttemptID;
6164
import org.apache.hadoop.mapred.TaskLog;
65+
import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
6266
import org.apache.hadoop.security.token.Token;
67+
import org.apache.hadoop.test.GenericTestUtils;
6368
import org.apache.hadoop.util.ExitUtil;
6469
import org.apache.hadoop.util.Progressable;
6570
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -455,6 +460,84 @@ public void testPipesPartitioner() {
455460
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
456461
}
457462

463+
@Test
464+
public void testSocketCleaner() throws Exception {
465+
ServerSocket serverSocket = setupServerSocket();
466+
SocketCleaner cleaner = setupCleaner(serverSocket);
467+
// mock ping thread, connect to server socket per second.
468+
int expectedClosedCount = 5;
469+
for (int i = 0; i < expectedClosedCount; i++) {
470+
try {
471+
Thread.sleep(1000);
472+
Socket clientSocket = new Socket(serverSocket.getInetAddress(),
473+
serverSocket.getLocalPort());
474+
clientSocket.close();
475+
} catch (Exception exception) {
476+
// ignored...
477+
exception.printStackTrace();
478+
}
479+
}
480+
GenericTestUtils.waitFor(
481+
() -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
482+
}
483+
484+
@Test
485+
public void testSocketTimeout() throws Exception {
486+
ServerSocket serverSocket = setupServerSocket();
487+
SocketCleaner cleaner = setupCleaner(serverSocket, 100);
488+
try {
489+
new Socket(serverSocket.getInetAddress(), serverSocket.getLocalPort());
490+
Thread.sleep(1000);
491+
} catch (Exception exception) {
492+
// ignored...
493+
}
494+
GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
495+
5000);
496+
}
497+
498+
private SocketCleaner setupCleaner(ServerSocket serverSocket) {
499+
return setupCleaner(serverSocket,
500+
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
501+
}
502+
503+
private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
504+
// start socket cleaner.
505+
SocketCleaner cleaner = new SocketCleaner("test-ping-socket-cleaner",
506+
serverSocket, soTimeout);
507+
cleaner.setDaemon(true);
508+
cleaner.start();
509+
510+
return cleaner;
511+
}
512+
513+
private static class SocketCleaner extends PingSocketCleaner {
514+
private int closeSocketCount = 0;
515+
516+
SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
517+
super(name, serverSocket, soTimeout);
518+
}
519+
520+
@Override
521+
public void run() {
522+
super.run();
523+
}
524+
525+
protected void closeSocketInternal(Socket clientSocket) {
526+
if (!clientSocket.isClosed()) {
527+
closeSocketCount++;
528+
}
529+
super.closeSocketInternal(clientSocket);
530+
}
531+
532+
public int getCloseSocketCount() {
533+
return closeSocketCount;
534+
}
535+
}
536+
537+
private ServerSocket setupServerSocket() throws Exception {
538+
return new ServerSocket(0, 1);
539+
}
540+
458541
/**
459542
* clean previous std error and outs
460543
*/

0 commit comments

Comments
 (0)