diff --git a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java index 2307b0489d3c..192e5550ceb4 100644 --- a/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java +++ b/java/runtime/src/main/java/io/ray/runtime/runner/RunManager.java @@ -96,7 +96,7 @@ public static void getAddressInfoAndFillConfig(RayConfig rayConfig) { * * @param command The command to start the process with. */ - private static String runCommand(List command) throws IOException, InterruptedException { + public static String runCommand(List command) throws IOException, InterruptedException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("Starting process with command: {}", Joiner.on(" ").join(command)); } diff --git a/java/test.sh b/java/test.sh index a842194e67fb..b49f06037c10 100755 --- a/java/test.sh +++ b/java/test.sh @@ -16,30 +16,27 @@ pushd "$ROOT_DIR" mvn -T16 checkstyle:check popd -on_exit() { - exit_code=$? - if [ $exit_code -ne 0 ]; then - echo "Exit trap, printing ray logs" - cat /tmp/ray/session_latest/logs/* - fi -} - -trap on_exit EXIT - run_testng() { + local pid local exit_code - if "$@"; then + "$@" & + pid=$! + if wait $pid; then exit_code=0 else exit_code=$? fi # exit_code == 2 means there are skipped tests. if [ $exit_code -ne 2 ] && [ $exit_code -ne 0 ] ; then - if [ $exit_code -gt 128 ] ; then - # Test crashed. Print the driver log for diagnosis. - cat /tmp/ray/session_latest/logs/java-core-driver-* + # Only print log files if it ran in cluster mode + if [[ ! "$*" =~ SINGLE_PROCESS ]]; then + if [ $exit_code -gt 128 ] ; then + # Test crashed. Print the driver log for diagnosis. + cat /tmp/ray/session_latest/logs/java-core-driver-*$pid* + fi fi - find . -name "hs_err_*log" -exec cat {} + + # Only print the hs_err_pid file of TestNG process + find . -name "hs_err_pid$pid.log" -exec cat {} + exit $exit_code fi } @@ -60,11 +57,31 @@ if ! git diff --exit-code -- java src/ray/core_worker/lib/java; then exit 1 fi -echo "Running tests under cluster mode." -# TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, -# TestNG will exit with code 2. And bazel treats it as test failure. -# bazel test //java:all_tests --config=ci || cluster_exit_code=$? -run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml +# NOTE(kfstrom): Java test troubleshooting only. +# Set MAX_ROUNDS to a big number (e.g. 1000) to run Java tests repeatedly. +# You may also want to modify java/testng.xml to run only a subset of test cases. +MAX_ROUNDS=1 +if [ $MAX_ROUNDS -gt 1 ]; then + export RAY_BACKEND_LOG_LEVEL=debug +fi + +round=1 +while true; do + echo Starting cluster mode test round $round + + echo "Running tests under cluster mode." + # TODO(hchen): Ideally, we should use the following bazel command to run Java tests. However, if there're skipped tests, + # TestNG will exit with code 2. And bazel treats it as test failure. + # bazel test //java:all_tests --config=ci || cluster_exit_code=$? + run_testng java -cp "$ROOT_DIR"/../bazel-bin/java/all_tests_deploy.jar org.testng.TestNG -d /tmp/ray_java_test_output "$ROOT_DIR"/testng.xml + + echo Finished cluster mode test round $round + date + round=$((round+1)) + if (( round > MAX_ROUNDS )); then + break + fi +done echo "Running tests under single-process mode." # bazel test //java:all_tests --jvmopt="-Dray.run-mode=SINGLE_PROCESS" --config=ci || single_exit_code=$? diff --git a/java/test/src/main/java/io/ray/test/TestProgressListener.java b/java/test/src/main/java/io/ray/test/TestProgressListener.java index 1fed5ac21375..915d82af317b 100644 --- a/java/test/src/main/java/io/ray/test/TestProgressListener.java +++ b/java/test/src/main/java/io/ray/test/TestProgressListener.java @@ -1,27 +1,42 @@ package io.ray.test; +import com.google.common.collect.ImmutableList; +import io.ray.runtime.runner.RunManager; +import java.io.File; import java.time.LocalDateTime; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; import org.testng.IInvokedMethod; import org.testng.IInvokedMethodListener; import org.testng.ITestContext; import org.testng.ITestListener; import org.testng.ITestResult; +import org.testng.SkipException; public class TestProgressListener implements IInvokedMethodListener, ITestListener { + // Travis aborts CI if no outputs for 10 minutes. So threshold needs to be smaller than 10m. + private static final long hangDetectionThresholdMillis = 5 * 60 * 1000; + private static final int TAIL_NO_OF_LINES = 500; + private Thread testMainThread; + private long testStartTimeMillis; + private String getFullTestName(ITestResult testResult) { return testResult.getTestClass().getName() + "." + testResult.getMethod().getMethodName(); } - private void printInfo(String tag, String content) { + private void printSection(String sectionName) { System.out.println( - "============ [" - + LocalDateTime.now().toString() - + "] [" - + tag - + "] " - + content - + " ============"); + "============ [" + LocalDateTime.now().toString() + "] " + sectionName + " ============"); + } + + private void printTestStage(String tag, String content) { + printSection("[" + tag + "] " + content); } @Override @@ -32,31 +47,50 @@ public void afterInvocation(IInvokedMethod method, ITestResult testResult) {} @Override public void onTestStart(ITestResult result) { - printInfo("TEST START", getFullTestName(result)); + printTestStage("TEST START", getFullTestName(result)); + testStartTimeMillis = System.currentTimeMillis(); + // TODO(kfstorm): Add a timer to detect hang + if (testMainThread == null) { + testMainThread = Thread.currentThread(); + Thread hangDetectionThread = + new Thread( + () -> { + try { + // If current task case has ran for more than 5 minutes. + while (System.currentTimeMillis() - testStartTimeMillis + < hangDetectionThresholdMillis) { + Thread.sleep(1000); + } + printDebugInfo(null, /*testHanged=*/ true); + } catch (InterruptedException e) { + // ignored + } + }); + hangDetectionThread.setDaemon(true); + hangDetectionThread.start(); + } } @Override public void onTestSuccess(ITestResult result) { - printInfo("TEST SUCCESS", getFullTestName(result)); + printTestStage("TEST SUCCESS", getFullTestName(result)); } @Override public void onTestFailure(ITestResult result) { - printInfo("TEST FAILURE", getFullTestName(result)); - Throwable throwable = result.getThrowable(); - if (throwable != null) { - throwable.printStackTrace(); - } + printTestStage("TEST FAILURE", getFullTestName(result)); + printDebugInfo(result, /*testHanged=*/ false); } @Override public void onTestSkipped(ITestResult result) { - printInfo("TEST SKIPPED", getFullTestName(result)); + printTestStage("TEST SKIPPED", getFullTestName(result)); + printDebugInfo(result, /*testHanged=*/ false); } @Override public void onTestFailedButWithinSuccessPercentage(ITestResult result) { - printInfo("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result)); + printTestStage("TEST FAILED BUT WITHIN SUCCESS PERCENTAGE", getFullTestName(result)); } @Override @@ -64,4 +98,102 @@ public void onStart(ITestContext context) {} @Override public void onFinish(ITestContext context) {} + + private void printDebugInfo(ITestResult result, boolean testHanged) { + boolean testFailed = false; + if (result != null) { + Throwable throwable = result.getThrowable(); + if (throwable != null && !(throwable instanceof SkipException)) { + testFailed = true; + throwable.printStackTrace(); + } + } + if (!testFailed && !testHanged) { + return; + } + + if (testHanged) { + printSection("TEST CASE HANGED"); + printSection("STACK TRACE OF TEST THREAD"); + for (StackTraceElement element : testMainThread.getStackTrace()) { + System.out.println(element.toString()); + } + Set javaPids = getJavaPids(); + for (Integer pid : javaPids) { + runCommandSafely(ImmutableList.of("jstack", pid.toString())); + // TODO(kfstorm): Check lldb or gdb exists rather than detecting OS type. + if (SystemUtils.IS_OS_MAC) { + runCommandSafely( + ImmutableList.of("lldb", "--batch", "-o", "bt all", "-p", pid.toString())); + } else { + runCommandSafely( + ImmutableList.of( + "sudo", "gdb", "-batch", "-ex", "thread apply all bt", "-p", pid.toString())); + } + } + } + + printLogFiles(); + + if (testHanged) { + printSection("ABORT TEST"); + System.exit(1); + } + } + + private String runCommandSafely(List command) { + String output; + String commandString = String.join(" ", command); + printSection(commandString); + try { + output = RunManager.runCommand(command); + System.out.println(output); + } catch (Exception e) { + System.out.println("Failed to execute command: " + commandString); + e.printStackTrace(); + output = ""; + } + return output; + } + + private Set getJavaPids() { + Set javaPids = new HashSet<>(); + String jpsOutput = runCommandSafely(ImmutableList.of("jps", "-v")); + try { + for (String line : StringUtils.split(jpsOutput, "\n")) { + String[] parts = StringUtils.split(line); + if (parts.length > 1 && parts[1].toLowerCase().equals("jps")) { + // Skip jps. + continue; + } + Integer pid = Integer.valueOf(parts[0]); + javaPids.add(pid); + } + } catch (Exception e) { + System.out.println("Failed to parse jps output."); + e.printStackTrace(); + } + + String pgrepJavaResult = runCommandSafely(ImmutableList.of("pgrep", "java")); + try { + for (String line : StringUtils.split(pgrepJavaResult, "\n")) { + Integer pid = Integer.valueOf(line); + javaPids.add(pid); + } + } catch (Exception e) { + System.out.println("Failed to parse pgrep java output."); + e.printStackTrace(); + } + + return javaPids; + } + + private void printLogFiles() { + Collection logFiles = + FileUtils.listFiles(new File("/tmp/ray/session_latest/logs"), null, false); + for (File file : logFiles) { + runCommandSafely( + ImmutableList.of("tail", "-n", String.valueOf(TAIL_NO_OF_LINES), file.getAbsolutePath())); + } + } } diff --git a/java/testng.xml b/java/testng.xml index 6cc10b9ab24a..0db2704845d4 100644 --- a/java/testng.xml +++ b/java/testng.xml @@ -1,6 +1,6 @@ - + diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 6c8287c1507b..cf5a1f532cb9 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -161,15 +161,21 @@ CoreWorkerProcess::CoreWorkerProcess(const CoreWorkerOptions &options) // RayConfig is generated in Java_io_ray_runtime_RayNativeRuntime_nativeInitialize // for java worker or in constructor of CoreWorker for python worker. ray::stats::Init(global_tags, options_.metrics_agent_port); + +#ifndef _WIN32 + // NOTE(kfstorm): std::atexit should be put at the end of `CoreWorkerProcess` + // constructor. We assume that spdlog has been initialized before this line. When the + // process is exiting, `HandleAtExit` will be invoked before destructing spdlog static + // variables. We explicitly destruct `CoreWorkerProcess` instance in the callback to + // ensure the static `CoreWorkerProcess` instance is destructed while spdlog is still + // usable. This prevents crashing (or hanging) when using `RAY_LOG` in + // `CoreWorkerProcess` destructor. + RAY_CHECK(std::atexit(CoreWorkerProcess::HandleAtExit) == 0); +#endif } CoreWorkerProcess::~CoreWorkerProcess() { RAY_LOG(INFO) << "Destructing CoreWorkerProcess. pid: " << getpid(); - { - // Check that all `CoreWorker` instances have been removed. - absl::ReaderMutexLock lock(&worker_map_mutex_); - RAY_CHECK(workers_.empty()); - } RAY_LOG(DEBUG) << "Stats stop in core worker."; // Shutdown stats module if worker process exits. ray::stats::Shutdown(); @@ -183,6 +189,8 @@ void CoreWorkerProcess::EnsureInitialized() { << "shutdown."; } +void CoreWorkerProcess::HandleAtExit() { instance_.reset(); } + std::shared_ptr CoreWorkerProcess::TryGetWorker(const WorkerID &worker_id) { if (!instance_) { return nullptr; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 6fa24c29e94e..72ef4f36ca7b 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -265,6 +265,8 @@ class CoreWorkerProcess { /// \return Void. static void EnsureInitialized(); + static void HandleAtExit(); + /// Get the `CoreWorker` instance by worker ID. /// /// \param[in] workerId The worker ID.