Skip to content

Commit

Permalink
Limit number of concurrent workers started by hardware concurrency. (r…
Browse files Browse the repository at this point in the history
…ay-project#2753)

* Limit number of concurrent workers started by hardware concurrency.

* Check if std::thread::hardware_concurrency() returns 0.

* Pass in max concurrency from Python.

* Fix Java call to startRaylet.

* Fix typo

* Remove unnecessary cast.

* Fix linting.

* Cleanups on Java side.

* Comment back in actor test.

* Require maximum_startup_concurrency to be at least 1.

* Fix linting and test.

* Improve documentation.

* Fix typo.
  • Loading branch information
robertnishihara authored and raulchen committed Aug 29, 2018
1 parent 3850e3b commit 132f133
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -623,10 +623,14 @@ private void startRaylet(String storeName, AddressInfo info, int numWorkers,

String resourceArgument = ResourceUtil.getResourcesStringFromMap(staticResources);

int hardwareConcurrency = Runtime.getRuntime().availableProcessors();
int maximumStartupConcurrency = Math.max(1, Math.min(staticResources.get("CPU").intValue(),
hardwareConcurrency));

// The second-last arugment is the worker command for Python, not needed for Java.
String[] cmds = new String[]{filePath, rayletSocketName, storeName, ip, gcsIp,
gcsPort, "" + numWorkers, resourceArgument,
"", workerCommand};
gcsPort, String.valueOf(numWorkers), String.valueOf(maximumStartupConcurrency),
resourceArgument, "", workerCommand};

Process p = startProcess(cmds, null, RunInfo.ProcessType.PT_RAYLET,
"raylet", redisAddress, ip, redirect, cleanup);
Expand Down
6 changes: 6 additions & 0 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,11 @@ def start_raylet(redis_address,

static_resources = check_and_update_resources(resources, True)

# Limit the number of workers that can be started in parallel by the
# raylet. However, make sure it is at least 1.
maximum_startup_concurrency = max(
1, min(psutil.cpu_count(), static_resources["CPU"]))

# Format the resource argument in a form like 'CPU,1.0,GPU,0,Custom,3'.
resource_argument = ",".join([
"{},{}".format(resource_name, resource_value)
Expand Down Expand Up @@ -1035,6 +1040,7 @@ def start_raylet(redis_address,
gcs_ip_address,
gcs_port,
str(num_workers),
str(maximum_startup_concurrency),
resource_argument,
start_worker_command,
"", # Worker command for Java, not needed for Python.
Expand Down
10 changes: 6 additions & 4 deletions src/ray/raylet/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ static std::vector<std::string> parse_worker_command(std::string worker_command)

int main(int argc, char *argv[]) {
RayLog::StartRayLog(argv[0], RAY_INFO);
RAY_CHECK(argc == 10);
RAY_CHECK(argc == 11);

const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
const std::string node_ip_address = std::string(argv[3]);
const std::string redis_address = std::string(argv[4]);
int redis_port = std::stoi(argv[5]);
int num_initial_workers = std::stoi(argv[6]);
const std::string static_resource_list = std::string(argv[7]);
const std::string python_worker_command = std::string(argv[8]);
const std::string java_worker_command = std::string(argv[9]);
int maximum_startup_concurrency = std::stoi(argv[7]);
const std::string static_resource_list = std::string(argv[8]);
const std::string python_worker_command = std::string(argv[9]);
const std::string java_worker_command = std::string(argv[10]);

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
Expand All @@ -48,6 +49,7 @@ int main(int argc, char *argv[]) {
node_manager_config.num_initial_workers = num_initial_workers;
node_manager_config.num_workers_per_process =
RayConfig::instance().num_workers_per_process();
node_manager_config.maximum_startup_concurrency = maximum_startup_concurrency;

if (!python_worker_command.empty()) {
node_manager_config.worker_commands.emplace(
Expand Down
3 changes: 1 addition & 2 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service,
local_resources_(config.resource_config),
local_available_resources_(config.resource_config),
worker_pool_(config.num_initial_workers, config.num_workers_per_process,
static_cast<int>(config.resource_config.GetNumCpus()),
config.worker_commands),
config.maximum_startup_concurrency, config.worker_commands),
local_queues_(SchedulingQueue()),
scheduling_policy_(local_queues_),
reconstruction_policy_(
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ struct NodeManagerConfig {
ResourceSet resource_config;
int num_initial_workers;
int num_workers_per_process;
/// The maximum number of workers that can be started concurrently by a
/// worker pool.
int maximum_startup_concurrency;
/// The commands used to start the worker process, grouped by language.
std::unordered_map<Language, std::vector<std::string>> worker_commands;
uint64_t heartbeat_period_ms;
Expand Down
18 changes: 13 additions & 5 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

#include <sys/wait.h>

#include <algorithm>
#include <thread>

#include "ray/status.h"
#include "ray/util/logging.h"

Expand Down Expand Up @@ -41,10 +44,13 @@ namespace raylet {
/// A constructor that initializes a worker pool with
/// (num_worker_processes * num_workers_per_process) workers for each language.
WorkerPool::WorkerPool(
int num_worker_processes, int num_workers_per_process, int num_cpus,
int num_worker_processes, int num_workers_per_process,
int maximum_startup_concurrency,
const std::unordered_map<Language, std::vector<std::string>> &worker_commands)
: num_workers_per_process_(num_workers_per_process), num_cpus_(num_cpus) {
: num_workers_per_process_(num_workers_per_process),
maximum_startup_concurrency_(maximum_startup_concurrency) {
RAY_CHECK(num_workers_per_process > 0) << "num_workers_per_process must be positive.";
RAY_CHECK(maximum_startup_concurrency > 0);
// Ignore SIGCHLD signals. If we don't do this, then worker processes will
// become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN);
Expand Down Expand Up @@ -95,9 +101,11 @@ uint32_t WorkerPool::Size(const Language &language) const {
}

void WorkerPool::StartWorkerProcess(const Language &language, bool force_start) {
// The first condition makes sure that we are always starting up to
// num_cpus_ number of processes in parallel.
if (static_cast<int>(starting_worker_processes_.size()) >= num_cpus_ && !force_start) {
// If we are already starting up too many workers, then return without starting
// more.
if (static_cast<int>(starting_worker_processes_.size()) >=
maximum_startup_concurrency_ &&
!force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << starting_worker_processes_.size()
<< " worker processes pending registration";
Expand Down
21 changes: 13 additions & 8 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ class WorkerPool {
///
/// \param num_worker_processes The number of worker processes to start, per language.
/// \param num_workers_per_process The number of workers per process.
/// \param maximum_startup_concurrency The maximum number of worker processes
/// that can be started in parallel (typically this should be set to the number of CPU
/// resources on the machine). Note that this limit can be overridden in
/// StartWorkerProcess by the force_start flag.
/// \param worker_commands The commands used to start the worker process, grouped by
/// language.
WorkerPool(
int num_worker_processes, int num_workers_per_process, int num_cpus,
int num_worker_processes, int num_workers_per_process,
int maximum_startup_concurrency,
const std::unordered_map<Language, std::vector<std::string>> &worker_commands);

/// Destructor responsible for freeing a set of workers owned by this class.
Expand All @@ -42,13 +47,14 @@ class WorkerPool {
/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
/// register num_workers_per_process_ workers, then add them to the pool.
/// Failure to start the worker process is a fatal error.
/// This function will start up to num_cpus many workers in parallel
/// if it is called multiple times.
/// Failure to start the worker process is a fatal error. If too many workers
/// are already being started and force_start is false, then this function
/// will return without starting any workers.
///
/// \param language Which language this worker process should be.
/// \param force_start Controls whether to force starting a worker regardless of any
/// workers that have already been started but not yet registered.
/// workers that have already been started but not yet registered. This overrides
/// the maximum_startup_concurrency_ value.
void StartWorkerProcess(const Language &language, bool force_start = false);

/// Register a new worker. The Worker should be added by the caller to the
Expand Down Expand Up @@ -137,9 +143,8 @@ class WorkerPool {
/// for a given language.
inline State &GetStateForLanguage(const Language &language);

/// The number of CPUs this Raylet has available.
int num_cpus_;

/// The maximum number of workers that can be started concurrently.
int maximum_startup_concurrency_;
/// Pool states per language.
std::unordered_map<Language, State> states_by_lang_;
};
Expand Down
2 changes: 1 addition & 1 deletion src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ int NUM_WORKERS_PER_PROCESS = 3;
class WorkerPoolMock : public WorkerPool {
public:
WorkerPoolMock()
: WorkerPool(0, NUM_WORKERS_PER_PROCESS, 0,
: WorkerPool(0, NUM_WORKERS_PER_PROCESS, 1,
{{Language::PYTHON, {"dummy_py_worker_command"}},
{Language::JAVA, {"dummy_java_worker_command"}}}) {}

Expand Down
29 changes: 0 additions & 29 deletions src/ray/test/start_raylet.sh

This file was deleted.

52 changes: 0 additions & 52 deletions src/ray/test/start_raylets.sh

This file was deleted.

3 changes: 0 additions & 3 deletions test/actor_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -923,9 +923,6 @@ def get_location_and_ids(self):
ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10)
assert ready_ids == []

@unittest.skipIf(
os.environ.get("RAY_USE_XRAY") == "1",
"This test does not work with xray yet.")
def testActorMultipleGPUsFromMultipleTasks(self):
num_local_schedulers = 10
num_gpus_per_scheduler = 10
Expand Down

0 comments on commit 132f133

Please sign in to comment.