Actor checkpointing with object lineage reconstruction #1004

Merged
@robertnishihara merged 24 commits into ray-project:master from actor-checkpoint on Oct 12, 2017

Conversation

Contributor

@stephanie-wang commented Sep 22, 2017

Actor checkpointing built on top of the existing reconstruction infrastructure.

Briefly, once per checkpoint interval for a given actor, a mock task responsible for saving and resuming checkpoints is submitted. During normal execution, the task tries to save a checkpoint to Redis. During reconstruction, it tries to resume from the most recent checkpoint. If the most recent checkpoint is older than the point this task represents, the task instead requests reconstruction of the preceding tasks.
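
The save/resume flow described above can be sketched roughly as follows. This is a toy, single-process illustration and not the code in this PR: the `checkpoint_store` dict stands in for Redis, and `CounterActor`, `run_checkpoint_task`, and the `reconstructing` flag are hypothetical names chosen for the example.

```python
import copy

# Stand-in for Redis: maps actor_id -> (task_counter, saved attribute dict).
checkpoint_store = {}


class CounterActor:
    """Hypothetical actor whose whole state is a single integer."""

    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


def run_checkpoint_task(actor_id, actor, task_counter, reconstructing):
    """Mock checkpoint task; returns True if earlier tasks must be replayed."""
    if not reconstructing:
        # Normal execution: save a snapshot of the actor's state.
        checkpoint_store[actor_id] = (task_counter, copy.deepcopy(vars(actor)))
        return False

    saved = checkpoint_store.get(actor_id)
    if saved is not None and saved[0] == task_counter:
        # Reconstruction: a checkpoint taken at this point exists, so resume.
        actor.__dict__.update(copy.deepcopy(saved[1]))
        return False

    # No checkpoint at this point (the newest one, if any, is older), so the
    # preceding tasks have to be reconstructed instead.
    return True


# Normal execution: two tasks, then the mock checkpoint task.
actor = CounterActor()
actor.increment()
actor.increment()
run_checkpoint_task("actor-1", actor, task_counter=2, reconstructing=False)

# After a failure, a fresh actor resumes from the saved checkpoint.
restored = CounterActor()
needs_replay = run_checkpoint_task(
    "actor-1", restored, task_counter=2, reconstructing=True)
```

In this sketch the resumed actor gets the saved state back without replaying the two `increment` calls, while a checkpoint task at a counter with no matching checkpoint reports that replay is needed.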

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/1910/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2057/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2058/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2059/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2064/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2065/

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2063/

@@ -217,7 +217,18 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
 int64_t TaskSpec_actor_counter(TaskSpec *spec) {
   CHECK(spec);
   auto message = flatbuffers::GetRoot<TaskInfo>(spec);
-  return message->actor_counter();
+  int64_t actor_counter = message->actor_counter();
+  if (actor_counter < 0) {
Contributor

why not "return std::abs(message->actor_counter());"?

@@ -42,6 +42,7 @@ typedef struct {
    * restrict the submission of tasks on actors to the process that created the
    * actor. */
   int64_t task_counter;
+  int64_t assigned_task_counter;
Contributor

can you document this?

Contributor

@pcmoritz left a comment

LGTM, can you fix the small comments and rebase it?

@stephanie-wang
Contributor Author

Great, thanks! I pushed the fixes.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2070/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2072/

@robertnishihara
Collaborator

#605

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2077/

- Return new task counter in GetTaskRequest
- Update worker state for actor tasks inside of the actor method
  executor
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2088/

@@ -217,7 +217,14 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
 int64_t TaskSpec_actor_counter(TaskSpec *spec) {
   CHECK(spec);
   auto message = flatbuffers::GetRoot<TaskInfo>(spec);
-  return message->actor_counter();
+  return std::abs(message->actor_counter());
Collaborator

Why std::abs? Is this negative sometimes? And if so, what does that mean?

Contributor Author

I set the counter to be negative for checkpoint tasks. It was mostly to avoid adding another field to the task spec. Let me know if you prefer to just add a field.

Collaborator

Instead of negative numbers, I actually think adding a bool to the TaskSpec saying whether it is a checkpoint task or not is cleaner. What do you think?
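
To make the trade-off in this thread concrete, here is a minimal sketch of the two encodings. It is written in Python for illustration only (the real `TaskSpec` is a flatbuffers message accessed from C++), and `encode_counter`, `decode_counter`, and `TaskSpecSketch` are hypothetical names.

```python
from typing import NamedTuple


def encode_counter(counter, is_checkpoint):
    """Sign encoding: checkpoint tasks carry a negated counter."""
    return -counter if is_checkpoint else counter


def decode_counter(raw):
    """Recover (counter, is_checkpoint) from the sign-encoded value."""
    return abs(raw), raw < 0


class TaskSpecSketch(NamedTuple):
    """Alternative: keep the counter non-negative and add an explicit flag."""
    actor_counter: int
    is_checkpoint: bool = False


# The sign trick round-trips for positive counters...
positive_case = decode_counter(encode_counter(5, True))
# ...but cannot mark a checkpoint task at counter 0, since -0 == 0.
zero_case = decode_counter(encode_counter(0, True))
# The explicit flag has no such corner case.
flagged = TaskSpecSketch(actor_counter=0, is_checkpoint=True)
```

The sign encoding saves a field, but every reader has to remember to take the absolute value, and counter 0 cannot be flagged. The explicit bool avoids both issues at the cost of one more field in the spec.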

return std::abs(message->actor_counter());
}

int64_t TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) {
Collaborator

Should this return a bool?

Contributor Author

Yup, thanks!

args = args[:-1]
if method_name == "__ray_checkpoint__":
    # Execute the checkpoint task. NOTE(swang): Checkpoint methods
    # should not throw an exception.
Collaborator

They definitely could throw an exception, right? E.g., if they try to pickle stuff, pickling can fail. Also, if we allow user-defined checkpointing methods (which may be necessary for neural nets and things like that), the user could have a bug in the checkpointing method.

Does that have any implications for this code? Maybe we should add a test case for the case where checkpointing fails?

Contributor Author

Hmm what do you think the expected behavior should be if the checkpoint task does throw an exception? For both saving and resuming a checkpoint?

For me, I would probably expect saving a checkpoint to store the exception like a normal task, and then continue executing tasks on the actor. An exception while resuming a checkpoint could reconstruct the previous object, as if there were no successful checkpoints. How does that sound?

Yeah, once we figure out the expected behavior, a test case would be good. Right now a checkpoint task that throws an exception would just hang, since it would never put the dummy object.
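
One way to read the behavior proposed above is sketched below: the checkpoint step records any exception instead of propagating it, and the dummy object is always put so that the next actor task cannot hang. This is only an illustration under assumptions; `run_checkpoint_step`, `save_or_resume`, and `put_dummy_object` are hypothetical names, not functions from this PR.

```python
import traceback


def run_checkpoint_step(save_or_resume, put_dummy_object):
    """Run one checkpoint step without ever skipping the dummy object.

    save_or_resume: zero-argument callable that saves or resumes a checkpoint.
    put_dummy_object: callable invoked exactly once, so the next actor task
        (which depends on the dummy object) is never left waiting.
    Returns (succeeded, error_text).
    """
    succeeded, error_text = True, None
    try:
        save_or_resume()
    except Exception:
        # Record the failure like a normal task error instead of propagating.
        succeeded, error_text = False, traceback.format_exc()
    finally:
        put_dummy_object()
    return succeeded, error_text


def failing_save():
    raise ValueError("failure")


dummy_puts = []
failed_result = run_checkpoint_step(failing_save, lambda: dummy_puts.append(1))
ok_result = run_checkpoint_step(lambda: None, lambda: dummy_puts.append(1))
```

Under the proposal, a caller could treat `succeeded == False` on a save as an ordinary task error and keep executing, and treat it on a resume as "no checkpoint available" and fall back to reconstructing the previous object.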

@@ -36,6 +36,11 @@ enum MessageType:int {
   PutObject
 }

+// This message is sent from a worker to a local scheduler.
+table GetTaskRequest {
+  task_success: bool;
Collaborator

What does it mean if task_success is True versus False? And why is this necessary?

Contributor Author

This is a little messy, but it's used to notify the local scheduler of the actor's new task counter. For checkpoint tasks, the actor's task counter should only be updated if the task was successful (the checkpoint was actually saved or resumed). If the checkpoint isn't there, for example, then task_success is set to False and the local scheduler won't modify the actor's task counter.
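
The rule described above can be sketched as follows. This is a toy model, not the local scheduler's actual code: `ActorCounterSketch`, `handle_get_task_request`, and the "finished counter plus one" update are assumptions made for the example. Only the condition (a failed checkpoint task leaves the counter unchanged) comes from the comment above.

```python
class ActorCounterSketch:
    """Toy stand-in for the local scheduler's per-actor bookkeeping."""

    def __init__(self):
        self.task_counter = 0

    def handle_get_task_request(self, finished_counter, is_checkpoint,
                                task_success):
        """Called when a worker reports its last task and asks for the next."""
        if is_checkpoint and not task_success:
            # The checkpoint was neither saved nor resumed (for example, it
            # was not there), so the actor's counter is left untouched.
            return self.task_counter
        # Assumed update rule for this sketch: advance past the finished task.
        self.task_counter = finished_counter + 1
        return self.task_counter


scheduler = ActorCounterSketch()
after_failed_checkpoint = scheduler.handle_get_task_request(
    finished_counter=3, is_checkpoint=True, task_success=False)
after_successful_checkpoint = scheduler.handle_get_task_request(
    finished_counter=3, is_checkpoint=True, task_success=True)
```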

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2095/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2096/

@@ -214,6 +322,10 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,


 def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
+    # Add one to the checkpoint interval since we will insert a mock task for
+    # every checkpoint.
+    checkpoint_interval += 1
Collaborator

@robertnishihara Oct 12, 2017

Not a big deal, but

@ray.remote(checkpoint_interval=0)
class Foo(object):
    def __init__(self):
        pass

f = Foo.remote()

causes an infinite loop :)

Contributor Author

Ah yeah, this is a little unfortunate...
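
One possible guard for the `checkpoint_interval=0` case is to reject the value up front, before the interval is incremented. The helper below is hypothetical (it is not part of this PR) and makes no assumption about how other values are interpreted.

```python
def validate_checkpoint_interval(checkpoint_interval):
    """Hypothetical check: refuse an interval of zero before it is used."""
    if checkpoint_interval == 0:
        raise ValueError("checkpoint_interval must be nonzero.")
    return checkpoint_interval


try:
    validate_checkpoint_interval(0)
    zero_rejected = False
except ValueError:
    zero_rejected = True

accepted = validate_checkpoint_interval(5)
```

Failing fast at actor definition time would turn the infinite loop into an immediate, readable error.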

plasma_id.binary())
worker.local_scheduler_client.notify_unblocked()

return actor_checkpoint_failed, error
Collaborator

If I run the following,

import ray
ray.init()

@ray.remote(checkpoint_interval=1)
class Foo(object):
    def __init__(self):
        pass
    def method(self):
        pass
    def __ray_save__(self):
        raise Exception("failure")

f = Foo.remote()

Then the first call to

ray.get(f.method.remote())

fails with

Traceback (most recent call last):
  File "/Users/rkn/Workspace/ray/python/ray/worker.py", line 728, in _process_task
    *arguments)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 151, in actor_method_executor
    actor_checkpoint_failed, error = method(actor, *args)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 472, in __ray_checkpoint__
    return actor_checkpoint_failed, error
UnboundLocalError: local variable 'error' referenced before assignment

And the second call to

ray.get(f.method.remote())

hangs.

Not sure if we want to address that here.


ray.worker.cleanup()

def testCheckpointException(self):
Collaborator

This test hangs for me, and I see the error

Remote function __ray_checkpoint__ failed with:

Traceback (most recent call last):
  File "/Users/rkn/Workspace/ray/python/ray/worker.py", line 728, in _process_task
    *arguments)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 151, in actor_method_executor
    actor_checkpoint_failed, error = method(actor, *args)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 472, in __ray_checkpoint__
    return actor_checkpoint_failed, error
UnboundLocalError: local variable 'error' referenced before assignment

Collaborator

This is probably a python 3 thing, I think I can fix it.

Collaborator

Ok, fixed it.
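
The fix itself is not shown in this thread. One plausible reading of the "python 3 thing" is that Python 3 deletes the name bound by `except ... as name` when the `except` block ends, so using that name afterwards raises `UnboundLocalError`. The sketch below reproduces that pattern and one way around it. `checkpoint_buggy` and `checkpoint_fixed` are hypothetical stand-ins, not the code in `actor.py`.

```python
def checkpoint_buggy(save):
    """Returns (failed, error), but breaks on Python 3."""
    failed = False
    try:
        save()
    except Exception as error:
        failed = True
    # Python 3 unbinds `error` when the except block ends, and the name was
    # never bound at all if save() succeeded.
    return failed, error


def checkpoint_fixed(save):
    """Same contract, but `error` is always bound."""
    failed, error = False, None
    try:
        save()
    except Exception as exc:
        # Copy the exception to a name that outlives the except block.
        failed, error = True, exc
    return failed, error


def failing_save():
    raise Exception("failure")


try:
    checkpoint_buggy(failing_save)
    buggy_raised = False
except UnboundLocalError:
    buggy_raised = True

fixed_failed, fixed_error = checkpoint_fixed(failing_save)
```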

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2097/

previous_object_id = previous_object_id[0]
# Make sure that a previous object was given.
if previous_object_id is None:
    return False
Collaborator

@robertnishihara Oct 12, 2017

If this code path is ever taken, that's a bug, right (that is if previous_object_id is None)? Or can it happen normally?

Contributor Author

Oh yeah, that's right.

Collaborator

@robertnishihara left a comment

Looks good to me!

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2099/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2104/

@robertnishihara merged commit 3764f2f into ray-project:master Oct 12, 2017
@robertnishihara deleted the actor-checkpoint branch October 12, 2017 16:53
4 participants