Actor fault tolerance using object lineage reconstruction #902
Conversation
Merged build finished. Test PASSed.
Nice job! This approach is going to be a lot cleaner :)
src/common/task.cc
Outdated
@@ -96,6 +103,11 @@ class TaskBuilder {
    ObjectID return_id = task_compute_return_id(task_id, i);
    returns.push_back(to_flatbuf(fbb, return_id));
  }
  /* For actor tasks, add a dummy output. */
We can probably just increase num_return_vals by one when we construct the task in Python, right?
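For instance, a minimal sketch of that idea on the Python side (the helper and parameter names here are illustrative, not the PR's actual code):

def compute_num_return_vals(function_properties, actor_id, nil_actor_id):
    # Sketch: give actor tasks one extra return slot for the dummy output
    # that chains consecutive actor tasks together (illustrative only).
    num_return_vals = function_properties.num_return_vals
    if actor_id.id() != nil_actor_id:
        num_return_vals += 1
    return num_return_vals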
src/common/task.cc
Outdated
  ObjectID predecessor_id =
      task_compute_return_id(actor_id_, actor_counter_ - 1);
  NextReferenceArgument(predecessor_id);
}
This should probably be done in Python, e.g., simply by passing the dummy object in as an argument to the task spec.
E.g., in the block
task = ray.local_scheduler.Task(
self.task_driver_id,
ray.local_scheduler.ObjectID(function_id.id()),
args_for_local_scheduler,
function_properties.num_return_vals,
self.current_task_id,
self.task_index,
actor_id,
self.actor_counters[actor_id],
[function_properties.num_cpus, function_properties.num_gpus,
function_properties.num_custom_resource])
in worker.py.
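For example, a sketch of how the dummy object could be threaded in as an extra argument before constructing the task (the bookkeeping names here are illustrative, not the PR's exact code):

def add_dummy_dependency(args_for_local_scheduler, actor_id,
                         actor_counters, actor_dummy_objects, nil_actor_id):
    # Sketch: append the dummy object returned by the previous task on this
    # actor so that the new task depends on it and runs after it.
    if actor_id.id() != nil_actor_id and actor_counters[actor_id] > 0:
        previous_dummy_id = actor_dummy_objects[actor_id]
        args_for_local_scheduler = args_for_local_scheduler + [previous_dummy_id]
    return args_for_local_scheduler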
We could, but I did it here so that we could create an object ID that was a function of the actor_id and actor_counter values. We could do it in Python too, but we would have to duplicate or expose the C code for task_compute_return_id. What do you think?
Shouldn't all of the return object IDs already be a function of the actor_id and actor_counter values, because they are computed from the task_id, which hashes in those two values (see https://github.com/ray-project/ray/blob/master/src/common/task.cc#L57-L58)?
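For reference, here is a simplified sketch of how such a deterministic scheme can work in general (illustrative only, not the actual task.cc implementation):

import hashlib

def compute_return_id(task_id, return_index):
    # Illustrative: hash the task ID together with the return index, so that
    # anyone holding both values can recompute the same object ID, and
    # different indices yield different IDs.
    digest = hashlib.sha1(task_id + return_index.to_bytes(8, "little")).digest()
    return digest[:20]

# Example: both return IDs are reproducible from the task ID alone.
task_id = hashlib.sha1(b"example task").digest()
first_return_id = compute_return_id(task_id, 0)
second_return_id = compute_return_id(task_id, 1)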
Yes, but they are also a function of the rest of the task. I believe it's better to have it be a function of just actor_id and actor_counter, so that anyone can determine the object ID returned by that actor task. If we want to do it just in Python, the other option would be to record the dummy objects on the caller, in addition to the current actor_counter.

Edit: On further thought, it might make sense to record the dummy objects on the caller, so we can be more flexible with things like checkpointing.
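Concretely, the caller-side bookkeeping being discussed might look like this (a sketch; the names are illustrative):

# Sketch: per actor, the caller tracks how many tasks it has submitted and
# the dummy object ID returned by the most recent one.
actor_counters = {}       # actor_id -> number of tasks submitted so far
actor_dummy_objects = {}  # actor_id -> dummy ObjectID of the latest task

def record_submission(actor_id, dummy_object_id):
    actor_counters[actor_id] = actor_counters.get(actor_id, 0) + 1
    actor_dummy_objects[actor_id] = dummy_object_id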
Let's try to make the code work regardless of how the object IDs are generated (assuming they are generated in a deterministic and collision-avoiding manner). We can always bake more semantics into the IDs, but I'd prefer to avoid that unless there is a very good reason.

I agree that recording the dummy objects on the caller makes sense. In fact, I thought that's what this PR was doing (but I see now that it's not).
python/ray/worker.py
Outdated
# numpy array as a hack to pin the object in the object store.
# Once we allow object pinning in the store, we may use `None`.
if task.actor_id().id() != NIL_ACTOR_ID:
    outputs = outputs + (np.zeros(1), )
Same comment here about the space after the ,
python/ray/worker.py
Outdated
num_returns -= 1

if num_returns == 1:
    outputs = (outputs, )
Purely aesthetic, but can you remove the space after the , ?
python/ray/worker.py
Outdated
# to prevent eviction from the object store.
if task.actor_id().id() != NIL_ACTOR_ID:
    dummy_object = self.get_object(return_object_ids[-1:])[0]
    self.actor_dummy_objects[task.actor_counter()] = dummy_object
This makes it seem like a list would be more appropriate than a dictionary.
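For example, a list indexed by the counter carries the same information (a sketch, assuming tasks are processed in counter order):

actor_dummy_objects = []

def record_dummy(actor_counter, dummy_object):
    # The actor executes its tasks sequentially, so counters arrive
    # in order 0, 1, 2, ... and appending preserves the mapping.
    assert actor_counter == len(actor_dummy_objects)
    actor_dummy_objects.append(dummy_object)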
python/ray/monitor.py
Outdated
@@ -10,6 +10,7 @@
import time

import ray
from ray.worker import NIL_ACTOR_ID
Maybe put this after ray.services? I'm not exactly sure what the convention is, but that seems a bit more alphabetical.
if task["LocalSchedulerID"] not in self.dead_local_schedulers: | ||
continue | ||
|
||
# Remove dummy objects returned by actor tasks from any plasma |
Hmm, I was going to suggest putting this in cleanup_actors, but I guess that would require looping over the task table again, which is a bit unfortunate.
Yeah, that's actually why I have it here.
Ok, makes sense.
python/ray/monitor.py
Outdated
# that died.
assert(len(manager_ids) <= 1)
for manager in manager_ids:
    # If the object was on a dead plasma manager, remove that
Hmm, the plasma manager will often not be dead (e.g., if the local scheduler dies it does not trigger the death of the manager). Wouldn't it make sense to go ahead and remove the object from the manager that corresponds to the dead local scheduler?

In our tests the manager will probably be dead because we kill the object store manually. But then the dummy objects are probably getting cleaned up by cleanup_object_table and not here.

So I have a feeling that this code isn't really necessary. Either the manager dies, in which case this happens anyway in cleanup_object_table, or the manager doesn't die, in which case this code won't do anything.

Does that sound right?
This block handles the case you described where the manager doesn't die. It removes the object from any manager that had the object. For actor objects, the list should either have no managers (the object wasn't created and/or published yet) or exactly one manager, namely the manager corresponding to the local scheduler. We have to remove that one manager so that reconstruction will go through.
This code does need to be replaced, though. Eventually, the plasma manager should be listening for a corresponding local scheduler's death, and remove the actor objects from the object table itself, rather than doing it in the monitor. Otherwise, there is a risk of a race here between the monitor removing the plasma manager and the plasma manager adding itself back (see the TODO at line 153).
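A rough sketch of that intended flow (here remove_location is a hypothetical stand-in for the monitor's actual object-table call):

def cleanup_actor_object_locations(object_id, manager_ids, remove_location):
    # Sketch: a dummy actor object should have at most one location entry,
    # namely the plasma manager colocated with the dead local scheduler.
    # Deleting that entry makes the object look lost, so lineage
    # reconstruction of the corresponding actor task can go through.
    assert len(manager_ids) <= 1
    for manager in manager_ids:
        remove_location(object_id, manager)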
That makes sense. I find the comment confusing then. The comment says

# If the object was on a dead plasma manager, remove that
# location entry.

but it's possible that the plasma manager is still running, right? Is there a better way to phrase this comment?
Oops, that was a comment left over from the copy pasta :) I'll update the comment.
Merged build finished. Test PASSed.
Force-pushed from 1f941f0 to 9f875b6.
I fixed the linting errors and attempted to fix the test failure, but decided to revert my fix. The issue is the following. By introducing a dependence between consecutive actor tasks, we cause exceptions in previous tasks (in particular, the actor constructor) to propagate to subsequent tasks. So once one actor method fails, all subsequent actor methods will fail. At first I thought this was a feature because it causes failures in the constructor to propagate (which is similar to how regular Python classes work), but there are two downsides.
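To make the failure mode concrete, here is a conceptual sketch in plain Python (not Ray's actual error-handling code) of how a failure cascades through the dummy-object chain:

class TaskError(Exception):
    """Stands in for the error value stored when an actor task fails."""

def run_actor_task(method, previous_dummy):
    # Each actor task depends on the previous task's dummy output. If that
    # dependency is itself an error, this task fails as well, so a failure
    # in task i cascades to every later task on the same actor.
    if isinstance(previous_dummy, TaskError):
        return TaskError("dependency failed: " + str(previous_dummy))
    try:
        method()
        return object()  # a fresh dummy output on success
    except Exception as e:
        return TaskError(str(e))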
python/ray/worker.py
Outdated
@@ -502,13 +511,20 @@ def submit_task(self, function_id, args, actor_id=None):
    # Look up the various function properties.
    function_properties = self.function_properties[
        self.task_driver_id.id()][function_id.id()]
    num_return_vals = function_properties.num_return_vals
We can leave this for a subsequent PR depending on what you prefer, but I think it would be much cleaner to not have any special cases in submit_task at all and to just have the actor call submit_task differently. This would require the actor handle (as opposed to the worker object) to own the actor_counter and the actor_dummy_object, which I think makes a lot of sense.

I think we can also make it so that there are no special cases in _process_task and that the differences are handled inside of actor_method_call.

If you decide to defer this to a subsequent PR, can you submit an issue for this change?
Can you describe a bit more how this would be done? I'm not sure I see how we can call submit_task differently without changing its arguments, since it doesn't currently take a number of return values as an argument. Do you mean that we should do the increment to the number of return values when registering the function? I'm also not sure how we can have the actor add the dependency rather than the caller, since it seems like by the time the actor receives the task, the dependencies should already be decided.
I suppose we could either increment the number of returns when registering the function or make num_return_vals an argument to submit_task.

And yeah, the caller needs to add the dependency. I didn't mean the actor itself would add the dependency but rather the actor "handle", which I'm using to refer to the actor object that the caller has (e.g., this is NewClass in actor.py). Does that make more sense?
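A sketch of what that refactoring could look like (an illustrative ActorHandle; submit_task is assumed here to accept num_return_vals and the actor information, which is not its current signature):

class ActorHandle(object):
    """Illustrative only: the handle, not the worker, owns the actor's
    task counter and the dummy object from the last submitted task."""

    def __init__(self, actor_id, worker):
        self.actor_id = actor_id
        self.worker = worker
        self.actor_counter = 0
        self.previous_dummy_object = None

    def _actor_method_call(self, function_id, args, num_return_vals):
        args = list(args)
        if self.previous_dummy_object is not None:
            # Chain this task to the previous one via the dummy object.
            args.append(self.previous_dummy_object)
        object_ids = self.worker.submit_task(
            function_id, args,
            num_return_vals=num_return_vals + 1,  # extra slot for the dummy
            actor_id=self.actor_id,
            actor_counter=self.actor_counter)
        self.actor_counter += 1
        self.previous_dummy_object = object_ids[-1]
        return object_ids[:-1]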
Yes, that does seem much cleaner. I pushed a commit to do this. Thanks!
Merged build finished. Test PASSed.
Do you have any suggestions on how to solve the exception propagation problem? I'm not very familiar with the flow.
It currently works as follows.
We could preserve the original behavior (I think) by simply passing what is referenced at lines 695 to 696 in 546ba23.

Unfortunately this adds more of the special casing for actors that it would be nice to avoid.
I decided to refactor this. I agree that we should try to avoid special casing for actors, but it's hard to avoid with the current structure. A better way to handle it would probably be to make an …
Merged build finished. Test PASSed.
sequential actor tasks
Force-pushed from 8c29c2a to 9250b5d.
Merged build finished. Test PASSed.
else:
    print("WARNING: this object has no __init__ method.")

# The function actor_method_call gets called if somebody tries to call
# a method on their local actor stub object.
def _actor_method_call(self, attr, function_signature, *args,
Looks like this change also fixes #783.
This is the first step towards actor fault tolerance using the existing object lineage reconstruction. This PR covers actor fault tolerance in the case where a node with an actor dies. It does not cover the case where just the actor process dies, or where the local scheduler dies and the actor gets reassigned to a local scheduler on the same node. It also does not cover actor checkpointing.
This PR includes: