Start and clean up workers from the local scheduler. #250
Conversation
time.sleep(0)

# Allow the process one second to exit gracefully.
p.terminate()
Why reverse the order of kill and terminate? Doesn't it make sense to start with kill?
kill is stronger than terminate. I'm doing terminate first to give the local scheduler a chance to catch the signal and clean up state. Previously it wasn't really an issue because it just needed to free memory that would get cleaned up on exit anyway, but now there are worker processes to clean up.
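For illustration, a minimal sketch of that escalation order in C (the function name and the one-second grace period are illustrative, not the actual cleanup code in services.py):

#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

void shut_down_local_scheduler(pid_t pid) {
  /* SIGTERM can be caught, so the local scheduler gets a chance to clean up
   * its worker children before exiting. */
  kill(pid, SIGTERM);
  /* Allow the process one second to exit gracefully. */
  sleep(1);
  if (waitpid(pid, NULL, WNOHANG) == 0) {
    /* Still running: escalate to SIGKILL, which cannot be caught. */
    kill(pid, SIGKILL);
    waitpid(pid, NULL, 0);
  }
}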
python/ray/worker.py
Outdated
@@ -1523,6 +1523,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
    "function_name": function_name})

check_main_thread()
worker.photon_client.register_pid()
I think it is more appropriate for this to be part of photon_connect. I needed something similar for the actor PR (to pass an "actor ID" from the worker to the local scheduler), so we should probably use the same mechanism.
I did it like this: 5b22750#diff-2a906576526ef8fb8a2de97b6e0bc3ceR7
What do you think about doing it that way? (Your PR will be ready first.)
NOTE: the local scheduler should never terminate a driver, so if we do it in photon_connect, we will need the option of passing -1 or something to indicate that it is a driver. (But this will be useful anyway, because the local scheduler really ought to know which clients are drivers, for computing resource capacity.)
Also, the purpose of this should be documented (e.g., "We pass the process ID to the local scheduler so that the local scheduler can terminate this worker if it wants to.")
Oh yeah, makes sense to do it as part of photon_connect.
The local scheduler won't ever terminate the driver. It only terminates child processes. If you look at the handler for REGISTER_PID messages, it matches up worker clients with child processes according to pid.
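For concreteness, a rough sketch of what folding the PID into photon_connect might look like (the struct layout and the connect_ipc_sock / write_message helpers are assumptions about the client code, and the extended signature is hypothetical):

/* Hypothetical signature: the PID travels with the initial connection.
 * A value of -1 marks a driver, which the local scheduler must never
 * terminate; any other value is a worker PID that the scheduler can match
 * against its child processes. */
photon_conn *photon_connect(const char *photon_socket_name, pid_t process_id) {
  photon_conn *result = malloc(sizeof(photon_conn));
  result->conn = connect_ipc_sock(photon_socket_name);
  write_message(result->conn, REGISTER_PID, sizeof(process_id),
                (uint8_t *) &process_id);
  return result;
}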
src/photon/photon_scheduler.c
Outdated
void kill_process(pid_t pid) {
  /* Kill the worker violently. */
  kill(pid, SIGKILL);
  waitpid(pid, NULL, 0);
I'm a little nervous that in pathological situations this could cause the local scheduler to hang forever.
Even during normal usage, do we really want the local scheduler to block here?
Yeah, I'm not sure about this either...ideally we would mark the worker as dead, and then clean up the rest of the state after receiving some signal that it died. I can try to look into that.
@@ -50,7 +44,7 @@ typedef struct scheduling_algorithm_state scheduling_algorithm_state;
 * scheduler. */
typedef struct {
  /** The script to use when starting a new worker. */
  char *start_worker_command;
  const char **start_worker_command;
Why split the command into different strings? Is this so you can use fork instead of popen?
It's for execvp.
src/photon/photon_scheduler.c
Outdated
/* We can't start a worker if we don't have the path to the worker script. */
CHECK(state->config.start_worker_command != NULL);
/* Launch the process to create the worker. */
pid_t pid = fork();
Why use fork instead of popen? With fork the new worker process will have some leftover state from the local scheduler, right?
It calls execvp soon after, which, if successful, completely replaces the process image. popen didn't seem necessary in this case, since we're already communicating with the process through sockets. Also, I needed the PID to be returned to the local scheduler so that it could later figure out if a worker process was a child or not.
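For reference, a minimal sketch of the fork-then-execvp pattern being described (the function and variable names are illustrative, and the command array is assumed to be NULL-terminated):

#include <stdlib.h>
#include <unistd.h>

pid_t start_worker_process(const char **start_worker_command) {
  pid_t pid = fork();
  if (pid == 0) {
    /* Child: execvp replaces the process image, so no local scheduler state
     * survives into the worker. */
    execvp(start_worker_command[0], (char *const *) start_worker_command);
    /* execvp only returns on failure. */
    _exit(EXIT_FAILURE);
  }
  /* Parent: keep the PID so we can later tell whether a connecting worker is
   * one of our own children. */
  return pid;
}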
src/photon/photon_scheduler.c
Outdated
@@ -46,7 +67,7 @@ void kill_worker(local_scheduler_client *worker, bool wait) {
  /* If the worker has registered a process ID with us, use to send a kill
   * signal. Else, the worker will exit since it won't be able to connect. */
  if (worker->pid != 0) {
    kill_process(worker->pid);
    kill(pid, SIGKILL);
Where is pid defined? Doesn't this need to be worker->pid?
python/ray/services.py
Outdated
num_workers_per_local_scheduler[i] += 1
num_workers -= 1
i += 1
i %= num_local_schedulers
Minor, but I would rewrite lines 495-500 as
for i in range(num_workers):
    num_workers_per_local_scheduler[i % num_local_schedulers] += 1
That's what it's doing, right?
@@ -345,6 +509,8 @@ void new_client_connection(event_loop *loop,
  local_scheduler_client *worker = malloc(sizeof(local_scheduler_client));
  worker->sock = new_socket;
  worker->task_in_progress = NULL;
  worker->pid = 0;
Can a normal PID equal 0?
Not any worker PID. PID 0 is reserved by the kernel.
src/photon/photon.h
Outdated
@@ -84,6 +81,10 @@ typedef struct {
   * no task is running on the worker, this will be NULL. This is used to
   * update the task table. */
  task *task_in_progress;
  /** The process ID of the client. */
Maybe document that if this is 0, the client has not connected to the local scheduler yet.
@@ -500,6 +669,13 @@ int main(int argc, char *argv[]) {
  if (!node_ip_address) {
    LOG_FATAL("please specify the node IP address with -h switch");
  }
  int num_workers = 0;
  if (num_workers_str) {
    num_workers = strtol(num_workers_str, NULL, 10);
We should check that strtol succeeded and that num_workers_str was not malformed.
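A hedged sketch of the kind of validation this suggests (the variable names mirror the diff above, but the checks and error message are illustrative):

#include <errno.h>
#include <limits.h>
#include <stdlib.h>

int num_workers = 0;
if (num_workers_str != NULL) {
  char *endptr = NULL;
  errno = 0;
  long parsed = strtol(num_workers_str, &endptr, 10);
  /* Reject empty input, trailing garbage, out-of-range values, and negative
   * worker counts. */
  if (errno != 0 || endptr == num_workers_str || *endptr != '\0' ||
      parsed < 0 || parsed > INT_MAX) {
    LOG_FATAL("invalid value for the number of workers");
  }
  num_workers = (int) parsed;
}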
Force-pushed from 434b11e to 391dc29.
Commits: Ability to kill workers in photon scheduler; Test for old method of starting workers; Common codepath for killing workers; Common codepath for killing workers; Photon test case for starting and killing workers; fix build; Fix component failure test; Register a worker's pid as part of initial connection; Address comments and revert photon_connect; Set PATH during travis install; Fix
Force-pushed from eda4b68 to a676b8a.
@@ -476,12 +515,77 @@ TEST task_multi_dependency_test(void) {
  PASS();
}

TEST start_kill_workers_test(void) {
  /* Start some workers. */
  int num_workers = 4;
We should add a separate test (a stress test, if you will) that tries to do this with 100 workers. I tried it with 100 workers just now, and it wasn't working for me.
  destroy_photon_mock(photon);
  PASS();
}

It would be great to add a test that dynamically changes the number of workers, e.g.:
start 20 workers
repeat {
  kill 2 workers
  add 1 worker
} until num_workers == 0
This dynamic test will make sure we are not leaking worker state at runtime and that we're accounting for workers correctly (with asserts before/after each *_worker operation).
OK, we've discussed with Stephanie that we should do a more graceful shutdown of workers when we try to kill or evict them. It's best to send the worker a SIGTERM signal and implement a SIGTERM handler in the worker so that it can close all of its sockets. In the meantime, the local scheduler schedules a timer to send a follow-up SIGKILL to the evicted worker after 1ms. The reason for this is that, as things stand, the worker ends up in an inconsistent state: it attempts to communicate on sockets that are still open, fails, and generates core files while failing. This is reproducible on a Mac with num_workers >= 10. Example core dump:
Code where this worker is failing:
Another example core dump:
Here, the worker is trying to send the GET_TASK message to photon. Takeaway: we should let the worker clean up its socket connections and gracefully self-terminate.
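A rough sketch of that escalation from the local scheduler's side, assuming a millisecond timer facility along the lines of event_loop_add_timer exists in the common event loop (the handler signature, EVENT_LOOP_TIMER_DONE, and the struct field names are assumptions, not the actual API):

int64_t sigkill_timeout_handler(event_loop *loop, int64_t timer_id, void *context) {
  pid_t pid = *(pid_t *) context;
  /* If the worker ignored SIGTERM, kill it for real. */
  kill(pid, SIGKILL);
  free(context);
  return EVENT_LOOP_TIMER_DONE;
}

void evict_worker(local_scheduler_state *state, local_scheduler_client *worker) {
  /* Ask the worker to shut down gracefully so its SIGTERM handler can close
   * its sockets before exiting. */
  kill(worker->pid, SIGTERM);
  /* Schedule a follow-up SIGKILL in case the worker never exits. */
  pid_t *pid_copy = malloc(sizeof(pid_t));
  *pid_copy = worker->pid;
  event_loop_add_timer(state->loop, 1 /* ms */, sigkill_timeout_handler, pid_copy);
}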

/* Clean up the task in progress. */
if (worker->task_in_progress) {
  /* TODO(swang): Update the task table to mark the task as lost. */
So, this will stall all the downstream tasks that depend on any objects produced by this task_in_progress. We should really have a mechanism for re-scheduling the task in flight. One possibility is to just update the task table entry for this task with status = TASK_STATUS_KILLED and subscribe to this task status in the global scheduler. The GS handler would then re-schedule the task.
Without this, we are likely to see hangs.
Also, this task will never report completion back to photon, so we need to do resource accounting for this task. Otherwise we will leak dynamic resources.
Basically, you need this code in this if block:
task_spec *spec = task_task_spec(worker->task_in_progress);
/* Return dynamic resources back for the task in progress. */
for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
  state->dynamic_resources[i] += task_spec_get_required_resource(spec, i);
  /* Sanity-check resource vector boundary conditions. */
  CHECK(state->dynamic_resources[i] <= state->static_resources[i]);
}
Yes, this PR is a little incomplete right now, since workers are only killed during cleanup, when reporting task death doesn't matter anymore. I'm hoping to implement reporting task death properly in a separate PR for task preemption.
The resource accounting bit should still be addressed. I'm happy to do it in a separate PR, along with photon_tests updates that would expose and test this condition.
 * ignored.
 * @return Void.
 */
void kill_worker(local_scheduler_client *worker, bool wait) {
When we kill a worker, we probably need to let the scheduling algorithm do some clean up as well (in particular, the scheduling algorithm needs to remove the worker struct from its list of available workers, otherwise it will try to assign a task to that worker).
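A hedged sketch of such a cleanup hook, assuming the scheduling algorithm keeps its available workers in a utarray of local_scheduler_client pointers (the field name available_workers and the element type are assumptions, not the actual scheduling_algorithm_state definition):

void handle_worker_removed(scheduling_algorithm_state *algorithm_state,
                           local_scheduler_client *worker) {
  /* Drop the dead worker from the list of available workers so no task can
   * be assigned to it afterwards. */
  for (unsigned int i = 0; i < utarray_len(algorithm_state->available_workers); ++i) {
    local_scheduler_client **entry = (local_scheduler_client **) utarray_eltptr(
        algorithm_state->available_workers, i);
    if (*entry == worker) {
      utarray_erase(algorithm_state->available_workers, i, 1);
      break;
    }
  }
}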
Workers can be started from either the local scheduler or the Python script, as before. Workers send their process IDs to the local scheduler upon initial connection, so that the local scheduler can clean up worker child processes.