Skip to content

Commit

Permalink
Revert "Revert "[Java] fix test hang occasionally when running Failur…
Browse files Browse the repository at this point in the history
  • Loading branch information
kfstorm authored Feb 9, 2021
1 parent f51c26b commit e0b8179
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static void getAddressInfoAndFillConfig(RayConfig rayConfig) {
*
* @param command The command to start the process with.
*/
private static String runCommand(List<String> command) throws IOException, InterruptedException {
public static String runCommand(List<String> command) throws IOException, InterruptedException {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Starting process with command: {}", Joiner.on(" ").join(command));
}
Expand Down
57 changes: 37 additions & 20 deletions java/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,27 @@ pushd "$ROOT_DIR"
mvn -T16 checkstyle:check
popd

on_exit() {
exit_code=$?
if [ $exit_code -ne 0 ]; then
echo "Exit trap, printing ray logs"
cat /tmp/ray/session_latest/logs/*
fi
}

trap on_exit EXIT

run_testng() {
local pid
local exit_code
if "$@"; then
"$@" &
pid=$!
if wait $pid; then
exit_code=0
else
exit_code=$?
fi
# exit_code == 2 means there are skipped tests.
if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then
if [ $exit_code -gt 128 ] ; then
# Test crashed. Print the driver log for diagnosis.
cat /tmp/ray/session_latest/logs/java-core-driver-*
# Only print log files if it ran in cluster mode
if [[ ! "$*" =~ SINGLE_PROCESS ]]; then
if [ $exit_code -gt 128 ] ; then
# Test crashed. Print the driver log for diagnosis.
cat /tmp/ray/session_latest/logs/java-core-driver-*$pid*
fi
fi
find . -name "hs_err_*log" -exec cat {} +
# Only print the hs_err_pid file of TestNG process
find . -name "hs_err_pid$pid.log" -exec cat {} +
exit $exit_code
fi
}
Expand All @@ -60,11 +57,31 @@ if ! git diff --exit-code -- java src/ray/core_worker/lib/java; then
exit 1
fi

echo "Running tests under cluster mode."
# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests,
# TestNG will exit with code 2. And bazel treats it as test failure.
# bazel test //java:all_tests --config=ci || cluster_exit_code=$?
run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml
# NOTE(kfstrom): Java test troubleshooting only.
# Set MAX_ROUNDS to a big number (e.g. 1000) to run Java tests repeatedly.
# You may also want to modify java/testng.xml to run only a subset of test cases.
MAX_ROUNDS=1
if [ $MAX_ROUNDS -gt 1 ]; then
export RAY_BACKEND_LOG_LEVEL=debug
fi

round=1
while true; do
echo Starting cluster mode test round $round

echo "Running tests under cluster mode."
# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests,
# TestNG will exit with code 2. And bazel treats it as test failure.
# bazel test //java:all_tests --config=ci || cluster_exit_code=$?
run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml

echo Finished cluster mode test round $round
date
round=$((round+1))
if (( round > MAX_ROUNDS )); then
break
fi
done

echo "Running tests under single-process mode."
# bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --config=ci || single_exit_code=$?
Expand Down
166 changes: 149 additions & 17 deletions java/test/src/main/java/io/ray/test/TestProgressListener.java
Original file line number Diff line number Diff line change
@@ -1,27 +1,42 @@
package io.ray.test;

import com.google.common.collect.ImmutableList;
import io.ray.runtime.runner.RunManager;
import java.io.File;
import java.time.LocalDateTime;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.testng.IInvokedMethod;
import org.testng.IInvokedMethodListener;
import org.testng.ITestContext;
import org.testng.ITestListener;
import org.testng.ITestResult;
import org.testng.SkipException;

public class TestProgressListener implements IInvokedMethodListener, ITestListener {

// Travis aborts CI if no outputs for 10 minutes. So threshold needs to be smaller than 10m.
private static final long hangDetectionThresholdMillis = 5 * 60 * 1000;
private static final int TAIL_NO_OF_LINES = 500;
private Thread testMainThread;
private long testStartTimeMillis;

private String getFullTestName(ITestResult testResult) {
return testResult.getTestClass().getName() + "." + testResult.getMethod().getMethodName();
}

private void printInfo(String tag, String content) {
private void printSection(String sectionName) {
System.out.println(
"============ ["
+ LocalDateTime.now().toString()
+ "] ["
+ tag
+ "] "
+ content
+ " ============");
"============ [" + LocalDateTime.now().toString() + "] " + sectionName + " ============");
}

private void printTestStage(String tag, String content) {
printSection("[" + tag + "] " + content);
}

@Override
Expand All @@ -32,36 +47,153 @@ public void afterInvocation(IInvokedMethod method, ITestResult testResult) {}

@Override
public void onTestStart(ITestResult result) {
printInfo("TEST START", getFullTestName(result));
printTestStage("TEST START", getFullTestName(result));
testStartTimeMillis = System.currentTimeMillis();
// TODO(kfstorm): Add a timer to detect hang
if (testMainThread == null) {
testMainThread = Thread.currentThread();
Thread hangDetectionThread =
new Thread(
() -> {
try {
// If current task case has ran for more than 5 minutes.
while (System.currentTimeMillis() - testStartTimeMillis
< hangDetectionThresholdMillis) {
Thread.sleep(1000);
}
printDebugInfo(null, /*testHanged=*/ true);
} catch (InterruptedException e) {
// ignored
}
});
hangDetectionThread.setDaemon(true);
hangDetectionThread.start();
}
}

@Override
public void onTestSuccess(ITestResult result) {
printInfo("TEST SUCCESS", getFullTestName(result));
printTestStage("TEST SUCCESS", getFullTestName(result));
}

@Override
public void onTestFailure(ITestResult result) {
printInfo("TEST FAILURE", getFullTestName(result));
Throwable throwable = result.getThrowable();
if (throwable != null) {
throwable.printStackTrace();
}
printTestStage("TEST FAILURE", getFullTestName(result));
printDebugInfo(result, /*testHanged=*/ false);
}

@Override
public void onTestSkipped(ITestResult result) {
printInfo("TEST SKIPPED", getFullTestName(result));
printTestStage("TEST SKIPPED", getFullTestName(result));
printDebugInfo(result, /*testHanged=*/ false);
}

@Override
public void onTestFailedButWithinSuccessPercentage(ITestResult result) {
printInfo("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result));
printTestStage("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result));
}

@Override
public void onStart(ITestContext context) {}

@Override
public void onFinish(ITestContext context) {}

private void printDebugInfo(ITestResult result, boolean testHanged) {
boolean testFailed = false;
if (result != null) {
Throwable throwable = result.getThrowable();
if (throwable != null && !(throwable instanceof SkipException)) {
testFailed = true;
throwable.printStackTrace();
}
}
if (!testFailed && !testHanged) {
return;
}

if (testHanged) {
printSection("TEST CASE HANGED");
printSection("STACK TRACE OF TEST THREAD");
for (StackTraceElement element : testMainThread.getStackTrace()) {
System.out.println(element.toString());
}
Set<Integer> javaPids = getJavaPids();
for (Integer pid : javaPids) {
runCommandSafely(ImmutableList.of("jstack", pid.toString()));
// TODO(kfstorm): Check lldb or gdb exists rather than detecting OS type.
if (SystemUtils.IS_OS_MAC) {
runCommandSafely(
ImmutableList.of("lldb", "--batch", "-o", "bt all", "-p", pid.toString()));
} else {
runCommandSafely(
ImmutableList.of(
"sudo", "gdb", "-batch", "-ex", "thread apply all bt", "-p", pid.toString()));
}
}
}

printLogFiles();

if (testHanged) {
printSection("ABORT TEST");
System.exit(1);
}
}

private String runCommandSafely(List<String> command) {
String output;
String commandString = String.join(" ", command);
printSection(commandString);
try {
output = RunManager.runCommand(command);
System.out.println(output);
} catch (Exception e) {
System.out.println("Failed to execute command: " + commandString);
e.printStackTrace();
output = "";
}
return output;
}

private Set<Integer> getJavaPids() {
Set<Integer> javaPids = new HashSet<>();
String jpsOutput = runCommandSafely(ImmutableList.of("jps", "-v"));
try {
for (String line : StringUtils.split(jpsOutput, "\n")) {
String[] parts = StringUtils.split(line);
if (parts.length > 1 && parts[1].toLowerCase().equals("jps")) {
// Skip jps.
continue;
}
Integer pid = Integer.valueOf(parts[0]);
javaPids.add(pid);
}
} catch (Exception e) {
System.out.println("Failed to parse jps output.");
e.printStackTrace();
}

String pgrepJavaResult = runCommandSafely(ImmutableList.of("pgrep", "java"));
try {
for (String line : StringUtils.split(pgrepJavaResult, "\n")) {
Integer pid = Integer.valueOf(line);
javaPids.add(pid);
}
} catch (Exception e) {
System.out.println("Failed to parse pgrep java output.");
e.printStackTrace();
}

return javaPids;
}

private void printLogFiles() {
Collection<File> logFiles =
FileUtils.listFiles(new File("/tmp/ray/session_latest/logs"), null, false);
for (File file : logFiles) {
runCommandSafely(
ImmutableList.of("tail", "-n", String.valueOf(TAIL_NO_OF_LINES), file.getAbsolutePath()));
}
}
}
2 changes: 1 addition & 1 deletion java/testng.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE suite SYSTEM "https://testng.org/testng-1.0.dtd">
<suite name="RAY suite" verbose="2">
<suite name="RAY suite" verbose="2" configfailurepolicy="continue">
<test name = "RAY test">
<packages>
<package name = "io.ray.runtime.*" />
Expand Down
18 changes: 13 additions & 5 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,21 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options)
// RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize
// for java worker or in constructor of CoreWorker for python worker.
ray::stats::Init(global_tags, options_.metrics_agent_port);

#ifndef _WIN32
// NOTE(kfstorm): std::atexit should be put at the end of `CoreWorkerProcess`
// constructor. We assume that spdlog has been initialized before this line. When the
// process is exiting, `HandleAtExit` will be invoked before destructing spdlog static
// variables. We explicitly destruct `CoreWorkerProcess` instance in the callback to
// ensure the static `CoreWorkerProcess` instance is destructed while spdlog is still
// usable. This prevents crashing (or hanging) when using `RAY_LOG` in
// `CoreWorkerProcess` destructor.
RAY_CHECK(std::atexit(CoreWorkerProcess::HandleAtExit) == 0);
#endif
}

CoreWorkerProcess::~CoreWorkerProcess() {
RAY_LOG(INFO) << "Destructing CoreWorkerProcess. pid: " << getpid();
{
// Check that all `CoreWorker` instances have been removed.
absl::ReaderMutexLock lock(&worker_map_mutex_);
RAY_CHECK(workers_.empty());
}
RAY_LOG(DEBUG) << "Stats stop in core worker.";
// Shutdown stats module if worker process exits.
ray::stats::Shutdown();
Expand All @@ -183,6 +189,8 @@ void CoreWorkerProcess::EnsureInitialized() {
<< "shutdown.";
}

void CoreWorkerProcess::HandleAtExit() { instance_.reset(); }

std::shared_ptr<CoreWorker> CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) {
if (!instance_) {
return nullptr;
Expand Down
2 changes: 2 additions & 0 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ class CoreWorkerProcess {
/// \return Void.
static void EnsureInitialized();

static void HandleAtExit();

/// Get the `CoreWorker` instance by worker ID.
///
/// \param[in] workerId The worker ID.
Expand Down

0 comments on commit e0b8179

Please sign in to comment.