Actor checkpointing with object lineage reconstruction #1004
Force-pushed from 0a95d7b to 3e332a3.
src/common/task.cc (outdated)
@@ -217,7 +217,18 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
 int64_t TaskSpec_actor_counter(TaskSpec *spec) {
   CHECK(spec);
   auto message = flatbuffers::GetRoot<TaskInfo>(spec);
-  return message->actor_counter();
+  int64_t actor_counter = message->actor_counter();
+  if (actor_counter < 0) {
Why not "return std::abs(message->actor_counter());"?
@@ -42,6 +42,7 @@ typedef struct {
  * restrict the submission of tasks on actors to the process that created the
  * actor. */
   int64_t task_counter;
+  int64_t assigned_task_counter;
can you document this?
LGTM, can you fix the small comments and rebase it?
Great, thanks! I pushed the fixes.
Force-pushed from 68f04e5 to dfe51d1.
…ted after task is successful
- Return new task counter in GetTaskRequest
- Update worker state for actor tasks inside of the actor method executor
Force-pushed from d740913 to 7ad3c3a.
src/common/task.cc
@@ -217,7 +217,14 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) {
 int64_t TaskSpec_actor_counter(TaskSpec *spec) {
   CHECK(spec);
   auto message = flatbuffers::GetRoot<TaskInfo>(spec);
-  return message->actor_counter();
+  return std::abs(message->actor_counter());
Why std::abs? Is this negative sometimes? And if so, what does that mean?
I set the counter to be negative for checkpoint tasks. It was mostly to avoid adding another field to the task spec. Let me know if you prefer to just add a field.
Instead of negative numbers, I actually think adding a bool to the TaskSpec saying whether it is a checkpoint task or not is cleaner. What do you think?
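To make the trade-off concrete, here is a toy Python illustration of the two encodings under discussion; the names are hypothetical, and the real code is the C++ TaskSpec accessor above.

# Toy illustration of the two encodings; all names here are hypothetical.

# Encoding 1: overload the counter's sign to mark checkpoint tasks.
def encode_v1(counter, is_checkpoint):
    # Note: counter == 0 cannot be marked, since -0 == 0; one reason the
    # explicit flag below is cleaner.
    return -counter if is_checkpoint else counter

def decode_v1(encoded):
    return abs(encoded), encoded < 0  # (counter, is_checkpoint)

# Encoding 2: carry an explicit flag, as suggested above.
def encode_v2(counter, is_checkpoint):
    return {"actor_counter": counter, "is_checkpoint": is_checkpoint}

assert decode_v1(encode_v1(3, True)) == (3, True)
assert decode_v1(encode_v1(0, True)) == (0, False)  # the zero-counter pitfall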
src/common/task.cc (outdated)
   return std::abs(message->actor_counter());
 }

+int64_t TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) {
Should this return a bool?
Yup, thanks!
python/ray/actor.py (outdated)
        args = args[:-1]
        if method_name == "__ray_checkpoint__":
            # Execute the checkpoint task. NOTE(swang): Checkpoint methods
            # should not throw an exception.
They definitely could throw an exception, right? E.g., if they try to pickle stuff, pickling can fail. Also, if we allow user-defined checkpointing methods (which may be necessary for neural nets and things like that), the user could have a bug in the checkpointing method.
Does that have any implications for this code? Maybe we should add a test case for the case where checkpointing fails?
Hmm what do you think the expected behavior should be if the checkpoint task does throw an exception? For both saving and resuming a checkpoint?
For me, I would probably expect saving a checkpoint to store the exception like a normal task, and then continue executing tasks on the actor. An exception while resuming a checkpoint could reconstruct the previous object, as if there were no successful checkpoints. How does that sound?
Yeah, once we figure out the expected behavior, a test case would be good. Right now a checkpoint task that throws an exception would just hang, since it would never put the dummy object.
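To make the proposed semantics concrete, here is a minimal Python sketch; the helper names are illustrative, not Ray's actual internals.

# Sketch of the semantics proposed above: a failed save is stored like a
# normal task result and execution continues. Hypothetical names.
import pickle

def save_checkpoint(actor, store):
    # E.g. serialize actor state to Redis; here just an in-memory dict.
    store["checkpoint"] = pickle.dumps(actor.__dict__)

def run_checkpoint_task(actor, store):
    checkpoint_failed, error = False, None
    try:
        save_checkpoint(actor, store)
    except Exception as e:
        # Record the failure instead of hanging: the dummy object must
        # still be put, and the actor's counter only advances on success.
        checkpoint_failed, error = True, e
    return checkpoint_failed, error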
@@ -36,6 +36,11 @@ enum MessageType:int {
   PutObject
 }

+// This message is sent from a worker to a local scheduler.
+table GetTaskRequest {
+  task_success: bool;
What does it mean if task_success is True versus False? And why is this necessary?
This is a little messy, but it's used to notify the local scheduler of the actor's new task counter. For checkpoint tasks, the actor's task counter should only be updated if the task was successful (the checkpoint was actually saved or resumed). If the checkpoint isn't there, for example, then task_success is set to False and the local scheduler won't modify the actor's task counter.
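As an illustration of that gating logic, here is a toy Python model; it is not the actual C++ local scheduler code, and the names are hypothetical.

# Toy model of how task_success in GetTaskRequest gates the counter update.
def handle_get_task_request(actor_state, new_task_counter, task_success):
    # Only adopt the worker-reported counter when the checkpoint task
    # actually saved or resumed a checkpoint; otherwise keep the old value
    # so the task can be retried or reconstructed.
    if task_success:
        actor_state["task_counter"] = new_task_counter
    return actor_state

state = {"task_counter": 5}
handle_get_task_request(state, 6, task_success=False)
assert state["task_counter"] == 5  # unchanged: the checkpoint wasn't there
handle_get_task_request(state, 6, task_success=True)
assert state["task_counter"] == 6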
python/ray/actor.py
@@ -214,6 +322,10 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus,


+def make_actor(cls, num_cpus, num_gpus, checkpoint_interval):
+    # Add one to the checkpoint interval since we will insert a mock task for
+    # every checkpoint.
+    checkpoint_interval += 1
Not a big deal, but

@ray.remote(checkpoint_interval=0)
class Foo(object):
    def __init__(self):
        pass

f = Foo.remote()

causes an infinite loop :)
Ah yeah, this is a little unfortunate...
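For illustration, one possible guard (a sketch only, not necessarily what the PR ends up doing) would be to validate the interval before the +1 adjustment:

# Possible guard (hypothetical): reject non-positive intervals before the
# +1 adjustment, so checkpoint_interval=0 cannot make every task slot a
# checkpoint task and loop forever.
def validate_checkpoint_interval(checkpoint_interval):
    if checkpoint_interval is not None and checkpoint_interval < 1:
        raise ValueError(
            "checkpoint_interval must be at least 1, got {}".format(
                checkpoint_interval))
    return checkpoint_interval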
python/ray/actor.py (outdated)
            plasma_id.binary())
        worker.local_scheduler_client.notify_unblocked()

    return actor_checkpoint_failed, error
If I run the following,

import ray

ray.init()

@ray.remote(checkpoint_interval=1)
class Foo(object):
    def __init__(self):
        pass

    def method(self):
        pass

    def __ray_save__(self):
        raise Exception("failure")

f = Foo.remote()

then the first call to ray.get(f.method.remote()) fails with

Traceback (most recent call last):
  File "/Users/rkn/Workspace/ray/python/ray/worker.py", line 728, in _process_task
    *arguments)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 151, in actor_method_executor
    actor_checkpoint_failed, error = method(actor, *args)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 472, in __ray_checkpoint__
    return actor_checkpoint_failed, error
UnboundLocalError: local variable 'error' referenced before assignment

and the second call to ray.get(f.method.remote()) hangs.

Not sure if we want to address that here.
        ray.worker.cleanup()

    def testCheckpointException(self):
This test hangs for me, and I see the error

Remote function __ray_checkpoint__ failed with:

Traceback (most recent call last):
  File "/Users/rkn/Workspace/ray/python/ray/worker.py", line 728, in _process_task
    *arguments)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 151, in actor_method_executor
    actor_checkpoint_failed, error = method(actor, *args)
  File "/Users/rkn/Workspace/ray/python/ray/actor.py", line 472, in __ray_checkpoint__
    return actor_checkpoint_failed, error
UnboundLocalError: local variable 'error' referenced before assignment
This is probably a python 3 thing, I think I can fix it.
Ok, fixed it.
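If the "python 3 thing" is the usual except-scope issue, a simplified, hypothetical sketch of the kind of fix looks like this (not the actual patch):

# In Python 3, the name bound by "except ... as e" is unbound when the
# except block exits, so returning it later raises UnboundLocalError.
# Binding defaults up front and copying the exception to an outer name
# avoids that.
def checkpoint_sketch(save):
    actor_checkpoint_failed, error = False, None  # defaults bound up front
    try:
        save()
    except Exception as e:
        actor_checkpoint_failed = True
        error = e  # keep a reference that survives the except block
    return actor_checkpoint_failed, error

def failing_save():
    raise Exception("failure")

failed, err = checkpoint_sketch(failing_save)
assert failed and str(err) == "failure"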
python/ray/actor.py (outdated)
    previous_object_id = previous_object_id[0]
    # Make sure that a previous object was given.
    if previous_object_id is None:
        return False
If this code path is ever taken, that's a bug, right (that is, if previous_object_id is None)? Or can it happen normally?
Oh yeah, that's right.
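A minimal sketch of the agreed direction (the helper here is hypothetical): fail loudly rather than returning False silently.

# Since a missing previous object indicates a bug rather than a normal
# condition, assert instead of silently returning. Hypothetical helper.
def require_previous_object_id(previous_object_id):
    assert previous_object_id is not None, (
        "Checkpoint task submitted without the previous dummy object.")
    return previous_object_id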
Looks good to me!
Actor checkpointing built on top of the existing reconstruction infrastructure.
Briefly, after every checkpoint_interval tasks on a given actor, a mock task responsible for saving and resuming checkpoints is submitted. During normal execution, the task saves a checkpoint of the actor's state to Redis. During reconstruction, the task tries to resume from the most recent checkpoint. If the most recent checkpoint is older than the point being restored, the task requests reconstruction of the tasks in between.
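As a usage sketch: the checkpoint_interval argument and the __ray_save__ hook both appear in this thread, while the __ray_restore__ name for the resume hook is an assumption.

import ray

ray.init()

# Usage sketch. checkpoint_interval and __ray_save__ appear in this
# thread; __ray_restore__ as the resume hook is an assumption.
@ray.remote(checkpoint_interval=5)
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def __ray_save__(self):
        # The state returned here is what gets checkpointed to Redis.
        return self.value

    def __ray_restore__(self, saved_value):
        # Called on reconstruction instead of replaying every task.
        self.value = saved_value

c = Counter.remote()
# A checkpoint task is inserted after every checkpoint_interval tasks.
print(ray.get([c.increment.remote() for _ in range(10)]))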