diff --git a/python/ray/tests/test_advanced_6.py b/python/ray/tests/test_advanced_6.py index cecd33290c3a..00741c74aaa6 100644 --- a/python/ray/tests/test_advanced_6.py +++ b/python/ray/tests/test_advanced_6.py @@ -219,6 +219,19 @@ def normal_task_was_reconstructed(): wait_for_condition(lambda: not psutil.pid_exists(normal_task_pid), 10) +@pytest.mark.skipif(platform.system() == "Windows", reason="Niceness is posix-only") +def test_worker_niceness(ray_start_regular): + @ray.remote + class PIDReporter: + def get(self): + return os.getpid() + + reporter = PIDReporter.remote() + worker_pid = ray.get(reporter.get.remote()) + worker_proc = psutil.Process(worker_pid) + assert worker_proc.nice() == 15, worker_proc + + if __name__ == "__main__": import pytest diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index fe70677582c3..803bdf1aa7af 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -242,6 +242,13 @@ RAY_CONFIG(uint32_t, worker_max_resource_analysis_iteration, 128); /// NOTE(swang): Linux only. RAY_CONFIG(int, worker_oom_score_adjustment, 1000) +/// Sets workers' nice value on posix systems, so that the OS prioritizes CPU for other +/// processes over worker. This makes CPU available to GCS, Raylet and user processes +/// even when workers are busy. +/// Valid value is [0, 19] (negative values require sudo permissions). +/// NOTE: Linux, Unix and MacOS only. +RAY_CONFIG(int, worker_niceness, 15) + /// Allow up to 60 seconds for connecting to Redis. RAY_CONFIG(int64_t, redis_db_connect_retries, 600) RAY_CONFIG(int64_t, redis_db_connect_wait_milliseconds, 100) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 9d9fc4c67da5..d2f51a33c220 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -14,6 +14,10 @@ #include "ray/core_worker/core_worker.h" +#ifndef _WIN32 +#include +#endif + #include #include "boost/fiber/all.hpp" @@ -516,6 +520,21 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ } }, 100); + +#ifndef _WIN32 + // Doing this last during CoreWorker initialization, so initialization logic like + // registering with Raylet can finish with higher priority. + static const bool niced = [this]() { + if (options_.worker_type != WorkerType::DRIVER) { + const auto niceness = nice(RayConfig::instance().worker_niceness()); + RAY_LOG(INFO) << "Adjusted worker niceness to " << niceness; + return true; + } + return false; + }(); + // Verify driver and worker are never mixed in the same process. + RAY_CHECK_EQ(options_.worker_type != WorkerType::DRIVER, niced); +#endif } CoreWorker::~CoreWorker() { RAY_LOG(INFO) << "Core worker is destructed"; } diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 0d8eb977a38e..44cf08836896 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -40,7 +40,7 @@ namespace { // Add this prefix because the worker setup token is just a counter which is easy to // duplicate with other ids. -static const std::string kWorkerSetupTokenPrefix = "worker_startup_token:"; +const std::string kWorkerSetupTokenPrefix = "worker_startup_token:"; // A helper function to get a worker from a list. std::shared_ptr GetWorker( @@ -61,7 +61,6 @@ bool RemoveWorker( const std::shared_ptr &worker) { return worker_pool.erase(worker) > 0; } - } // namespace namespace ray { @@ -316,7 +315,7 @@ std::tuple WorkerPool::StartWorkerProcess( // Extract pointers from the worker command to pass into execvpe. std::vector worker_command_args; - for (auto const &token : state.worker_command) { + for (const auto &token : state.worker_command) { if (token == kWorkerDynamicOptionPlaceholder) { worker_command_args.insert( worker_command_args.end(), options.begin(), options.end()); @@ -332,10 +331,9 @@ std::tuple WorkerPool::StartWorkerProcess( replaced_token.replace(node_manager_port_position, strlen(kNodeManagerPortPlaceholder), std::to_string(node_manager_port_)); - worker_command_args.push_back(replaced_token); + worker_command_args.push_back(std::move(replaced_token)); continue; } - worker_command_args.push_back(token); } @@ -364,6 +362,29 @@ std::tuple WorkerPool::StartWorkerProcess( std::to_string(worker_startup_token_counter_)); } + if (language == Language::PYTHON || language == Language::JAVA) { + if (serialized_runtime_env_context != "{}" && + !serialized_runtime_env_context.empty()) { + worker_command_args.push_back("--language=" + Language_Name(language)); + worker_command_args.push_back("--runtime-env-hash=" + + std::to_string(runtime_env_hash)); + worker_command_args.push_back("--serialized-runtime-env-context=" + + serialized_runtime_env_context); + } else if (language == Language::PYTHON && worker_command_args.size() >= 2 && + worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) { + // Check that the arg really is the path to the setup worker before erasing it, to + // prevent breaking tests that mock out the worker command args. + worker_command_args.erase(worker_command_args.begin() + 1, + worker_command_args.begin() + 2); + } else if (language == Language::JAVA) { + worker_command_args.push_back("--language=" + Language_Name(language)); + } + + if (ray_debugger_external) { + worker_command_args.push_back("--ray-debugger-external"); + } + } + ProcessEnvironment env; if (!IsIOWorkerType(worker_type)) { // We pass the job ID to worker processes via an environment variable, so we don't @@ -398,30 +419,6 @@ std::tuple WorkerPool::StartWorkerProcess( } } - if (language == Language::PYTHON || language == Language::JAVA) { - if (serialized_runtime_env_context != "{}" && - !serialized_runtime_env_context.empty()) { - worker_command_args.push_back("--language=" + Language_Name(language)); - worker_command_args.push_back("--runtime-env-hash=" + - std::to_string(runtime_env_hash)); - - worker_command_args.push_back("--serialized-runtime-env-context=" + - serialized_runtime_env_context); - } else if (language == Language::PYTHON && worker_command_args.size() >= 2 && - worker_command_args[1].find(kSetupWorkerFilename) != std::string::npos) { - // Check that the arg really is the path to the setup worker before erasing it, to - // prevent breaking tests that mock out the worker command args. - worker_command_args.erase(worker_command_args.begin() + 1, - worker_command_args.begin() + 2); - } else if (language == Language::JAVA) { - worker_command_args.push_back("--language=" + Language_Name(language)); - } - - if (ray_debugger_external) { - worker_command_args.push_back("--ray-debugger-external"); - } - } - // We use setproctitle to change python worker process title, // causing the process's /proc/PID/environ being empty. // Add `SPT_NOENV` env to prevent setproctitle breaking /proc/PID/environ.