Allow manually writing to return ObjectIDs from tasks/actor methods #3805

Merged
merged 16 commits into from
Mar 19, 2019

Conversation

pschafhalter
Contributor

@pschafhalter pschafhalter commented Jan 18, 2019

What do these changes do?

Allows manually writing to return ObjectIDs from tasks and actor methods.

Implements the ray.worker.Worker.return_object_ids property which provides the list of ObjectIDs returned by the current task/actor method.

Also adds the ray.worker.RayNoReturn class, which, when returned, signals that the return value should not be stored in the object store.

This works for tasks:

In [3]: @ray.remote
   ...: def f():
   ...:     oid = ray.worker.global_worker.return_object_ids[0]
   ...:     ray.worker.global_worker.put_object(oid, 123)
   ...:     return ray.worker.RayNoReturn()
   ...: 
   ...: 

In [4]: ray.get(f.remote())
Out[4]: 123

As well as actors:

In [3]: @ray.remote
   ...: class Foo:
   ...:     def bar(self):
   ...:         oid = ray.worker.global_worker.return_object_ids[0]
   ...:         ray.worker.global_worker.put_object(oid, 123)
   ...:         return ray.worker.RayNoReturn
   ...: 

In [4]: f = Foo.remote()

In [5]: ray.get(f.bar.remote())
Out[5]: 123
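
For readers without a Ray build at hand, the behaviour described above can be approximated with a small toy model. This is only a sketch under stated assumptions: a plain dict stands in for the object store, and `NO_RETURN`, `store_outputs`, and `run_task` are hypothetical names, not Ray APIs.

```python
# Toy model only: a dict stands in for the object store, and NO_RETURN is a
# hypothetical sentinel playing the role of ray.worker.RayNoReturn.
NO_RETURN = object()


def store_outputs(store, object_ids, outputs):
    """Write each output under its ID, skipping slots marked NO_RETURN."""
    for object_id, output in zip(object_ids, outputs):
        if output is NO_RETURN:
            # The task body is assumed to have written this slot itself.
            continue
        store[object_id] = output


def run_task(store, return_ids, body):
    """Run `body` with access to its return IDs, then store what it returns."""
    outputs = body(store, return_ids)
    if len(return_ids) == 1:
        # A single return value is not wrapped in a tuple by the task body.
        outputs = (outputs,)
    store_outputs(store, return_ids, outputs)


def f(store, return_ids):
    store[return_ids[0]] = 123  # manual write, analogous to put_object(oid, 123)
    return NO_RETURN


store = {}
run_task(store, ["oid-0"], f)
result = store["oid-0"]
```

The caller still holds a return ID and reads the manually written value from it, which is the point of the sentinel.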

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10932/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10942/

@zhijunfu
Contributor

Personally, I think adding a test would be helpful here. If it's not intended for general-purpose use, how about adding it to ray.internal?

@pschafhalter
Contributor Author

Adding the API to ray.internal or ray.experimental might be good, but it will require modifying worker.py either way.

@raulchen
Contributor

  • Instead of providing a return_object_ids property, I think it might be more useful to provide a current_task property, then users can obtain the return ids as well as other attributes.
  • ray.worker.RayNoReturn can already be done by using @ray.remote(num_return_vals=0)

@pschafhalter
Contributor Author

@raulchen Adding a current_task property could be interesting, but I'm not sure the naming is great since this should work with actor methods in addition to tasks. Do you have any additional attributes in mind that would be useful?

Using the decorator @ray.remote(num_return_vals=0) is different because, with this PR, the task caller should still get ObjectIDs. ray.worker.RayNoReturn signals to Ray that, instead of using the return values of the function, the user manually sets the Python objects corresponding to the return ObjectIDs (as in the examples).

@robertnishihara
Collaborator

robertnishihara commented Jan 24, 2019

I agree that we should just make the task object accessible (through implementation details and not an API for the time being) and get rid of return_object_ids. @raulchen is task_context the right place to store the task object? Then we can write functions like

@ray.remote
def f():
    return_ids = ray.worker.global_worker.task_context.task.returns()

Note that this should allow us to remove a bunch of fields like worker.task_driver_id and task_context.current_task_id.

This is not a real API yet, it's just an experimental hack to try things out.

@raulchen Note that something like ray.worker.RayNoReturn() is necessary. We still want the function to have a return value, but we want it to be returned not by the return statement at the end, but by a call to ray.put. We also need it in order to do things like

@ray.remote(num_return_vals=3)
def f():
    return_ids = ray.worker.global_worker.task_context.task.returns()
    ray.worker.global_worker.put_object(return_ids[1], 222)
    return 1, ray.worker.RayNoReturn(), 3

@pschafhalter I think ray.worker.RayNoReturn() is ok for now, but let's be clear that this is not part of the API and is not considered exposed for the time being. And if it does get exposed, it will be renamed.

Minor comment: task is acceptable because both remote function invocations and actor method invocations are considered tasks despite the fact that we sometimes refer to "tasks" and "actors".

@robertnishihara
Collaborator

robertnishihara commented Jan 24, 2019

@pschafhalter could you add some tests including the following cases.

  1. A single return value.
  2. Multiple return values where all possible subsets are returned through ray.put and the complement is returned regularly.
  3. There's going to be some weird interaction in the case where the task raises an exception after calling ray.put on one of the return values. Please make sure to write tests for this case. (When the task raises an exception, it will try to store exception objects in object IDs that have already been filled, because it won't know about the RayNoReturn.)

What should the behavior be in the following cases? Please also add tests for these.

  1. The case where we return something with ray.put but forget to use ray.RayNoReturn.
  2. The case where we use ray.RayNoReturn but forget to call ray.put.
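
A rough sketch of how the "all possible subsets" case could be enumerated is shown below. It is pure Python against a toy dict-backed store, not an actual Ray test; `NO_RETURN`, `subsets`, and `simulate` are hypothetical names introduced only for illustration.

```python
from itertools import combinations

NO_RETURN = object()  # hypothetical sentinel standing in for RayNoReturn


def subsets(indices):
    """Yield every subset of `indices` as a tuple, from empty to full."""
    indices = list(indices)
    for size in range(len(indices) + 1):
        yield from combinations(indices, size)


def simulate(num_returns, manual):
    """Toy task: slots in `manual` are written directly, the rest are returned."""
    store = {}
    values = [10 * (i + 1) for i in range(num_returns)]
    returned = []
    for i in range(num_returns):
        if i in manual:
            store[i] = values[i]        # manual write into the toy store
            returned.append(NO_RETURN)
        else:
            returned.append(values[i])  # regular return value
    # The toy "worker" stores only the slots that were returned regularly.
    for i, output in enumerate(returned):
        if output is not NO_RETURN:
            store[i] = output
    return [store[i] for i in range(num_returns)]


cases = list(subsets(range(3)))
results = {manual: simulate(3, manual) for manual in cases}
```

In a real test suite each entry of `cases` would presumably become one parametrized test that runs a remote function and compares `ray.get` on the returned IDs against the expected values.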

@raulchen
Contributor

is task_context the right place to store the task object?

@robertnishihara I don't think so, because task_context is supposed to store thread-specific info about the current task. For the main thread, it's the same as the "actual current task", but for other threads we only have a pseudo task ID and no real task object. Thus, I prefer using a separate worker.current_task property (and probably only allowing the main thread to access it).

Regarding RayNoReturn, I'm afraid I still didn't get the point of replacing return with put. Is the purpose to put part of the return objects earlier, in case the whole function is slow? If so, we can also pass in an object id, and use worker.put_object(id, value) to save the object earlier.

@robertnishihara
Collaborator

robertnishihara commented Jan 27, 2019

@raulchen, returning things earlier could be one reason; however, there are some situations where it isn't possible to return the object from the function. For example, suppose that I have an actor method that runs a TensorFlow graph, and the graph takes an object ID as input and stores its output directly in the plasma store using that object ID (we do this in the SGD implementation).

What you're proposing could certainly work during regular execution, but if we lose the object and somebody calls ray.get to retrieve it, we need to be able to figure out which task created that object. If the object was a return value of a specific task, that is much easier.
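
The reconstruction argument can be illustrated with a toy model. This is a sketch under loud assumptions, not how Ray implements lineage: return IDs are modelled as `(task_id, index)` pairs, a dict stands in for the object store, and `submit`, `get`, and `_execute` are hypothetical helpers.

```python
# Toy model: because a return ID encodes the task that produced it, a lost
# object can be recomputed by re-running that task.
tasks = {}  # task_id -> (function, list of return IDs)
store = {}  # object ID -> value


def _execute(func, return_ids):
    """Run the task and store one output per return ID."""
    for object_id, output in zip(return_ids, func()):
        store[object_id] = output


def submit(task_id, func, num_returns):
    """Register and run a task; its return IDs are derived from the task ID."""
    return_ids = [(task_id, i) for i in range(num_returns)]
    tasks[task_id] = (func, return_ids)
    _execute(func, return_ids)
    return return_ids


def get(object_id):
    """Fetch an object, re-running its creating task if the object was lost."""
    if object_id not in store:
        task_id, _index = object_id
        func, return_ids = tasks[task_id]
        _execute(func, return_ids)
    return store[object_id]


ids = submit("t1", lambda: (1, 2), 2)
del store[ids[1]]        # simulate losing (for example, evicting) the object
recovered = get(ids[1])
```

An object written under an arbitrary, caller-supplied ID would carry no such link back to a task, which is the difficulty described above.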

@raulchen
Contributor

@robertnishihara thanks, that makes sense.

@robertnishihara
Collaborator

Let's do the following:

  • Add a test.
  • RayNoReturn should be under experimental, and perhaps be renamed NoReturn.
  • Expose _current_task instead of return_object_ids.
  • Give a good error message if it's misused (e.g., they try to instantiate ray.experimental.NoReturn).
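
One possible shape for the last point, as a sketch only: the class below is a hypothetical stand-in for ray.experimental.NoReturn, and the error type and message are assumptions rather than what the PR necessarily ships.

```python
class NoReturn:
    """Sentinel meant to be returned as the class itself, never instantiated."""

    def __init__(self):
        # Instantiating the sentinel is treated as misuse in this sketch.
        raise TypeError(
            "NoReturn should not be instantiated; return the class itself "
            "from the remote function instead.")


def is_no_return(value):
    """Identity check that a worker could use on each returned value."""
    return value is NoReturn


try:
    NoReturn()
except TypeError as exc:
    message = str(exc)
else:
    message = None
```

Comparing by identity against the class keeps the check cheap and avoids any ambiguity with user-defined return values.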

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/19/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/30/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/31/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12758/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12760/

Collaborator

@robertnishihara robertnishihara left a comment

@pschafhalter nice work! This looks pretty good :) I left some comments.

Note that I removed the pytest timing stuff because I think it's important to run the tests even if pytest timing is not installed. Also, any test in the codebase could hang if there are bugs, so that isn't unique to these tests.

@@ -790,7 +792,6 @@ def _store_outputs_in_object_store(self, object_ids, outputs):
             if isinstance(outputs[i], ray.actor.ActorHandle):
                 raise Exception("Returning an actor handle from a remote "
                                 "function is not allowed).")

Collaborator

What about having a check like

if outputs[i] is ray.experimental.NoReturn:
    if not self.plasma_client.contains(object_ids[i]):
        raise Exception("Attempting to return 'ray.experimental.NoReturn' from a remote function, but the corresponding ObjectID does not exist in the local object store.")

Not actually Exception; something more specific should be used.

The only issue I can think of is if the task is long-running and the object ends up getting evicted before the task returns.

Contributor Author

I ended up using a ValueError for this. Do you think that's specific enough?

Collaborator

Changed to RuntimeError because ValueError seems to be intended for the case where an argument to a function has the right type but the wrong value.
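
To make the check discussed in this thread concrete, here is a minimal sketch against a dict-backed toy store. `NO_RETURN` and `store_outputs` are hypothetical names and the message wording is an assumption; the real code checks the plasma client rather than a dict.

```python
NO_RETURN = object()  # hypothetical sentinel standing in for NoReturn


def store_outputs(store, object_ids, outputs):
    """Store outputs, verifying that NO_RETURN slots were already written."""
    for object_id, output in zip(object_ids, outputs):
        if output is NO_RETURN:
            if object_id not in store:
                # The sentinel was returned but nothing was written manually.
                raise RuntimeError(
                    "Attempting to return NO_RETURN, but object "
                    "{!r} does not exist in the store.".format(object_id))
            continue
        store[object_id] = output


# Correct use: "oid-0" was written manually before the sentinel was returned.
store = {"oid-0": 1}
store_outputs(store, ["oid-0", "oid-1"], [NO_RETURN, 2])

# Misuse: the sentinel is returned but the object was never written.
error = None
try:
    store_outputs({}, ["oid-2"], [NO_RETURN])
except RuntimeError as exc:
    error = str(exc)
```

As noted above, a check like this cannot distinguish "never written" from "written and later evicted", so a long-running task could still trip it.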

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/97/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12901/

@pschafhalter
Contributor Author

@robertnishihara thanks for the feedback! It should be addressed in the latest commit.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/115/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12927/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/153/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-Perf-Integration-PRB/154/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12989/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/12990/

@robertnishihara robertnishihara merged commit c93eb12 into ray-project:master Mar 19, 2019
@pschafhalter pschafhalter deleted the set-return-oid branch March 19, 2019 22:06