Various cleanups: remove start_ray_local from ray.init, remove unused code, fix "pip install numbuf". #193

Merged · 7 commits · Jan 11, 2017

Changes from 1 commit
Remove start_ray_local from ray.init and change default number of workers to 10.
robertnishihara committed Jan 10, 2017
commit b2ef34b9489912f2d60a30f95d7429ffdddb5d90
4 changes: 2 additions & 2 deletions README.md
@@ -17,8 +17,8 @@ machines).
 import ray
 import numpy as np

-# Start a scheduler, an object store, and some workers.
-ray.init(start_ray_local=True, num_workers=10)
+# Start Ray with some workers.
+ray.init(num_workers=10)

 # Define a remote function for estimating pi.
 @ray.remote
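For context, the README's pi example is truncated above at `@ray.remote`. A minimal sketch of how such an estimator can be written against the new API; the function body here is illustrative, not code from this commit:

```python
import ray
import numpy as np

ray.init(num_workers=10)

# Hypothetical Monte Carlo estimator, for illustration only.
@ray.remote
def estimate_pi(n):
  # Sample n points in the unit square; the fraction landing inside the
  # quarter circle of radius 1 approximates pi / 4.
  x = np.random.uniform(size=n)
  y = np.random.uniform(size=n)
  return 4 * np.mean(x ** 2 + y ** 2 < 1)

# Run 10 sampling tasks in parallel and average the estimates on the driver.
print(np.mean(ray.get([estimate_pi.remote(100000) for _ in range(10)])))
```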
2 changes: 1 addition & 1 deletion doc/reusable-variables.md
@@ -31,7 +31,7 @@ Python wrapper for an Atari simulator.
 import gym
 import ray

-ray.init(start_ray_local=True, num_workers=5)
+ray.init(num_workers=10)

 # Define a function to create the gym environment.
 def env_initializer():
2 changes: 1 addition & 1 deletion doc/serialization.md
@@ -76,7 +76,7 @@ This can be addressed by calling `ray.register_class(Foo)`.
 ```python
 import ray

-ray.init(start_ray_local=True, num_workers=1)
+ray.init(num_workers=10)

 # Define a custom class.
 class Foo(object):
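Since the hunk above references `ray.register_class(Foo)`, here is a hedged sketch of the pattern that doc describes, assuming the register_class API of this era; `Foo` and `get_value` are our names, not from the diff:

```python
import ray

ray.init(num_workers=10)

# Define a custom class.
class Foo(object):
  def __init__(self, value):
    self.value = value

# Assumed usage per the doc: register the class so Ray can serialize
# instances passed to or returned from remote functions.
ray.register_class(Foo)

@ray.remote
def get_value(f):
  return f.value

print(ray.get(get_value.remote(Foo(1))))  # 1
```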
5 changes: 0 additions & 5 deletions doc/services-api.rst

This file was deleted.

36 changes: 21 additions & 15 deletions doc/tutorial.md
@@ -7,28 +7,34 @@ To use Ray, you need to understand the following:

 ## Overview

-Ray is a distributed extension of Python. When using Ray, several processes are
-involved.
-
-- A **scheduler**: The scheduler assigns tasks to workers. It is its own
-process.
-- Multiple **workers**: Workers execute tasks and store the results in object
-stores. Each worker is a separate process.
-- One **object store** per node: The object store enables the sharing of Python
-objects between worker processes so each worker does not have to have a separate
-copy.
-- A **driver**: The driver is the Python process that the user controls and
-which submits tasks to the scheduler. For example, if the user is running a
-script or using a Python shell, then the driver is the process that runs the
-script or the shell.
+Ray is a Python-based distributed execution engine. It can be used on a single
+machine to achieve effective multiprocessing, and it can be used on a cluster
+for large computations.
+
+When using Ray, several processes are involved.
+
+- Multiple **worker** processes execute tasks and store results in object stores.
+Each worker is a separate process.
+- One **object store** per node stores immutable objects in shared memory and
+allows workers to efficiently share objects on the same node with minimal
+copying and deserialization.
+- One **local scheduler** per node assigns tasks to workers on the same node.
+- A **global scheduler** receives tasks from local schedulers and assigns them
+to other local schedulers.
+- A **driver** is the Python process that the user controls. For example, if the
+user is running a script or using a Python shell, then the driver is the Python
+process that runs the script or the shell. A driver is similar to a worker in
+that it can submit tasks to its local scheduler and get objects from the object
+store, but it is different in that the local scheduler will not assign tasks to
+the driver to be executed.

 ## Starting Ray

 To start Ray, start Python, and run the following commands.

 ```python
 import ray
-ray.init(start_ray_local=True, num_workers=10)
+ray.init(num_workers=10)
 ```

 That command starts a scheduler, one object store, and ten workers. Each of
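To make the driver/worker/object-store split described in the new tutorial text concrete, a small sketch using the standard API shown in this diff (the function name is ours):

```python
import ray

ray.init(num_workers=10)

# The driver defines a remote function; the workers execute it.
@ray.remote
def square(x):
  return x * x

# Submitting a task returns an object ID immediately; the result lives in
# the object store until the driver fetches it with ray.get.
object_ids = [square.remote(i) for i in range(4)]
print(ray.get(object_ids))  # [0, 1, 4, 9]
```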
2 changes: 1 addition & 1 deletion doc/using-ray-with-tensorflow.md
@@ -79,7 +79,7 @@ import tensorflow as tf
 import numpy as np
 import ray

-ray.init(start_ray_local=True, num_workers=5)
+ray.init(num_workers=5)

 BATCH_SIZE = 100
 NUM_BATCHES = 1
2 changes: 1 addition & 1 deletion examples/alexnet/driver.py
@@ -19,7 +19,7 @@
 if __name__ == "__main__":
   args = parser.parse_args()

-  ray.init(start_ray_local=True, num_workers=10)
+  ray.init(num_workers=10)

   # Note we do not do sess.run(tf.initialize_all_variables()) because that would
   # result in a different initialization on each worker. Instead, we initialize
2 changes: 1 addition & 1 deletion examples/hyperopt/driver.py
@@ -21,7 +21,7 @@
 if __name__ == "__main__":
   args = parser.parse_args()

-  ray.init(start_ray_local=True, num_workers=10)
+  ray.init(num_workers=10)

   # The number of sets of random hyperparameters to try.
   trials = args.trials
2 changes: 1 addition & 1 deletion examples/lbfgs/driver.py
@@ -11,7 +11,7 @@
 from tensorflow.examples.tutorials.mnist import input_data

 if __name__ == "__main__":
-  ray.init(start_ray_local=True, num_workers=10)
+  ray.init(num_workers=10)

   # Define the dimensions of the data and of the model.
   image_dimension = 784
2 changes: 1 addition & 1 deletion examples/rl_pong/driver.py
@@ -110,7 +110,7 @@ def compute_gradient(model):
   return policy_backward(eph, epx, epdlogp, model), reward_sum

 if __name__ == "__main__":
-  ray.init(start_ray_local=True, num_workers=10)
+  ray.init(num_workers=10)

   # Run the reinforcement learning
   running_reward = None
23 changes: 10 additions & 13 deletions lib/python/ray/worker.py
@@ -548,7 +548,7 @@ def check_connected(worker=global_worker):
     Exception: An exception is raised if the worker is not connected.
   """
   if not worker.connected:
-    raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(start_ray_local=True, num_workers=1)'.")
+    raise RayConnectionError("This command cannot be called before Ray has been started. You can start Ray with 'ray.init(num_workers=10)'.")

 def print_failed_task(task_status):
   """Print information about failed tasks.
@@ -724,7 +724,7 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
   # Use the address 127.0.0.1 in local mode.
   node_ip_address = "127.0.0.1" if node_ip_address is None else node_ip_address
   # Use 1 worker if num_workers is not provided.
-  num_workers = 1 if num_workers is None else num_workers
+  num_workers = 10 if num_workers is None else num_workers
   # Use 1 local scheduler if num_local_schedulers is not provided. If
   # existing local schedulers are provided, use that count as
   # num_local_schedulers.
@@ -770,8 +770,8 @@ def _init(address_info=None, start_ray_local=False, object_id_seed=None,
   connect(driver_address_info, object_id_seed=object_id_seed, mode=driver_mode, worker=global_worker)
   return address_info

-def init(node_ip_address=None, redis_address=None, start_ray_local=False,
-         object_id_seed=None, num_workers=None, driver_mode=SCRIPT_MODE):
+def init(redis_address=None, node_ip_address=None, object_id_seed=None,
+         num_workers=None, driver_mode=SCRIPT_MODE):
   """Either connect to an existing Ray cluster or start one and connect to it.

   This method handles two cases. Either a Ray cluster already exists and we
@@ -780,18 +780,16 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False,

   Args:
     node_ip_address (str): The IP address of the node that we are on.
-    redis_address (str): The address of the Redis server to connect to. This
-      should only be provided if start_ray_local is False.
-    start_ray_local (bool): If True then this will start Redis, a global
+    redis_address (str): The address of the Redis server to connect to. If this
+      address is not provided, then this command will start Redis, a global
       scheduler, a local scheduler, a plasma store, a plasma manager, and some
-      workers. It will also kill these processes when Python exits. If False,
-      this will attach to an existing Ray cluster.
+      workers. It will also kill these processes when Python exits.
     object_id_seed (int): Used to seed the deterministic generation of object
       IDs. The same value can be used across multiple runs of the same job in
       order to generate the object IDs in a consistent manner. However, the same
      ID should not be used for different jobs.
     num_workers (int): The number of workers to start. This is only provided if
-      start_ray_local is True.
+      redis_address is not provided.
     driver_mode (bool): The mode in which to start the driver. This should be
      one of ray.SCRIPT_MODE, ray.PYTHON_MODE, and ray.SILENT_MODE.
@@ -806,9 +804,8 @@ def init(node_ip_address=None, redis_address=None, start_ray_local=False,
     "node_ip_address": node_ip_address,
     "redis_address": redis_address,
   }
-  return _init(address_info=info,
-               start_ray_local=start_ray_local, num_workers=num_workers,
-               driver_mode=driver_mode)
+  return _init(address_info=info, start_ray_local=(redis_address is None),
+               num_workers=num_workers, driver_mode=driver_mode)

 def cleanup(worker=global_worker):
   """Disconnect the driver, and terminate any processes started in init.
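The net effect of this change is that `start_ray_local` is now inferred from whether `redis_address` was passed. A sketch of the two call patterns under the new signature (the Redis address below is a placeholder, not from this PR):

```python
import ray

# No redis_address: init starts Redis, the schedulers, an object store, and
# (with the new default) 10 workers locally, then connects the driver. This
# replaces the old ray.init(start_ray_local=True, num_workers=10).
ray.init()

# With redis_address: init attaches to an already-running cluster instead of
# starting one. Placeholder address for illustration:
# ray.init(redis_address="127.0.0.1:6379")
```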
4 changes: 2 additions & 2 deletions test/array_test.py
@@ -20,7 +20,7 @@ class RemoteArrayTest(unittest.TestCase):
   def testMethods(self):
     for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
       reload(module)
-    ray.init(start_ray_local=True)
+    ray.init(num_workers=1)

     # test eye
     object_id = ra.eye.remote(3)
@@ -54,7 +54,7 @@ class DistributedArrayTest(unittest.TestCase):
   def testAssemble(self):
     for module in [ra.core, ra.random, ra.linalg, da.core, da.random, da.linalg]:
       reload(module)
-    ray.init(start_ray_local=True, num_workers=1)
+    ray.init(num_workers=1)

     a = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
     b = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
14 changes: 7 additions & 7 deletions test/failure_test.py
@@ -24,7 +24,7 @@ def wait_for_errors(error_type, num_errors, timeout=10):
 class FailureTest(unittest.TestCase):
   def testUnknownSerialization(self):
     reload(test_functions)
-    ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)

     test_functions.test_unknown_type.remote()
     wait_for_errors(b"TaskError", 1)
@@ -35,7 +35,7 @@ def testUnknownSerialization(self):

 class TaskSerializationTest(unittest.TestCase):
   def testReturnAndPassUnknownType(self):
-    ray.init(start_ray_local=True, num_workers=1, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=1, driver_mode=ray.SILENT_MODE)

     class Foo(object):
       pass
@@ -57,7 +57,7 @@ def g(x):
 class TaskStatusTest(unittest.TestCase):
   def testFailedTask(self):
     reload(test_functions)
-    ray.init(start_ray_local=True, num_workers=3, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=3, driver_mode=ray.SILENT_MODE)

     test_functions.throw_exception_fct1.remote()
     test_functions.throw_exception_fct1.remote()
@@ -87,7 +87,7 @@ def testFailedTask(self):
     ray.worker.cleanup()

   def testFailImportingRemoteFunction(self):
-    ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

     # This example is somewhat contrived. It should be successfully pickled, and
     # then it should throw an exception when it is unpickled. This may depend a
@@ -115,7 +115,7 @@ def __call__(self):
     ray.worker.cleanup()

   def testFailImportingReusableVariable(self):
-    ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

     # This will throw an exception when the reusable variable is imported on the
     # workers.
@@ -131,7 +131,7 @@ def initializer():
     ray.worker.cleanup()

   def testFailReinitializingVariable(self):
-    ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

     def initializer():
       return 0
@@ -149,7 +149,7 @@ def use_foo():
     ray.worker.cleanup()

   def testFailedFunctionToRun(self):
-    ray.init(start_ray_local=True, num_workers=2, driver_mode=ray.SILENT_MODE)
+    ray.init(num_workers=2, driver_mode=ray.SILENT_MODE)

     def f(worker):
       if ray.worker.global_worker.mode == ray.WORKER_MODE:
4 changes: 2 additions & 2 deletions test/microbenchmarks.py
@@ -18,7 +18,7 @@ class MicroBenchmarkTest(unittest.TestCase):

   def testTiming(self):
     reload(test_functions)
-    ray.init(start_ray_local=True, num_workers=3)
+    ray.init(num_workers=3)

     # measure the time required to submit a remote task to the scheduler
     elapsed_times = []
@@ -88,7 +88,7 @@ def testTiming(self):
     ray.worker.cleanup()

   def testCache(self):
-    ray.init(start_ray_local=True, num_workers=1)
+    ray.init(num_workers=1)

     A = np.random.rand(1, 1000000)
     v = np.random.rand(1000000)