Refactor actor task queues #1118
Conversation
Merged build finished. Test FAILed.
Test FAILed.
```diff
@@ -1376,17 +1395,42 @@ void handle_object_removed(LocalSchedulerState *state,
   }
 }
 
+std::vector<ActorID> empty_actor_queues;
```
I had completely forgotten about this `handle_object_removed` function. This has always felt super expensive to me, but I don't see a good alternative.
Yeah, same for me. A better way is probably to remove the mapping to dependent tasks only when the task is actually assigned. We should probably add a stress test for this behavior where there is a lot of churn in the object store.
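For illustration, here is a minimal sketch of that reverse-index idea, with stand-in types throughout (`ObjectID`, `TaskEntry`, and `DependencyIndex` are hypothetical names, not code from this PR): an eviction touches only the tasks that actually depend on the evicted object, and the mapping is erased lazily once the task is assigned.

```cpp
#include <string>
#include <unordered_map>
#include <vector>

using ObjectID = std::string;  /* Stand-in for Ray's ObjectID type. */
struct TaskEntry {};           /* Stand-in for a queued-task record. */

struct DependencyIndex {
  /* Reverse index: object -> tasks that are waiting on it. */
  std::unordered_map<ObjectID, std::vector<TaskEntry *>> dependent_tasks;

  /* Record the dependency when a task with missing objects is queued. */
  void add(const ObjectID &object_id, TaskEntry *task) {
    dependent_tasks[object_id].push_back(task);
  }

  /* On eviction, look up the affected tasks directly instead of
   * scanning every queue the way handle_object_removed does today. */
  const std::vector<TaskEntry *> *on_object_removed(
      const ObjectID &object_id) const {
    auto it = dependent_tasks.find(object_id);
    return it == dependent_tasks.end() ? nullptr : &it->second;
  }

  /* Erase the mapping lazily, once the task is actually assigned. */
  void on_task_assigned(const ObjectID &object_id) {
    dependent_tasks.erase(object_id);
  }
};
```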
```cpp
  /* If a task was assigned to this actor and there was no checkpoint
   * failure, then it is now loaded. */
  if (entry.assigned_task_counter > -1) {
    entry.loaded = true;
```
If it happened to rerun an old checkpoint task, then this will prevent it from running newer checkpoint tasks, right?
Is this avoided in practice by only storing the most recent checkpoint so that all earlier checkpoint tasks fail?
Yeah, each checkpoint task deletes earlier checkpoints.
In the future, we can optimize this a bit by having any checkpoint task try to reload the most recent checkpoint, not just the one with a matching index. We'll have to change the response to the local scheduler from `actor_checkpoint_failed` to an actual integer value for the new counter, though. We should think about maybe making that a separate IPC.
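A sketch of what that response change might look like, with an assumed field name (`resumed_checkpoint_counter` is an illustration, not the PR's actual message format):

```cpp
#include <cstdint>

/* Hypothetical shape of the worker's reply to the local scheduler. */
struct GetTaskReply {
  /* ... existing fields ... */

  /* -1 if no checkpoint was loaded; otherwise the task counter of the
   * checkpoint the actor resumed from. This replaces the boolean
   * actor_checkpoint_failed and lets the scheduler fast-forward
   * assigned_task_counter past tasks covered by the checkpoint. */
  int64_t resumed_checkpoint_counter = -1;
};
```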
```cpp
  /** Whether or not to request a transfer of this object. This should be set
   *  to true for all objects except for actor dummy objects, where the object
   *  must be generated by executing the task locally. */
  bool request_transfer;
```
It doesn't look like we're ever using this field. We set it in `fetch_missing_dependency`, but we never actually use the value, right?
Also, what could go wrong if we do fetch an actor dummy object?
Ah oops, that was my bad. It's supposed to check the field in `fetch_object_timeout_handler`. I'll fix that and see if I can come up with a test case for it as well.
The actor dummy objects are only supposed to be generated by the actor itself. Basically, the scenario where it would break reconstruction is if a local scheduler dies, but the corresponding plasma manager is still reachable. Then, when reconstructing the dummy object, if we transfer the object from the surviving plasma manager, it will look like we executed that task, even though we didn't. I'll clarify the documentation for that scenario.
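A rough model of the intended check, under the assumption that the timeout handler iterates over the missing objects and skips any whose `request_transfer` flag is false (the types and the `objects_to_fetch` helper are stand-ins, not the actual `fetch_object_timeout_handler`):

```cpp
#include <string>
#include <unordered_map>
#include <vector>

using ObjectID = std::string;  /* Stand-in for Ray's ObjectID type. */

struct MissingObject {
  bool request_transfer;  /* False only for actor dummy objects. */
};

/* Return the objects we should request from remote plasma managers.
 * Dummy objects are excluded: they must be regenerated by executing the
 * corresponding actor task locally, since transferring one from a
 * surviving plasma manager would make it look as if the task had
 * already executed, breaking reconstruction. */
std::vector<ObjectID> objects_to_fetch(
    const std::unordered_map<ObjectID, MissingObject> &missing) {
  std::vector<ObjectID> result;
  for (const auto &kv : missing) {
    if (kv.second.request_transfer) {
      result.push_back(kv.first);
    }
  }
  return result;
}
```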
```cpp
   * checkpoint. Before the actor has loaded, we may dispatch the first task
   * or any checkpoint tasks. After it has loaded, we may only dispatch tasks
   * in order. */
  bool loaded;
```
Won't this prevent efficient recovery from checkpoints? E.g., during recovery, why won't the actor just rerun the constructor or some early checkpoint task and then be prevented from recovering from a later checkpoint?
You could sort of solve this by only keeping around the latest checkpoint, but it could still rerun the constructor.
Ok, I think I was a bit confused. I thought the `i`th checkpoint task loaded the `i`th checkpoint, but actually it loads the most recent checkpoint, is that right?
Hmm, I think it will be okay since only certain tasks will get resubmitted during recovery. Earlier checkpoint tasks will fail since their checkpoint isn't available. It is possible that the constructor could run first, but that will only happen if someone actually needs the results of a task that happened before the checkpoint. Otherwise, the task won't get resubmitted.
For the latter case, it could be good to check for a checkpoint in the constructor (similar to what you had before). I'll add a TODO.
Nope, it does load the `i`th checkpoint, but only if it exists. Else, it'll set the `actor_checkpoint_failed` field in the response to the local scheduler.
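Summarizing this thread as a compact model (stand-in types; `can_dispatch` illustrates the rule and is not the PR's actual dispatch code):

```cpp
#include <cstdint>

/* Stand-in for the actor-mapping entry shown in the diff above. */
struct ActorEntry {
  int64_t assigned_task_counter = -1;  /* -1 until a task is assigned. */
  bool loaded = false;
};

/* Before the actor has loaded, only the constructor (counter 0) or a
 * checkpoint task may run; checkpoint task i fails if checkpoint i no
 * longer exists. After loading, tasks must run strictly in order. */
bool can_dispatch(const ActorEntry &entry, int64_t task_counter,
                  bool is_checkpoint_task) {
  if (!entry.loaded) {
    return task_counter == 0 || is_checkpoint_task;
  }
  return task_counter == entry.assigned_task_counter + 1;
}
```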
```diff
-  /* Find the first task that either matches the task counter or that is a
-   * checkpoint method. Remove any tasks that we have already executed past
-   * (e.g., by executing a more recent checkpoint method). */
+  /* Check whether we can execute the first task in the queue. */
```
Because of the dummy object dependencies, does this queue in practice only have 0 or 1 tasks in it (the other tasks being in the waiting queue)?
I think that will often be the case, but it will also have checkpoint tasks.
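A small sketch of why the queue stays short, assuming (per the reply above) that checkpoint tasks do not carry the dummy-object dependency; the ID encoding here is purely illustrative:

```cpp
#include <cstdint>
#include <optional>
#include <string>

using ObjectID = std::string;  /* Stand-in for Ray's ObjectID type. */

/* The implicit dependency of a non-checkpoint actor task is the dummy
 * object produced by its predecessor. Only one such dummy object is
 * local at a time, so at most one non-checkpoint task can leave the
 * waiting queue, while checkpoint tasks can queue up alongside it. */
std::optional<ObjectID> implicit_dependency(const std::string &actor_id,
                                            int64_t counter,
                                            bool is_checkpoint_task) {
  if (counter == 0 || is_checkpoint_task) {
    return std::nullopt;  /* No predecessor dummy object to wait on. */
  }
  return actor_id + ":dummy:" + std::to_string(counter - 1);
}
```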
Scrolling to the end of the Jenkins logs, it looks like the local scheduler died (or was marked as dead) in one of the tests. Could that be related to this PR?
retest this please
Merged build finished. Test FAILed.
Test FAILed.
Force-pushed from `d06ed91` to `c56b34f`.
Merged build finished. Test PASSed.
Test PASSed.
```cpp
  /* If the task is for an actor, and the missing object is a dummy object,
   * then we must generate it locally by executing the corresponding task.
   * All other objects may be requested from another plasma manager. */
  if (TaskSpec_is_actor_task(task_entry_it->spec) &&
```
It seems to me like we're still never calling `fetch_missing_dependencies` on actor tasks. Or are we somewhere?
Oh my mistake, I see that it happens through `queue_task_locally`.
This does a partial refactor to better unify local scheduling for actor tasks and regular tasks. Actor tasks are now added to the waiting queue if their object dependencies are not fulfilled. They get added to their corresponding actor's queue once all dependencies are local.
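For readers skimming the diff, a hedged sketch of that flow; `queue_task_locally`, `fetch_missing_dependencies`, and `TaskSpec_is_actor_task` appear in the review above, while the other helpers and the exact control flow are assumptions:

```cpp
/* Stand-in declarations; the real scheduler state and helpers differ. */
struct LocalSchedulerState;
struct TaskSpec;
bool all_dependencies_local(LocalSchedulerState *state, TaskSpec *spec);
bool TaskSpec_is_actor_task(TaskSpec *spec);
void add_to_waiting_queue(LocalSchedulerState *state, TaskSpec *spec);
void fetch_missing_dependencies(LocalSchedulerState *state, TaskSpec *spec);
void add_to_actor_queue(LocalSchedulerState *state, TaskSpec *spec);
void add_to_dispatch_queue(LocalSchedulerState *state, TaskSpec *spec);

/* Actor tasks and regular tasks now share the waiting queue; an actor
 * task moves to its actor's queue only once its dependencies are local.
 * This is also the path through which actor tasks reach
 * fetch_missing_dependencies, as noted in the review above. */
void queue_task_locally(LocalSchedulerState *state, TaskSpec *spec) {
  if (!all_dependencies_local(state, spec)) {
    add_to_waiting_queue(state, spec);
    fetch_missing_dependencies(state, spec);
  } else if (TaskSpec_is_actor_task(spec)) {
    add_to_actor_queue(state, spec);
  } else {
    add_to_dispatch_queue(state, spec);
  }
}
```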