Skip to content

MAPREDUCE-7329: HadoopPipes task has failed because of the ping timeout exception #2775

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@

import javax.crypto.SecretKey;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
Expand All @@ -52,6 +54,7 @@
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
Expand All @@ -66,6 +69,7 @@ class Application<K1 extends WritableComparable, V1 extends Writable,
private static final Logger LOG =
LoggerFactory.getLogger(Application.class.getName());
// Listening socket: the first accept() on it yields clientSocket, and the
// same socket is handed to the PingSocketCleaner thread.
private ServerSocket serverSocket;
// Background thread that accepts, drains and closes any further connections
// arriving on serverSocket; started as a daemon and interrupted in cleanup().
private PingSocketCleaner socketCleaner;
// Child process returned by runClient(cmd, env).
private Process process;
// First connection accepted from serverSocket.
private Socket clientSocket;
private OutputHandler<K2, V2> handler;
Expand Down Expand Up @@ -133,6 +137,13 @@ class Application<K1 extends WritableComparable, V1 extends Writable,

process = runClient(cmd, env);
clientSocket = serverSocket.accept();
// start ping socket cleaner
int soTimeout = conf.getInt(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY,
CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT);
socketCleaner = new PingSocketCleaner("ping-socket-cleaner", serverSocket,
soTimeout);
socketCleaner.setDaemon(true);
socketCleaner.start();

String challenge = getSecurityChallenge();
String digestToSend = createDigest(password, challenge);
Expand Down Expand Up @@ -237,6 +248,7 @@ void cleanup() throws IOException {
serverSocket.close();
try {
downlink.close();
socketCleaner.interrupt();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -266,4 +278,44 @@ public static String createDigest(byte[] password, String data)
return SecureShuffleUtils.hashFromString(data, key);
}

/**
 * Helper thread that accepts every further connection made to the given
 * server socket, reads from it until the peer closes the stream (or the
 * read fails, e.g. on timeout), and then closes the accepted socket.
 *
 * <p>The loop ends when the thread is interrupted or when the server socket
 * has been closed.
 */
@VisibleForTesting
public static class PingSocketCleaner extends Thread {
  private final ServerSocket serverSocket;
  private final int soTimeout;

  /**
   * @param name thread name
   * @param serverSocket socket whose incoming connections are drained
   * @param soTimeout read timeout, in milliseconds, applied to every
   *                  accepted socket
   */
  PingSocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
    super(name);
    this.serverSocket = serverSocket;
    this.soTimeout = soTimeout;
  }

  @Override
  public void run() {
    LOG.info("PingSocketCleaner started...");
    // Interrupting a thread blocked in accept() does not wake it up; only
    // closing the server socket does. Once it is closed every further
    // accept() fails immediately, so stop instead of spinning and logging.
    while (!Thread.currentThread().isInterrupted()
        && !serverSocket.isClosed()) {
      Socket clientSocket = null;
      try {
        clientSocket = serverSocket.accept();
        clientSocket.setSoTimeout(soTimeout);
        LOG.debug("Connection received from {}",
            clientSocket.getInetAddress());
        int readData = 0;
        // Discard whatever the peer sends until it closes the stream.
        while (readData != -1) {
          readData = clientSocket.getInputStream().read();
        }
        LOG.debug("close socket cause client has closed.");
      } catch (IOException exception) {
        if (serverSocket.isClosed()) {
          // Expected during shutdown: accept() fails once the socket closes.
          LOG.debug("PingSocketCleaner stopping, server socket is closed.");
        } else {
          LOG.error("PingSocketCleaner exception", exception);
        }
      } finally {
        // Single close point for both the normal and the failure path; skip
        // it when accept() itself failed and there is nothing to close.
        if (clientSocket != null) {
          closeSocketInternal(clientSocket);
        }
      }
    }
  }

  /**
   * Closes an accepted socket. Kept overridable so tests can observe closes.
   *
   * @param clientSocket the accepted socket to close
   */
  @VisibleForTesting
  protected void closeSocketInternal(Socket clientSocket) {
    IOUtils.closeSocket(clientSocket);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsConstants;
Expand All @@ -59,7 +62,9 @@
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.pipes.Application.PingSocketCleaner;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
Expand Down Expand Up @@ -455,6 +460,84 @@ public void testPipesPartitioner() {
assertEquals(3, partitioner.getPartition(iw, new Text("test"), 2));
}

/**
 * Connects to the server socket once per second, closing each connection
 * right away, and expects the cleaner to close every accepted socket.
 */
@Test
public void testSocketCleaner() throws Exception {
  ServerSocket serverSocket = setupServerSocket();
  SocketCleaner cleaner = setupCleaner(serverSocket);
  // mock ping thread, connect to server socket per second.
  int expectedClosedCount = 5;
  for (int i = 0; i < expectedClosedCount; i++) {
    Thread.sleep(1000);
    // Let connect/close failures and interrupts fail the test directly
    // instead of hiding them behind a later waitFor timeout.
    Socket clientSocket = new Socket(serverSocket.getInetAddress(),
        serverSocket.getLocalPort());
    clientSocket.close();
  }
  GenericTestUtils.waitFor(
      () -> expectedClosedCount == cleaner.getCloseSocketCount(), 100, 5000);
}

/**
 * Opens a connection that never sends data and never closes on its own, and
 * expects the cleaner to close its side once the 100 ms read timeout fires.
 */
@Test
public void testSocketTimeout() throws Exception {
  ServerSocket serverSocket = setupServerSocket();
  SocketCleaner cleaner = setupCleaner(serverSocket, 100);
  // Keep the idle client open while polling so only the read timeout can
  // trigger the close, then release it instead of leaking the socket.
  Socket idleClient = new Socket(serverSocket.getInetAddress(),
      serverSocket.getLocalPort());
  try {
    GenericTestUtils.waitFor(() -> 1 == cleaner.getCloseSocketCount(), 100,
        5000);
  } finally {
    idleClient.close();
  }
}

/**
 * Starts a cleaner on the given server socket, using the default IPC ping
 * interval as the read timeout.
 */
private SocketCleaner setupCleaner(ServerSocket serverSocket) {
  final int defaultSoTimeout =
      CommonConfigurationKeys.IPC_PING_INTERVAL_DEFAULT;
  return setupCleaner(serverSocket, defaultSoTimeout);
}

/**
 * Creates a counting cleaner for the given server socket, marks it as a
 * daemon thread, starts it and returns it.
 *
 * @param serverSocket socket the cleaner accepts connections from
 * @param soTimeout read timeout in milliseconds for accepted sockets
 * @return the started cleaner
 */
private SocketCleaner setupCleaner(ServerSocket serverSocket, int soTimeout) {
  final SocketCleaner startedCleaner =
      new SocketCleaner("test-ping-socket-cleaner", serverSocket, soTimeout);
  startedCleaner.setDaemon(true);
  startedCleaner.start();
  return startedCleaner;
}

/**
 * Test double for {@link PingSocketCleaner} that counts how many still-open
 * sockets were passed to {@link #closeSocketInternal(Socket)}.
 */
private static class SocketCleaner extends PingSocketCleaner {
  // Written only by the cleaner thread and polled by the test thread;
  // volatile makes every increment visible to the poller. With a single
  // writer the non-atomic ++ cannot lose updates.
  private volatile int closeSocketCount = 0;

  SocketCleaner(String name, ServerSocket serverSocket, int soTimeout) {
    super(name, serverSocket, soTimeout);
  }

  /**
   * Counts the socket if it is non-null and not yet closed, then delegates
   * the actual close to the parent.
   */
  @Override
  protected void closeSocketInternal(Socket clientSocket) {
    // The parent passes null when accept() itself failed; do not count it
    // and do not dereference it.
    if (clientSocket != null && !clientSocket.isClosed()) {
      closeSocketCount++;
    }
    super.closeSocketInternal(clientSocket);
  }

  /** @return number of open sockets closed so far */
  public int getCloseSocketCount() {
    return closeSocketCount;
  }
}

/**
 * Opens a server socket on an automatically chosen free port with a
 * connection backlog of one.
 */
private ServerSocket setupServerSocket() throws Exception {
  final int anyFreePort = 0;
  final int backlog = 1;
  return new ServerSocket(anyFreePort, backlog);
}

/**
* clean previous std error and outs
*/
Expand Down