Skip to content

Commit

Permalink
[Core] use higher niceness for workers (ray-project#24928)
Browse files Browse the repository at this point in the history
Looking at past failures of dataset_shuffle_push_based_random_shuffle_1tb, and when running it on my own, I noticed that raylets are killed because GCS is not able to respond to them in time. It seems that at the beginning of the run there is a huge CPU spike which starves GCS of CPU. In the same spirit as adjusting workers to higher OOM scores, we can give workers higher niceness so they yield CPU to GCS, Raylet and other user processes.

I ran dataset_shuffle_push_based_random_shuffle_1tb a few times, and it no longer sees raylet deaths caused by GCS CPU starvation. However, there are other issues making the test fail, which I will continue to investigate.
  • Loading branch information
mwtian authored May 23, 2022
1 parent bcf77f3 commit 50d49a2
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 29 deletions.
13 changes: 13 additions & 0 deletions python/ray/tests/test_advanced_6.py
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ def normal_task_was_reconstructed():
wait_for_condition(lambda: not psutil.pid_exists(normal_task_pid), 10)


@pytest.mark.skipif(platform.system() == "Windows", reason="Niceness is posix-only")
def test_worker_niceness(ray_start_regular):
    """Check that a Ray worker process runs with a nice value of 15."""

    @ray.remote
    class PIDReporter:
        """Actor that reports the PID of the process hosting it."""

        def get(self):
            return os.getpid()

    # Ask the actor for its PID, then inspect that process's niceness from here.
    actor = PIDReporter.remote()
    pid = ray.get(actor.get.remote())
    process = psutil.Process(pid)
    assert process.nice() == 15, process


if __name__ == "__main__":
import pytest

Expand Down
7 changes: 7 additions & 0 deletions src/ray/common/ray_config_def.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,13 @@ RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128);
/// NOTE(swang): Linux only.
RAY_CONFIG(int, worker_oom_score_adjustment, 1000)

/// Niceness applied to worker processes on posix systems, so that the OS prioritizes
/// CPU for other processes over workers. This keeps CPU available to GCS, Raylet and
/// user processes even when workers are busy.
/// The value is passed to nice() by each non-driver core worker at the end of its
/// initialization.
/// Valid values are in [0, 19] (negative values require sudo permissions).
/// NOTE: Linux, Unix and MacOS only.
RAY_CONFIG(int, worker_niceness, 15)

/// Allow up to 60 seconds for connecting to Redis
/// (600 retries x 100 milliseconds wait = 60 seconds).
RAY_CONFIG(int64_t, redis_db_connect_retries, 600)
RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100)
Expand Down
19 changes: 19 additions & 0 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@

#include "ray/core_worker/core_worker.h"

#ifndef _WIN32
#include <unistd.h>

#include <cerrno>
#include <cstring>
#endif

#include <google/protobuf/util/json_util.h>

#include "boost/fiber/all.hpp"
Expand Down Expand Up @@ -516,6 +520,21 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_
}
},
100);

#ifndef _WIN32
// Doing this last during CoreWorker initialization, so initialization logic like
// registering with Raylet can finish with higher priority.
static const bool niced = [this]() {
if (options_.worker_type != WorkerType::DRIVER) {
const auto niceness = nice(RayConfig::instance().worker_niceness());
RAY_LOG(INFO) << "Adjusted worker niceness to " << niceness;
return true;
}
return false;
}();
// Verify driver and worker are never mixed in the same process.
RAY_CHECK_EQ(options_.worker_type != WorkerType::DRIVER, niced);
#endif
}

/// Destructor. Emits an INFO-level log line recording that the core worker is being
/// destroyed.
CoreWorker::~CoreWorker() {
  RAY_LOG(INFO) << "Core worker is destructed";
}
Expand Down
55 changes: 26 additions & 29 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace {

// Add this prefix because the worker setup token is just a counter which is easy to
// duplicate with other ids.
static const std::string kWorkerSetupTokenPrefix = "worker_startup_token:";
const std::string kWorkerSetupTokenPrefix = "worker_startup_token:";

// A helper function to get a worker from a list.
std::shared_ptr<ray::raylet::WorkerInterface> GetWorker(
Expand All @@ -61,7 +61,6 @@ bool RemoveWorker(
const std::shared_ptr<ray::raylet::WorkerInterface> &worker) {
return worker_pool.erase(worker) > 0;
}

} // namespace

namespace ray {
Expand Down Expand Up @@ -316,7 +315,7 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(

// Extract pointers from the worker command to pass into execvpe.
std::vector<std::string> worker_command_args;
for (auto const &token : state.worker_command) {
for (const auto &token : state.worker_command) {
if (token == kWorkerDynamicOptionPlaceholder) {
worker_command_args.insert(
worker_command_args.end(), options.begin(), options.end());
Expand All @@ -332,10 +331,9 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
replaced_token.replace(node_manager_port_position,
strlen(kNodeManagerPortPlaceholder),
std::to_string(node_manager_port_));
worker_command_args.push_back(replaced_token);
worker_command_args.push_back(std::move(replaced_token));
continue;
}

worker_command_args.push_back(token);
}

Expand Down Expand Up @@ -364,6 +362,29 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
std::to_string(worker_startup_token_counter_));
}

if (language == Language::PYTHON || language == Language::JAVA) {
if (serialized_runtime_env_context != "{}" &&
!serialized_runtime_env_context.empty()) {
worker_command_args.push_back("--language=" + Language_Name(language));
worker_command_args.push_back("--runtime-env-hash=" +
std::to_string(runtime_env_hash));
worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
} else if (language == Language::PYTHON && worker_command_args.size() >= 2 &&
worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
// Check that the arg really is the path to the setup worker before erasing it, to
// prevent breaking tests that mock out the worker command args.
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
} else if (language == Language::JAVA) {
worker_command_args.push_back("--language=" + Language_Name(language));
}

if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
}

ProcessEnvironment env;
if (!IsIOWorkerType(worker_type)) {
// We pass the job ID to worker processes via an environment variable, so we don't
Expand Down Expand Up @@ -398,30 +419,6 @@ std::tuple<Process, StartupToken> WorkerPool::StartWorkerProcess(
}
}

if (language == Language::PYTHON || language == Language::JAVA) {
if (serialized_runtime_env_context != "{}" &&
!serialized_runtime_env_context.empty()) {
worker_command_args.push_back("--language=" + Language_Name(language));
worker_command_args.push_back("--runtime-env-hash=" +
std::to_string(runtime_env_hash));

worker_command_args.push_back("--serialized-runtime-env-context=" +
serialized_runtime_env_context);
} else if (language == Language::PYTHON && worker_command_args.size() >= 2 &&
worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) {
// Check that the arg really is the path to the setup worker before erasing it, to
// prevent breaking tests that mock out the worker command args.
worker_command_args.erase(worker_command_args.begin() + 1,
worker_command_args.begin() + 2);
} else if (language == Language::JAVA) {
worker_command_args.push_back("--language=" + Language_Name(language));
}

if (ray_debugger_external) {
worker_command_args.push_back("--ray-debugger-external");
}
}

// We use setproctitle to change python worker process title,
// causing the process's /proc/PID/environ being empty.
// Add `SPT_NOENV` env to prevent setproctitle breaking /proc/PID/environ.
Expand Down

0 comments on commit 50d49a2

Please sign in to comment.