Actor fault tolerance using object lineage reconstruction #902

Merged

Conversation

stephanie-wang
Contributor

This is the first step towards actor fault tolerance using the existing object lineage reconstruction. This PR covers actor fault tolerance in the case where a node with an actor dies. It does not cover the case where just the actor process dies, or where the local scheduler dies and the actor gets reassigned to a local scheduler on the same node. It also does not cover actor checkpointing.

This PR includes:

  1. Revert the previous approach to actor reconstruction, which scanned the task table from the Python actor process.
  2. Add dummy dependencies and return values for actor tasks (sketched below).
  3. Reconstruct actor tasks using the existing object lineage reconstruction.
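
As a rough illustration of the chaining idea (plain Python with invented names, not code from this PR): each actor task depends on the dummy output of the previous task on the same actor, so reconstructing any task transitively replays its predecessors in submission order.

    # A toy model of the dependency chain (no Ray involved; `Task` and the
    # field names here are invented for this illustration).
    class Task(object):
        def __init__(self, counter, dummy_dependency):
            self.counter = counter
            # Dummy object produced by the previous actor task (None for the
            # first task submitted to the actor).
            self.dummy_dependency = dummy_dependency
            # Dummy object produced by this task; the next actor task will
            # depend on it.
            self.dummy_output = ("dummy", counter)

    tasks = []
    previous_dummy = None
    for counter in range(3):
        task = Task(counter, previous_dummy)
        previous_dummy = task.dummy_output
        tasks.append(task)

    # Reconstructing tasks[2] requires its dummy_dependency, which is
    # tasks[1].dummy_output, which in turn requires tasks[0].dummy_output,
    # so the existing lineage reconstruction replays the actor's tasks in
    # submission order and thereby rebuilds the actor's state.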

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1739/

Collaborator

@robertnishihara left a comment

Nice job! This approach is going to be a lot cleaner :)

@@ -96,6 +103,11 @@ class TaskBuilder {
ObjectID return_id = task_compute_return_id(task_id, i);
returns.push_back(to_flatbuf(fbb, return_id));
}
/* For actor tasks, add a dummy output. */
Collaborator

We can probably just increase num_return_vals by one when we construct the task in Python, right?

  ObjectID predecessor_id =
      task_compute_return_id(actor_id_, actor_counter_ - 1);
  NextReferenceArgument(predecessor_id);
}
Collaborator

This should probably be done in Python, e.g., simply by passing the dummy object in as an argument to the task spec.

E.g., in the block

            task = ray.local_scheduler.Task(
                self.task_driver_id,
                ray.local_scheduler.ObjectID(function_id.id()),
                args_for_local_scheduler,
                function_properties.num_return_vals,
                self.current_task_id,
                self.task_index,
                actor_id,
                self.actor_counters[actor_id],
                [function_properties.num_cpus, function_properties.num_gpus,
                 function_properties.num_custom_resource])

in worker.py.

Contributor Author

We could, but I did it here so that we could create an object ID that was a function of the actor_id and actor_counter values. We could do it in Python too, but we would have to duplicate or expose the C code for task_compute_return_id. What do you think?

Collaborator

Shouldn't all of the return object IDs already be a function of the actor_id and actor_counter values, because they are computed from the task_id, which hashes in those two values (see https://github.com/ray-project/ray/blob/master/src/common/task.cc#L57-L58)?

Contributor Author

@stephanie-wang Sep 8, 2017

Yes, but they are also a function of the rest of the task. I believe it's better to have it as a function of just actor_id and actor_counter, so that anyone can determine the object ID returned by that actor task. If we want to do it just in Python, the other option would be to record the dummy objects on the caller, in addition to the current actor_counter.

Edit: On further thought, it might make sense to record the dummy objects on the caller, so we can be more flexible with things like checkpointing.
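
For concreteness, a minimal sketch of what "an object ID that is a function of just actor_id and actor_counter" could mean (the hashing scheme here is invented for illustration and is not Ray's task_compute_return_id):

    import hashlib

    def dummy_object_id(actor_id, actor_counter):
        # Assumes actor_id is raw bytes and actor_counter is an int.
        # Anyone who knows these two values can recompute the dummy object ID
        # without knowing anything else about the task spec.
        h = hashlib.sha1()
        h.update(actor_id)
        h.update(str(actor_counter).encode("ascii"))
        return h.digest()  # 20 bytes, the same size as Ray's object IDs.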

Collaborator

Let's try to make the code work regardless of how the object IDs are generated (assuming they are generated in a deterministic and collision-avoiding manner). We can always bake more semantics into the IDs, but I'd prefer to avoid that unless there is a very good reason.

I agree that recording the dummy objects on the caller makes sense. In fact, I thought that's what this PR was doing (but I see now that it's not).

# numpy array as a hack to pin the object in the object store.
# Once we allow object pinning in the store, we may use `None`.
if task.actor_id().id() != NIL_ACTOR_ID:
    outputs = outputs + (np.zeros(1), )
Collaborator

same comment here about the ,

num_returns -= 1

if num_returns == 1:
    outputs = (outputs, )
Collaborator

purely aesthetic, but can you remove the space after the ,?

# to prevent eviction from the object store.
if task.actor_id().id() != NIL_ACTOR_ID:
    dummy_object = self.get_object(return_object_ids[-1:])[0]
    self.actor_dummy_objects[task.actor_counter()] = dummy_object
Collaborator

This makes it seem like a list would be more appropriate than a dictionary.

@@ -10,6 +10,7 @@
import time

import ray
from ray.worker import NIL_ACTOR_ID
Collaborator

maybe put this after ray.services? I'm not exactly sure what the convention is, but that seems a bit more alphabetical

if task["LocalSchedulerID"] not in self.dead_local_schedulers:
continue

# Remove dummy objects returned by actor tasks from any plasma
Collaborator

Hmm, I was going to suggest putting this in cleanup_actors, but I guess that would require looping over the task table again, which is a bit unfortunate.

Contributor Author

Yeah, that's actually why I have it here.

Collaborator

Ok, makes sense.

# that died.
assert(len(manager_ids) <= 1)
for manager in manager_ids:
    # If the object was on a dead plasma manager, remove that
Collaborator

Hmm.. the plasma manager will often not be dead (e.g., if the local scheduler dies it does not trigger the death of the manager). Wouldn't it make sense to go ahead and remove the object from the manager that corresponds to the dead local scheduler?

In our tests the manager will probably be dead because we kill the object store manually. But then the dummy objects are probably getting cleaned up by cleanup_object_table and not here.

So I have a feeling that this code isn't really necessary. Either the manager dies, in which case this happens anyway in cleanup_object_table, or the manager doesn't die, in which case this code won't do anything.

Does that sound right?

Contributor Author

This block is to handle the case that you wrote where the manager doesn't die. It removes the object from any manager that had the object. For actor objects, the list should either have no managers (the object wasn't created and/or published yet), or the list should have one manager, in which case it will be the manager corresponding to the local scheduler. We have to remove the one manager so that reconstruction will go through.

This code does need to be replaced, though. Eventually, the plasma manager should be listening for a corresponding local scheduler's death, and remove the actor objects from the object table itself, rather than doing it in the monitor. Otherwise, there is a risk of a race here between the monitor removing the plasma manager and the plasma manager adding itself back (see the TODO at line 153).

Collaborator

That makes sense. I find the comment confusing then. The comment says

# If the object was on a dead plasma manager, remove that
# location entry.

Since it's possible that the plasma manager is still running, right? Is there a better way to phrase this comment?

Contributor Author

Oops, that was a comment left over from the copy pasta :) I'll update the comment.

@robertnishihara
Collaborator

cc @concretevitamin

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1798/

@robertnishihara force-pushed the actor-lineage-reconstruction branch from 1f941f0 to 9f875b6 on September 10, 2017 at 00:54
@robertnishihara
Collaborator

I fixed the linting errors and attempted to fix the test failure but decided to revert my fix. The issue is/was the following.

By introducing a dependence between consecutive actor tasks, we cause exceptions in previous tasks (in particular, the actor constructor) to propagate to subsequent tasks. So once one actor method fails, then all subsequent actor methods will fail.

At first I thought this was a feature because it causes failures in the constructor to propagate (which is similar to how regular Python classes work), but there are two downsides.

  1. We don't want failures in a regular (non-constructor) actor method to cause failures in subsequent actor methods.
  2. A failure in any actor method will cause the actor's __ray_terminate__ method to fail, and that method is needed for doing some cleanup.
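
For example, a sketch of the failure mode (using present-day @ray.remote actor syntax purely for illustration; the actor API at the time of this PR may have differed):

    import ray

    ray.init()

    @ray.remote
    class Foo(object):
        def bad(self):
            raise Exception("bad() failed")

        def good(self):
            return 1

    f = Foo.remote()
    f.bad.remote()          # this task raises
    x_id = f.good.remote()  # implicitly depends on bad()'s dummy output
    # Under the chained dummy dependencies as initially implemented here,
    # ray.get(x_id) would re-raise bad()'s exception even though good()
    # succeeds on its own, and the actor's __ray_terminate__ task would
    # fail for the same reason.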

@@ -502,13 +511,20 @@ def submit_task(self, function_id, args, actor_id=None):
# Look up the various function properties.
function_properties = self.function_properties[
    self.task_driver_id.id()][function_id.id()]
num_return_vals = function_properties.num_return_vals
Collaborator

We can leave this for a subsequent PR depending on what you prefer, but I think it would be much cleaner to not have any special cases in submit_task at all and to just have the actor call submit_task differently. This would require the actor handle (as opposed to the worker object) to own the actor_counter and the actor_dummy_object, which I think makes a lot of sense.

I think we can also make it so that there are no special cases in _process_task and that the differences are handled inside of actor_method_call.

If you decide to defer this to a subsequent PR, can you submit an issue for this change?

Contributor Author

Can you describe a bit more how this would be done? I'm not sure if I see how we can call submit_task differently without changing its arguments, since it doesn't currently take a number of return values as an argument. Do you mean that we should do the increment to the number of return values when registering the function? I'm also not sure how we can have the actor add the dependency rather than the caller, since it seems like by the time the actor receives the task, the dependencies should already be decided?

Collaborator

I suppose we could either increment the number of returns when registering the function or make num_return_vals an argument to submit_task.

And yeah, the caller needs to add the dependency. I didn't mean the actor itself would add the dependency but rather the actor "handle", which I'm using to refer to the actor object that the caller has (e.g., this is NewClass in actor.py). Does that make more sense?
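
For illustration, a hypothetical sketch of the handle-owned bookkeeping being discussed (the name ActorHandle and the submit_task signature with num_return_vals, actor_id, and actor_counter keyword arguments are assumptions of this sketch, not the PR's actual code):

    class ActorHandle(object):
        """Caller-side actor handle; owns the per-actor submission state."""

        def __init__(self, actor_id, worker):
            self.actor_id = actor_id
            self.worker = worker
            self.actor_counter = 0
            # Dummy ObjectID returned by the most recently submitted task.
            self.actor_dummy_object = None

        def actor_method_call(self, function_id, args, num_return_vals):
            # Chain onto the previous task's dummy output, if any.
            if self.actor_dummy_object is not None:
                args = list(args) + [self.actor_dummy_object]
            # Request one extra return slot for this task's dummy output, so
            # submit_task itself needs no actor-specific special cases.
            object_ids = self.worker.submit_task(
                function_id, args,
                num_return_vals=num_return_vals + 1,
                actor_id=self.actor_id,
                actor_counter=self.actor_counter)
            self.actor_counter += 1
            self.actor_dummy_object = object_ids[-1]
            return object_ids[:-1]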

Contributor Author

Yes, that does seem much cleaner. I pushed a commit to do this. Thanks!

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1807/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1808/

@stephanie-wang
Contributor Author

Do you have any suggestions on how to solve the exception propagation problem? I'm not very familiar with the flow.

@robertnishihara
Collaborator

It currently works as follows.

  • If we call ray.get(x_id) and the task that created x_id threw an exception, then ray.get(x_id) will re-raise the exception.
  • If we call f.remote(x_id) and the task that created x_id threw an exception, then when the f task begins executing, it will call ray.get(x_id) under the hood which will throw an exception causing the f task to fail.
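
Concretely, a minimal sketch of those two cases using the public ray.remote / ray.get API:

    import ray

    ray.init()

    @ray.remote
    def fail():
        raise Exception("original error")

    @ray.remote
    def f(x):
        return x

    x_id = fail.remote()
    # Case 1: ray.get(x_id) re-raises the exception thrown by fail().
    # Case 2: the f task fails too, because when it starts executing it
    # calls ray.get(x_id) on its argument under the hood.
    y_id = f.remote(x_id)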

We could preserve the original behavior (I think) by simply passing args[:-1] instead of args into the line below:

ray/python/ray/worker.py

Lines 695 to 696 in 546ba23

arguments = self._get_arguments_for_execution(function_name,
                                              args)

Unfortunately this adds more of the special casing for actors that it would be nice to avoid.

@stephanie-wang
Contributor Author

I decided to refactor _process_task to make the error handling a little cleaner, and I moved storage of the dummy output to its caller. It now always stores a valid dummy output, even if the task produced an exception. _process_task still gets all arguments, including the dummy output. I guess it's not totally necessary since the dummy objects get pinned anyway, but it's nice to have a sanity check that they are actually there when the task begins to execute.

I agree that we should try to avoid special casing for actors, but it's hard to avoid with the current structure. A better way to handle it would probably be to make an ActorWorker that is a subclass of Worker, but that would be a much larger refactoring effort.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1815/

@stephanie-wang force-pushed the actor-lineage-reconstruction branch from 8c29c2a to 9250b5d on September 10, 2017 at 23:28
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1816/

else:
    print("WARNING: this object has no __init__ method.")

# The function actor_method_call gets called if somebody tries to call
# a method on their local actor stub object.
def _actor_method_call(self, attr, function_signature, *args,
Collaborator

Looks like this change also fixes #783.
