Skip to content

Commit

Permalink
By default, start a number of workers equal to the number of CPUs. (r…
Browse files Browse the repository at this point in the history
…ay-project#430)

* By default, start a number of workers equal to the number of CPUs.

* Fix stress tests.
  • Loading branch information
robertnishihara authored and pcmoritz committed Apr 6, 2017
1 parent fa363a5 commit 320109a
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 33 deletions.
6 changes: 6 additions & 0 deletions doc/source/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ If there are GPUs available on the machine, you should specify this with the
By default, Ray will use ``psutil.cpu_count()`` to determine the number of CPUs,
and by default the number of GPUs will be zero.

Instead of thinking about the number of "worker" processes on each node, we
prefer to think in terms of the quantities of CPU and GPU resources on each
node and to provide the illusion of an infinite pool of workers. Tasks are
assigned to workers based on the availability of resources, so as to avoid
contention, rather than on the number of available worker processes.

Connecting to an existing cluster
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
2 changes: 1 addition & 1 deletion doc/source/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ This can be addressed by calling ``ray.register_class(Foo)``.
import ray
ray.init(num_workers=10)
ray.init()
# Define a custom class.
class Foo(object):
Expand Down
19 changes: 10 additions & 9 deletions python/ray/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from __future__ import print_function

from collections import namedtuple, OrderedDict
import multiprocessing
import os
import psutil
import random
Expand Down Expand Up @@ -161,7 +160,7 @@ def all_processes_alive(exclude=[]):
# an exit code of None indicates that the process is still alive.
processes_alive = [p.poll() is None for p in processes]
if (not all(processes_alive) and process_type not in exclude):
print("A process of type {} has dead.".format(process_type))
print("A process of type {} has died.".format(process_type))
return False
return True

Expand Down Expand Up @@ -511,10 +510,12 @@ def start_local_scheduler(redis_address,
if num_cpus is None:
# By default, use the number of hardware execution threads as the number
# of CPUs.
num_cpus = multiprocessing.cpu_count()
num_cpus = psutil.cpu_count()
if num_gpus is None:
# By default, assume this node has no GPUs.
num_gpus = 0
print("Starting local scheduler with {} CPUs and {} GPUs.".format(num_cpus,
num_gpus))
local_scheduler_name, p = ray.local_scheduler.start_local_scheduler(
plasma_store_name,
plasma_manager_name,
Expand Down Expand Up @@ -693,7 +694,7 @@ def start_monitor(redis_address, node_ip_address, stdout_file=None,

def start_ray_processes(address_info=None,
node_ip_address="127.0.0.1",
num_workers=0,
num_workers=None,
num_local_schedulers=1,
worker_path=None,
cleanup=True,
Expand Down Expand Up @@ -753,6 +754,11 @@ def start_ray_processes(address_info=None,
assert len(num_cpus) == num_local_schedulers
assert len(num_gpus) == num_local_schedulers

if num_workers is not None:
workers_per_local_scheduler = num_local_schedulers * [num_workers]
else:
workers_per_local_scheduler = num_local_schedulers * [psutil.cpu_count()]

if address_info is None:
address_info = {}
address_info["node_ip_address"] = node_ip_address
Expand Down Expand Up @@ -854,11 +860,6 @@ def start_ray_processes(address_info=None,
object_store_addresses.append(object_store_address)
time.sleep(0.1)

# Determine how many workers to start for each local scheduler.
workers_per_local_scheduler = [0] * num_local_schedulers
for i in range(num_workers):
workers_per_local_scheduler[i % num_local_schedulers] += 1

# Start any local schedulers that do not yet exist.
for i in range(len(local_scheduler_socket_names), num_local_schedulers):
# Connect the local scheduler to the object store at the same index.
Expand Down
4 changes: 1 addition & 3 deletions python/ray/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,7 @@ def check_connected(worker=global_worker):
if not worker.connected:
raise RayConnectionError("This command cannot be called before Ray has "
"been started. You can start Ray with "
"'ray.init(num_workers=10)'.")
"'ray.init()'.")


def print_failed_task(task_status):
Expand Down Expand Up @@ -956,8 +956,6 @@ def _init(address_info=None,
# Use the address 127.0.0.1 in local mode.
node_ip_address = ("127.0.0.1" if node_ip_address is None
else node_ip_address)
# Use 10 workers if num_workers is not provided.
num_workers = 10 if num_workers is None else num_workers
# Use 1 local scheduler if num_local_schedulers is not provided. If
# existing local schedulers are provided, use that count as
# num_local_schedulers.
Expand Down
25 changes: 10 additions & 15 deletions scripts/start_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,17 @@
import ray.services as services

parser = argparse.ArgumentParser(
description="Parse addresses for the worker to connect to.")
description="Start the Ray processes on a node.")
parser.add_argument("--node-ip-address", required=False, type=str,
help="the IP address of the worker's node")
help="the IP address of this node")
parser.add_argument("--redis-address", required=False, type=str,
help="the address to use for connecting to Redis")
parser.add_argument("--redis-port", required=False, type=str,
help="the port to use for starting Redis")
parser.add_argument("--object-manager-port", required=False, type=int,
help="the port to use for starting the object manager")
parser.add_argument("--num-workers", default=10, required=False, type=int,
help="the number of workers to start on this node")
parser.add_argument("--num-workers", required=False, type=int,
help="the initial number of workers to start on this node")
parser.add_argument("--num-cpus", required=False, type=int,
help="the number of CPUs on this node")
parser.add_argument("--num-gpus", required=False, type=int,
Expand Down Expand Up @@ -93,10 +93,8 @@ def check_no_existing_redis_clients(node_ip_address, redis_address):
num_cpus=args.num_cpus,
num_gpus=args.num_gpus)
print(address_info)
print("\nStarted Ray with {} workers on this node. A different number of "
"workers can be set with the --num-workers flag (but you have to "
"first terminate the existing cluster). You can add additional "
"nodes to the cluster by calling\n\n"
print("\nStarted Ray on this node. You can add additional nodes to the "
"cluster by calling\n\n"
" ./scripts/start_ray.sh --redis-address {}\n\n"
"from the node you wish to add. You can connect a driver to the "
"cluster from Python by running\n\n"
Expand All @@ -105,8 +103,7 @@ def check_no_existing_redis_clients(node_ip_address, redis_address):
"If you have trouble connecting from a different machine, check "
"that your firewall is configured properly. If you wish to "
"terminate the processes that have been started, run\n\n"
" ./scripts/stop_ray.sh".format(args.num_workers,
address_info["redis_address"],
" ./scripts/stop_ray.sh".format(address_info["redis_address"],
address_info["redis_address"]))
else:
# Start Ray on a non-head node.
Expand Down Expand Up @@ -140,8 +137,6 @@ def check_no_existing_redis_clients(node_ip_address, redis_address):
num_cpus=args.num_cpus,
num_gpus=args.num_gpus)
print(address_info)
print("\nStarted {} workers on this node. A different number of workers "
"can be set with the --num-workers flag (but you have to first "
"terminate the existing cluster). If you wish to terminate the "
"processes that have been started, run\n\n"
" ./scripts/stop_ray.sh".format(args.num_workers))
print("\nStarted Ray on this node. If you wish to terminate the processes "
"that have been started, run\n\n"
" ./scripts/stop_ray.sh")
8 changes: 4 additions & 4 deletions test/array_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
da.linalg]:
reload(module)
ray.init(num_workers=1)
ray.init()

# test eye
object_id = ra.eye.remote(3)
Expand Down Expand Up @@ -57,7 +57,7 @@ def testAssemble(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
da.linalg]:
reload(module)
ray.init(num_workers=1)
ray.init()

a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
Expand All @@ -72,8 +72,8 @@ def testMethods(self):
for module in [ra.core, ra.random, ra.linalg, da.core, da.random,
da.linalg]:
reload(module)
ray.worker._init(start_ray_local=True, num_workers=10,
num_local_schedulers=2, num_cpus=[10, 10])
ray.worker._init(start_ray_local=True, num_local_schedulers=2,
num_cpus=[10, 10])

x = da.zeros.remote([9, 25, 51], "float")
assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))
Expand Down
2 changes: 1 addition & 1 deletion test/stress_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def setUp(self):

# Start the rest of the services in the Ray cluster.
ray.worker._init(address_info=address_info, start_ray_local=True,
num_workers=self.num_local_schedulers,
num_workers=1,
num_local_schedulers=self.num_local_schedulers,
num_cpus=[1] * self.num_local_schedulers,
driver_mode=ray.SILENT_MODE)
Expand Down

0 comments on commit 320109a

Please sign in to comment.