Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Export remote functions when first used and also fix bug in which rem… #4844

Merged
merged 4 commits into from
May 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 0 additions & 26 deletions doc/source/internals-overview.rst
Original file line number Diff line number Diff line change
Expand Up @@ -66,32 +66,6 @@ listens for the addition of remote functions to the centralized control state.
When a new remote function is added, the thread fetches the pickled remote
function, unpickles it, and can then execute that function.

Notes and limitations
~~~~~~~~~~~~~~~~~~~~~

- Because we export remote functions as soon as they are defined, that means
that remote functions can't close over variables that are defined after the
remote function is defined. For example, the following code gives an error.

.. code-block:: python

@ray.remote
def f(x):
return helper(x)

def helper(x):
return x + 1

If you call ``f.remote(0)``, it will give an error of the form.

.. code-block:: python

Traceback (most recent call last):
File "<ipython-input-3-12a5beeb2306>", line 3, in f
NameError: name 'helper' is not defined

On the other hand, if ``helper`` is defined before ``f``, then it will work.

Calling a remote function
-------------------------

Expand Down
18 changes: 11 additions & 7 deletions python/ray/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,12 @@ class ActorClass(object):
task.
_resources: The default resources required by the actor creation task.
_actor_method_cpus: The number of CPUs required by actor method tasks.
_last_export_session: The index of the last session in which the remote
function was exported. This is used to determine if we need to
export the remote function again.
_last_driver_id_exported_for: The ID of the driver ID of the last Ray
session during which this actor class definition was exported. This
is an imperfect mechanism used to determine if we need to export
the remote function again. It is imperfect in the sense that the
actor class definition could be exported multiple times by
different workers.
_actor_methods: The actor methods.
_method_decorators: Optional decorators that should be applied to the
method invocation function before invoking the actor methods. These
Expand All @@ -209,7 +212,7 @@ def __init__(self, modified_class, class_id, max_reconstructions, num_cpus,
self._num_cpus = num_cpus
self._num_gpus = num_gpus
self._resources = resources
self._last_export_session = None
self._last_driver_id_exported_for = None

self._actor_methods = inspect.getmembers(
self._modified_class, ray.utils.is_function_or_method)
Expand Down Expand Up @@ -342,12 +345,13 @@ def _remote(self,
*copy.deepcopy(args), **copy.deepcopy(kwargs))
else:
# Export the actor.
if (self._last_export_session is None
or self._last_export_session < worker._session_index):
if (self._last_driver_id_exported_for is None
or self._last_driver_id_exported_for !=
worker.task_driver_id):
# If this actor class was exported in a previous session, we
# need to export this function again, because current GCS
# doesn't have it.
self._last_export_session = worker._session_index
self._last_driver_id_exported_for = worker.task_driver_id
worker.function_actor_manager.export_actor_class(
self._modified_class, self._actor_method_names)

Expand Down
2 changes: 1 addition & 1 deletion python/ray/function_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ def export(self, remote_function):
# and export it later.
self._functions_to_export.append(remote_function)
return
if self._worker.mode != ray.worker.SCRIPT_MODE:
if self._worker.mode == ray.worker.LOCAL_MODE:
# Don't need to export if the worker is not a driver.
return
self._do_export(remote_function)
Expand Down
19 changes: 10 additions & 9 deletions python/ray/remote_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,12 @@ class RemoteFunction(object):
return the resulting ObjectIDs. For an example, see
"test_decorated_function" in "python/ray/tests/test_basic.py".
_function_signature: The function signature.
_last_export_session: The index of the last session in which the remote
function was exported. This is used to determine if we need to
export the remote function again.
_last_driver_id_exported_for: The ID of the driver ID of the last Ray
session during which this remote function definition was exported.
This is an imperfect mechanism used to determine if we need to
export the remote function again. It is imperfect in the sense that
the actor class definition could be exported multiple times by
different workers.
"""

def __init__(self, function, num_cpus, num_gpus, resources,
Expand All @@ -69,10 +72,7 @@ def __init__(self, function, num_cpus, num_gpus, resources,
self._function_signature = ray.signature.extract_signature(
self._function)

# Export the function.
worker = ray.worker.get_global_worker()
self._last_export_session = worker._session_index
worker.function_actor_manager.export(self)
self._last_driver_id_exported_for = None

def __call__(self, *args, **kwargs):
raise Exception("Remote functions cannot be called directly. Instead "
Expand Down Expand Up @@ -111,10 +111,11 @@ def _remote(self,
worker = ray.worker.get_global_worker()
worker.check_connected()

if self._last_export_session < worker._session_index:
if (self._last_driver_id_exported_for is None
or self._last_driver_id_exported_for != worker.task_driver_id):
# If this function was exported in a previous session, we need to
# export this function again, because current GCS doesn't have it.
self._last_export_session = worker._session_index
self._last_driver_id_exported_for = worker.task_driver_id
worker.function_actor_manager.export(self)

kwargs = {} if kwargs is None else kwargs
Expand Down
31 changes: 31 additions & 0 deletions python/ray/tests/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,23 @@ def f(x):
assert_equal(obj, ray.get(ray.put(obj)))


def test_nested_functions(ray_start_regular):
# Make sure that remote functions can use other values that are defined
# after the remote function but before the first function invocation.
@ray.remote
def f():
return g(), ray.get(h.remote())

def g():
return 1

@ray.remote
def h():
return 2

assert ray.get(f.remote()) == (1, 2)


def test_ray_recursive_objects(ray_start_regular):
class ClassA(object):
pass
Expand Down Expand Up @@ -2968,3 +2985,17 @@ def method(self):
ray.get(f.remote())
a = Actor.remote()
ray.get(a.method.remote())

ray.shutdown()

# Start Ray again and make sure that these definitions can be exported from
# workers.
ray.init(num_cpus=2)

@ray.remote
def export_definitions_from_worker(remote_function, actor_class):
ray.get(remote_function.remote())
actor_handle = actor_class.remote()
ray.get(actor_handle.method.remote())

ray.get(export_definitions_from_worker.remote(f, Actor))
13 changes: 12 additions & 1 deletion python/ray/tests/test_failure.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ def temporary_helper_function():
# fail when it is unpickled.
@ray.remote
def g():
return module.temporary_python_file()
try:
module.temporary_python_file()
except Exception:
# This test is not concerned with the error from running this
# function. Only from unpickling the remote function.
pass

# Invoke the function so that the definition is exported.
g.remote()

wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
Expand Down Expand Up @@ -499,6 +507,9 @@ def test_export_large_objects(ray_start_regular):
def f():
large_object

# Invoke the function so that the definition is exported.
f.remote()

# Make sure that a warning is generated.
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1)

Expand Down
9 changes: 1 addition & 8 deletions python/ray/tests/test_monitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,6 @@ def Driver(success):
# Two new objects.
ray.get(ray.put(1111))
ray.get(ray.put(1111))
attempts = 0
while (2, 1, summary_start[2]) != StateSummary():
time.sleep(0.1)
attempts += 1
if attempts == max_attempts_before_failing:
success.value = False
break

@ray.remote
def f():
Expand All @@ -61,7 +54,7 @@ def f():

# 1 new function.
attempts = 0
while (2, 1, summary_start[2] + 1) != StateSummary():
while (2, 1, summary_start[2]) != StateSummary():
time.sleep(0.1)
attempts += 1
if attempts == max_attempts_before_failing:
Expand Down