-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Ray signal #3624
Ray signal #3624
Conversation
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation using ray objects looks good, but I have a couple suggestions to simplify this first prototype (see my comment on the code for more information):
- Do not generate object IDs using a custom hashing scheme. Instead, use the existing C++ code to generate object IDs, either
ComputeReturnId
orComputePutId
. For actors, you can use the actor creation task ID as an argument to these functions. This will allow us to reuse the existing FT features - the backend will automatically reconstruct the actor/task that created the signal OR raise an exception for un-reconstructable actors/tasks. - Consider how we can avoid depending on the GCS. It's good that it doesn't need to be on the critical path here, but in general I worry about depending on the GCS too much, especially since we don't have a timeline for GCS fault tolerance yet. It seems like it's only necessary here for "forgetting" old signals; is it necessary for the first prototype to have that? As an extreme, a process that wants to forget old signals could also just try a binary search on the signal object IDs until it finds a recent signal. Maybe inefficient, but at least it doesn't depend on any new GCS functionality.
python/ray/experimental/signal.py
Outdated
m = hashlib.sha1() | ||
m.update(source_id) | ||
m.update(bytes(counter)) | ||
return ray.ObjectID(m.digest()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to break the backend because the raylet expects that all object IDs have a certain byte structure, that the first few bytes are the object's index, followed by the task ID of the task that created it. This is required for reconstruction, so that the backend can always learn which task created an object without having to consult the GCS.
Also, this means that fault tolerance will not work for signal objects. For example, if a process calls ray.wait
on a signal object, and the actor that created the signal has failed, then the process is likely going to hang. This is in contrast to normal objects, where an exception will be raised that the actor has died.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@stephanie-wang, thanks for the comments.
Regarding (2) we also need GCS because by default we want to get the old signals. Also, I think that we should optimize the first version for simplicity. Two more questions:
- Is there a simple way to get the task id? Right now I have a function
_get_tesk_id_from_object_id(object_id)
to do it. - Do you have an example of calling
ComputeReturnId
orComputePutId
from Python?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, a few things about fault tolerance:
- The typical use case of is for the raylet/bakend to send a signal (i.e., SIG_FAULT) on the behalf of an actor/task that has failed, i.e.,
signal.send(signal.Signal(SIG_FAULT), source_id = a)
. In this case we might want to actually disable fault tolerance for the actors/tasks. - If there is a user level signal, and if the actor/task sending it fails and the signal is lost, I think we are probably ok, as the backend will send a SIG_FAULT on behalf of the failed actor/task. So I don't think we need to reconstruct the lost signal. However, we do need to ignore it which the current code doesn't do.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't you get the old signals by just iterating through the signals until one of them times out? Then, I don't think the GCS is necessary.
- You should replace that method with the C++ extension for Python. It actually looks like that particular method is already implemented here.
- I don't think we have an example of this in the code right now, but it will be similar to the above example for getting the task ID from the object ID.
For fault tolerance: Right I agree, but that actually wouldn't be possible with the object IDs generated out-of-band, since the backend cannot associate the signal object ID with the actor/task that failed.
Test FAILed. |
Test PASSed. |
Test FAILed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are the changes necessary to use ComputeReturnId
to compute signal object IDs, so that we can preserve lineage.
python/ray/experimental/signal.py
Outdated
return self.sig_type | ||
|
||
def _get_signal_object_id_from_binary(source_id, counter): | ||
m = hashlib.sha1() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this function so that source_id
is a task ID. Implement a Python extension for ComputeReturnId
, then use ComputeReturnId(task_id, counter)
to compute the signal object ID.
python/ray/experimental/signal.py
Outdated
|
||
def _get_signal_object_id(source_id, counter): | ||
if type(source_id) is ray.actor.ActorHandle: | ||
return _get_signal_object_id_from_binary(source_id._actor_id.id(), counter) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the actor creation task ID instead of the actor ID. This will be ComputeTaskId(source_id._ray_actor_creation_dummy_object_id)
. ComputeTaskId
is implemented in C++ but also needs a Python extension.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, an extension for ComputeTaskId
already exists: code. This is a good example for how you should implement the Python extension for ComputeReturnId
.
python/ray/experimental/signal.py
Outdated
def _get_source_dictionary(source_ids, counters): | ||
return {source_id: _get_signal_object_id(source_id, self.signal_counter[source_id]) for source_id in source_ids} | ||
|
||
def _get_source_key(source_id): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is not used?
python/ray/experimental/signal.py
Outdated
object_id = _get_signal_object_id(source_id, signal_counters[source_id]) | ||
ready_ids, _ = ray.wait([object_id], num_returns=1, timeout=0) | ||
if len(ready_ids) > 0: | ||
results.append((source_id, ray.get(object_id))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, we should check whether the return value of ray.get(object_id)
is an instance of Signal
. You can do that like this:
value = ray.get(object_id)
if isinstance(value, Signal):
# Process the signal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When there is a node failure and an actor dies, an exception will be thrown when you try to ray.get
the object. You can handle it like this:
try:
ray.get(object_id)
except:
# The actor is dead.
Note that this will not work for non-actor tasks since the backend will try to reconstruct the task. Also, this might not work right now because of #3715.
Test PASSed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test PASSed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test FAILed. |
Test PASSed. |
Test FAILed. |
Test FAILed. |
What do these changes do?
This PR implements the signaling functionality in Python, and it is loosely based on this document: https://docs.google.com/document/d/1onHmFSqxY_P5s5ZNRCBWkTogreM8JRnGS9maIeguqD4/edit#heading=h.pyc3ahp7b4uz
A signal has a type, a value, and and a source. The source can be an actor or a task. Signals with the same source are ordered. More precisely, each signal has an index, which starts from 1 and increases monotonically.
There are two kinds of signals: (1) user signals and (2) system errors. User signals are sent by tasks and actors and their type is
SIG_USER
. Error signals are sent on behalf of the task/actor by the worker executing that task/actor. In particular, any error caught by the worker is sent as a signal. In addition, when a task finishes normally (i.e., without error) the worker sends aSIG_DONE
signal.Note: If an actor fails without any other actor or task invoking a method on the failed task, no error is generated. However, if a method is invoked on a failed actor an error is generated as a signal.
To receive a signal from a source, one needs to wait on that source using the
receive()
method of theSignalHandler
class.receive()
will get all signals from the sources is waiting on since the previous call ofreceive()
.Related issue number
Relevant issue: #3164