Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ray signal #3624

Merged
merged 24 commits into from
Feb 11, 2019
Merged

Ray signal #3624

merged 24 commits into from
Feb 11, 2019

Conversation

istoica
Copy link
Contributor

@istoica istoica commented Dec 24, 2018

What do these changes do?

This PR implements the signaling functionality in Python, and it is loosely based on this document: https://docs.google.com/document/d/1onHmFSqxY_P5s5ZNRCBWkTogreM8JRnGS9maIeguqD4/edit#heading=h.pyc3ahp7b4uz

A signal has a type, a value, and and a source. The source can be an actor or a task. Signals with the same source are ordered. More precisely, each signal has an index, which starts from 1 and increases monotonically.

There are two kinds of signals: (1) user signals and (2) system errors. User signals are sent by tasks and actors and their type is SIG_USER. Error signals are sent on behalf of the task/actor by the worker executing that task/actor. In particular, any error caught by the worker is sent as a signal. In addition, when a task finishes normally (i.e., without error) the worker sends a SIG_DONE signal.

Note: If an actor fails without any other actor or task invoking a method on the failed task, no error is generated. However, if a method is invoked on a failed actor an error is generated as a signal.

To receive a signal from a source, one needs to wait on that source using the receive() method of the SignalHandler class. receive() will get all signals from the sources is waiting on since the previous call of receive().

Related issue number

Relevant issue: #3164

@istoica istoica requested a review from pcmoritz December 24, 2018 02:20
@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10345/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10365/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10366/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10367/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10370/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10368/
Test FAILed.

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The implementation using ray objects looks good, but I have a couple suggestions to simplify this first prototype (see my comment on the code for more information):

  1. Do not generate object IDs using a custom hashing scheme. Instead, use the existing C++ code to generate object IDs, either ComputeReturnId or ComputePutId. For actors, you can use the actor creation task ID as an argument to these functions. This will allow us to reuse the existing FT features - the backend will automatically reconstruct the actor/task that created the signal OR raise an exception for un-reconstructable actors/tasks.
  2. Consider how we can avoid depending on the GCS. It's good that it doesn't need to be on the critical path here, but in general I worry about depending on the GCS too much, especially since we don't have a timeline for GCS fault tolerance yet. It seems like it's only necessary here for "forgetting" old signals; is it necessary for the first prototype to have that? As an extreme, a process that wants to forget old signals could also just try a binary search on the signal object IDs until it finds a recent signal. Maybe inefficient, but at least it doesn't depend on any new GCS functionality.

m = hashlib.sha1()
m.update(source_id)
m.update(bytes(counter))
return ray.ObjectID(m.digest())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to break the backend because the raylet expects that all object IDs have a certain byte structure, that the first few bytes are the object's index, followed by the task ID of the task that created it. This is required for reconstruction, so that the backend can always learn which task created an object without having to consult the GCS.

Also, this means that fault tolerance will not work for signal objects. For example, if a process calls ray.wait on a signal object, and the actor that created the signal has failed, then the process is likely going to hang. This is in contrast to normal objects, where an exception will be raised that the actor has died.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@stephanie-wang, thanks for the comments.

Regarding (2) we also need GCS because by default we want to get the old signals. Also, I think that we should optimize the first version for simplicity. Two more questions:

  1. Is there a simple way to get the task id? Right now I have a function _get_tesk_id_from_object_id(object_id) to do it.
  2. Do you have an example of calling ComputeReturnId or ComputePutId from Python?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, a few things about fault tolerance:

  1. The typical use case of is for the raylet/bakend to send a signal (i.e., SIG_FAULT) on the behalf of an actor/task that has failed, i.e., signal.send(signal.Signal(SIG_FAULT), source_id = a). In this case we might want to actually disable fault tolerance for the actors/tasks.
  2. If there is a user level signal, and if the actor/task sending it fails and the signal is lost, I think we are probably ok, as the backend will send a SIG_FAULT on behalf of the failed actor/task. So I don't think we need to reconstruct the lost signal. However, we do need to ignore it which the current code doesn't do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't you get the old signals by just iterating through the signals until one of them times out? Then, I don't think the GCS is necessary.

  1. You should replace that method with the C++ extension for Python. It actually looks like that particular method is already implemented here.
  2. I don't think we have an example of this in the code right now, but it will be similar to the above example for getting the task ID from the object ID.

For fault tolerance: Right I agree, but that actually wouldn't be possible with the object IDs generated out-of-band, since the backend cannot associate the signal object ID with the actor/task that failed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10462/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10482/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10492/
Test FAILed.

Copy link
Contributor

@stephanie-wang stephanie-wang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are the changes necessary to use ComputeReturnId to compute signal object IDs, so that we can preserve lineage.

return self.sig_type

def _get_signal_object_id_from_binary(source_id, counter):
m = hashlib.sha1()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change this function so that source_id is a task ID. Implement a Python extension for ComputeReturnId, then use ComputeReturnId(task_id, counter) to compute the signal object ID.


def _get_signal_object_id(source_id, counter):
if type(source_id) is ray.actor.ActorHandle:
return _get_signal_object_id_from_binary(source_id._actor_id.id(), counter)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use the actor creation task ID instead of the actor ID. This will be ComputeTaskId(source_id._ray_actor_creation_dummy_object_id). ComputeTaskId is implemented in C++ but also needs a Python extension.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, an extension for ComputeTaskId already exists: code. This is a good example for how you should implement the Python extension for ComputeReturnId.

def _get_source_dictionary(source_ids, counters):
return {source_id: _get_signal_object_id(source_id, self.signal_counter[source_id]) for source_id in source_ids}

def _get_source_key(source_id):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is not used?

object_id = _get_signal_object_id(source_id, signal_counters[source_id])
ready_ids, _ = ray.wait([object_id], num_returns=1, timeout=0)
if len(ready_ids) > 0:
results.append((source_id, ray.get(object_id)))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, we should check whether the return value of ray.get(object_id) is an instance of Signal. You can do that like this:

value = ray.get(object_id)
if isinstance(value, Signal):
  # Process the signal.

Copy link
Contributor

@stephanie-wang stephanie-wang Jan 12, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When there is a node failure and an actor dies, an exception will be thrown when you try to ray.get the object. You can handle it like this:

try:
  ray.get(object_id)
except:
  # The actor is dead.

Note that this will not work for non-actor tasks since the backend will try to reconstruct the task. Also, this might not work right now because of #3715.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10798/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10856/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10859/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10955/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10956/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/10965/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11021/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11020/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11028/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11030/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11032/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11034/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11628/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11670/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11672/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11696/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11695/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11703/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11724/
Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11725/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11734/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11743/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11748/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11751/
Test FAILed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11752/
Test PASSed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11758/
Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/11769/
Test FAILed.

@pcmoritz pcmoritz merged commit 3c32343 into ray-project:master Feb 11, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants