Start and clean up workers from the local scheduler. #250

Merged
merged 2 commits into ray-project:master from start-workers on Feb 10, 2017

Conversation

stephanie-wang
Contributor

Workers can be started from either the local scheduler or the Python script, as before. Workers send their process IDs to the local scheduler upon initial connection, so that the local scheduler can clean up worker child processes.

time.sleep(0)

# Allow the process one second to exit gracefully.
p.terminate()
Collaborator

Why reverse the order of kill and terminate? Doesn't it make sense to start with kill?

Contributor Author

kill is stronger than terminate. I'm doing terminate first to give the local scheduler a chance to catch the signal and clean up state. Previously it wasn't really an issue because it just needed to free memory that would get cleaned up on exit anyway, but now there are worker processes to clean up.
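
For reference, the same two-phase shutdown can be sketched on the C side as well. This is only an illustration of the ordering being described (the helper name and grace period are made up), not code from this PR:

#include <signal.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Send SIGTERM first so the process can catch it and clean up its state
 * (e.g., its own worker children); fall back to SIGKILL only if it does not
 * exit within the grace period. */
void terminate_then_kill(pid_t pid, unsigned int grace_seconds) {
  kill(pid, SIGTERM);
  for (unsigned int i = 0; i < grace_seconds * 10; ++i) {
    /* WNOHANG: returns immediately; equals pid once the child has exited. */
    if (waitpid(pid, NULL, WNOHANG) == pid) {
      return;
    }
    usleep(100 * 1000); /* Poll every 100 ms. */
  }
  /* The process ignored SIGTERM (or is stuck); kill it for real. */
  kill(pid, SIGKILL);
  waitpid(pid, NULL, 0);
}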

@@ -1523,6 +1523,7 @@ def process_task(task): # wrapping these lines in a function should cause the lo
"function_name": function_name})

check_main_thread()
worker.photon_client.register_pid()
Collaborator

I think it is more appropriate for this to be part of photon_connect. I needed something similar for the actor PR (to pass an "actor ID" from the worker to the local scheduler), so we should probably use the same mechanism.

I did it like this 5b22750#diff-2a906576526ef8fb8a2de97b6e0bc3ceR7

What do you think about doing it that way? (Your PR will be ready first)

NOTE: the local scheduler should never terminate a driver, so if we do it in photon_connect, we will need the option of passing -1 or something to indicate that it is a driver. (But this will be useful anyway because the local scheduler really ought to know which clients are drivers, for computing resource capacity.)

Collaborator

Also, the purpose of this should be documented (e.g., "We pass the process ID to the local scheduler so that the local scheduler can terminate this worker if it wants to.")

Contributor Author

Oh yeah, makes sense to do it as part of photon_connect.

The local scheduler won't ever terminate the driver. It only terminates child processes. If you look at the handler for REGISTER_PID messages, it matches up worker clients with child processes according to pid.
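
As a rough illustration of that matching step, here is a sketch with stand-in struct definitions (the real photon types and field names may differ):

#include <stdbool.h>
#include <sys/types.h>

/* Stand-ins for the local scheduler's structs, for illustration only. */
typedef struct {
  pid_t pid;     /* 0 until the worker has sent REGISTER_PID. */
  bool is_child; /* True only if this scheduler forked the worker itself. */
} client_sketch;

typedef struct {
  pid_t *child_pids; /* PIDs of the workers this scheduler started. */
  int num_children;
} state_sketch;

/* Record the registered pid and check whether it belongs to one of our own
 * child processes; drivers and externally started workers never match. */
void handle_register_pid(state_sketch *state, client_sketch *worker,
                         pid_t pid) {
  worker->pid = pid;
  worker->is_child = false;
  for (int i = 0; i < state->num_children; ++i) {
    if (state->child_pids[i] == pid) {
      worker->is_child = true;
      break;
    }
  }
}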

void kill_process(pid_t pid) {
/* Kill the worker violently. */
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
Collaborator

I'm a little nervous that in pathological situations this could cause the local scheduler to hang forever.

Even during normal usage, do we really want the local scheduler to block here?

Contributor Author

Yeah, I'm not sure about this either. Ideally we would mark the worker as dead, and then clean up the rest of the state after receiving some signal that it died. I can try to look into that.
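
One non-blocking shape for that, sketched with plain POSIX calls (how the scheduler would actually hook this into its event loop is left open):

#include <sys/types.h>
#include <sys/wait.h>

/* Reap any children that have already exited without blocking. This could be
 * driven from a SIGCHLD handler or polled from the event loop; the scheduler
 * would then mark the worker with the returned pid as dead and clean up the
 * rest of its state. */
void reap_dead_workers(void) {
  pid_t pid;
  while ((pid = waitpid(-1, NULL, WNOHANG)) > 0) {
    /* Hypothetical follow-up: look up the worker with this pid and clean up. */
  }
}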

@@ -50,7 +44,7 @@ typedef struct scheduling_algorithm_state scheduling_algorithm_state;
* scheduler. */
typedef struct {
/** The script to use when starting a new worker. */
char *start_worker_command;
const char **start_worker_command;
Collaborator

Why split the command into different strings? Is this so you can use fork instead of popen?

Contributor Author

It's for execvp.
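
For context, execvp takes the program name plus a NULL-terminated argument vector, which is why the command is stored as an array of separate strings rather than one command line. A minimal illustration (the script name and flag below are invented):

#include <unistd.h>

int exec_worker_example(void) {
  const char *argv[] = {"python", "worker.py",
                        "--node-ip-address=127.0.0.1", NULL};
  /* execvp only returns on failure. */
  return execvp(argv[0], (char *const *) argv);
}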

/* We can't start a worker if we don't have the path to the worker script. */
CHECK(state->config.start_worker_command != NULL);
/* Launch the process to create the worker. */
pid_t pid = fork();
Collaborator

Why use fork instead of popen? With fork the new worker process will have some leftover state from the local scheduler, right?

Contributor Author

It calls execvp soon after, which, if successful, completely replaces the process image.

popen didn't seem necessary in this case, since we're already communicating with the process through sockets. Also, I needed the PID to be returned to the local scheduler so that it could later figure out if a worker process was a child or not.
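
A stripped-down sketch of that fork/execvp flow, assuming the command has already been split into an argv-style array (illustrative only, not the PR's start_worker implementation):

#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Fork a worker and return its pid to the caller. The parent keeps the pid so
 * it can later tell its own child workers apart from externally started ones;
 * in the child, a successful execvp replaces the process image entirely, so
 * no local scheduler state survives there. */
pid_t start_worker_sketch(const char **start_worker_command) {
  pid_t pid = fork();
  if (pid == 0) {
    execvp(start_worker_command[0], (char *const *) start_worker_command);
    /* Only reached if execvp failed. */
    perror("execvp");
    _exit(1);
  }
  return pid; /* Child's pid, or -1 if fork failed. */
}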

@@ -46,7 +67,7 @@ void kill_worker(local_scheduler_client *worker, bool wait) {
/* If the worker has registered a process ID with us, use to send a kill
* signal. Else, the worker will exit since it won't be able to connect. */
if (worker->pid != 0) {
kill_process(worker->pid);
kill(pid, SIGKILL);
Collaborator

where is pid defined? Doesn't this need to be worker->pid?

num_workers_per_local_scheduler[i] += 1
num_workers -= 1
i += 1
i %= num_local_schedulers
Collaborator

Minor, but I would rewrite lines 495-500 as

for i in range(num_local_schedulers):
  num_workers_per_local_scheduler[i % num_local_schedulers] += 1

That's what it's doing, right?

@@ -345,6 +509,8 @@ void new_client_connection(event_loop *loop,
local_scheduler_client *worker = malloc(sizeof(local_scheduler_client));
worker->sock = new_socket;
worker->task_in_progress = NULL;
worker->pid = 0;
Collaborator

Can a normal PID equal 0?

Contributor Author

Not any worker PID. PID 0 is reserved by the kernel.

@@ -84,6 +81,10 @@ typedef struct {
* no task is running on the worker, this will be NULL. This is used to
* update the task table. */
task *task_in_progress;
/** The process ID of the client. */
Collaborator

Maybe document that if this is 0, that means that the client has not connected to the local scheduler yet.

@@ -500,6 +669,13 @@ int main(int argc, char *argv[]) {
if (!node_ip_address) {
LOG_FATAL("please specify the node IP address with -h switch");
}
int num_workers = 0;
if (num_workers_str) {
num_workers = strtol(num_workers_str, NULL, 10);
Collaborator

We should check that strtol succeeded and that num_workers_str was not malformed.
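
A hedged sketch of what that validation could look like (the helper name is made up):

#include <errno.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>

/* Parse a worker count, rejecting trailing junk, negatives, and overflow.
 * Returns 0 on success and -1 on a malformed string. */
int parse_num_workers(const char *num_workers_str, int *num_workers) {
  char *endptr = NULL;
  errno = 0;
  long value = strtol(num_workers_str, &endptr, 10);
  if (errno != 0 || endptr == num_workers_str || *endptr != '\0' ||
      value < 0 || value > INT_MAX) {
    fprintf(stderr, "Malformed number of workers: %s\n", num_workers_str);
    return -1;
  }
  *num_workers = (int) value;
  return 0;
}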

Ability to kill workers in photon scheduler

Test for old method of starting workers

Common codepath for killing workers

Photon test case for starting and killing workers

fix build

Fix component failure test

Register a worker's pid as part of initial connection

Address comments and revert photon_connect

Set PATH during travis install

Fix
@@ -476,12 +515,77 @@ TEST task_multi_dependency_test(void) {
PASS();
}

TEST start_kill_workers_test(void) {
/* Start some workers. */
int num_workers = 4;
Contributor

we should add a separate test (a stress test if you will) that tries to do this with 100 workers. I've tried it with 100 workers just now, and it wasn't working for me.

destroy_photon_mock(photon);
PASS();
}

Contributor

would be great to add a test that dynamically changes the number of workers, e.g.:

start 20 workers
repeat {
  kill 2 workers
  add 1 worker
} until num_workers == 0

This dynamic test will make sure we are not leaking worker state at runtime and that we're accounting for workers correctly (with asserts before/after each *_worker operation); see the sketch below.
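
A skeleton of that loop with stub helpers standing in for the real photon test harness (the actual test would start and kill workers through the scheduler; the counters here only show the intended structure and invariants):

#include <assert.h>

static int num_workers = 0;

static void start_worker_stub(void) { num_workers += 1; }
static void kill_worker_stub(void) {
  assert(num_workers > 0);
  num_workers -= 1;
}

void dynamic_workers_test_sketch(void) {
  for (int i = 0; i < 20; ++i) {
    start_worker_stub();
  }
  /* Each pass removes two workers and adds one back, so the pool shrinks by
   * exactly one per pass. */
  while (num_workers >= 2) {
    kill_worker_stub();
    kill_worker_stub();
    start_worker_stub();
  }
  /* Kill whatever is left and check that nothing leaked. */
  while (num_workers > 0) {
    kill_worker_stub();
  }
  assert(num_workers == 0);
}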

@atumanov
Contributor

OK, we've discussed with Stephanie that we should do a more graceful shutdown of workers when we try to kill or evict them. It's best to send the worker a SIGTERM signal and implement a SIGTERM handler in the worker to allow the worker to close all of its sockets. In the meantime, the local scheduler schedules a timer to send a follow-up SIGKILL to the evicted worker after 1 ms. The reason for this is that, as things are right now, the worker ends up in an inconsistent state, attempts to communicate on sockets that are still open, fails, and generates core files while failing. This is reproducible on a Mac with num_workers >= 10. Example core dump:

(lldb) bt
* thread #1: tid = 0x0000, 0x00007fff9b1b5f06 libsystem_kernel.dylib`__pthread_kill + 10, stop reason = signal SIGSTOP
  * frame #0: 0x00007fff9b1b5f06 libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fff931a04ec libsystem_pthread.dylib`pthread_kill + 90
    frame #2: 0x00007fff8bda26df libsystem_c.dylib`abort + 129
    frame #3: 0x0000000105d91281 libphoton.so`photon_connect(photon_socket="/tmp/photon_socket_1115438165") + 321 at photon_client.c:14
    frame #4: 0x0000000105d8e4ee libphoton.so`PyPhotonClient_init(self=0x00000001002ac3d8, args=0x00000001034e3bd0, kwds=0x0000000000000000) + 78 at photon_extension.c:24
    frame #5: 0x0000000100062465 libpython2.7.dylib`type_call + 229
    frame #6: 0x000000010000c761 libpython2.7.dylib`PyObject_Call + 97
    frame #7: 0x00000001000a35a5 libpython2.7.dylib`PyEval_EvalFrameEx + 4405
    frame #8: 0x00000001000ac7dc libpython2.7.dylib`PyEval_EvalCodeEx + 2092
    frame #9: 0x00000001000ac988 libpython2.7.dylib`fast_function + 296
    frame #10: 0x00000001000a338a libpython2.7.dylib`PyEval_EvalFrameEx + 3866
    frame #11: 0x00000001000ac7dc libpython2.7.dylib`PyEval_EvalCodeEx + 2092
    frame #12: 0x00000001000ac856 libpython2.7.dylib`PyEval_EvalCode + 54
    frame #13: 0x00000001000cb3d4 libpython2.7.dylib`PyRun_FileExFlags + 164
    frame #14: 0x00000001000cc709 libpython2.7.dylib`PyRun_SimpleFileExFlags + 409
    frame #15: 0x00000001000e01ca libpython2.7.dylib`Py_Main + 2938
    frame #16: 0x0000000100000f14 python`start + 52

code where this worker is failing:

  pid_t my_pid = getpid();
  int success = write_message(result->conn, REGISTER_PID, sizeof(my_pid),
                              (uint8_t *) &my_pid);
  CHECKM(success == 0, "Unable to register worker with local scheduler");
  return result;
}

Another example core dump:

(lldb) bt
* thread #1: tid = 0x0000, 0x00007fff9b1b5f06 libsystem_kernel.dylib`__pthread_kill + 10, stop reason = signal SIGSTOP
  * frame #0: 0x00007fff9b1b5f06 libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fff931a04ec libsystem_pthread.dylib`pthread_kill + 90
    frame #2: 0x00007fff8bda26df libsystem_c.dylib`abort + 129
    frame #3: 0x0000000105d8f739 libphoton.so`photon_get_task(conn=0x000000010184fc20) + 297 at photon_client.c:58
    frame #4: 0x0000000105d8c5b2 libphoton.so`PyPhotonClient_get_task(self=0x00000001002ac3d8) + 34 at photon_extension.c:49
    frame #5: 0x00000001000a2f66 libpython2.7.dylib`PyEval_EvalFrameEx + 2806
    frame #6: 0x00000001000ac7dc libpython2.7.dylib`PyEval_EvalCodeEx + 2092
    frame #7: 0x00000001000ac988 libpython2.7.dylib`fast_function + 296
    frame #8: 0x00000001000a338a libpython2.7.dylib`PyEval_EvalFrameEx + 3866
    frame #9: 0x00000001000ac7dc libpython2.7.dylib`PyEval_EvalCodeEx + 2092
    frame #10: 0x00000001000ac856 libpython2.7.dylib`PyEval_EvalCode + 54
    frame #11: 0x00000001000cb3d4 libpython2.7.dylib`PyRun_FileExFlags + 164
    frame #12: 0x00000001000cc709 libpython2.7.dylib`PyRun_SimpleFileExFlags + 409
    frame #13: 0x00000001000e01ca libpython2.7.dylib`Py_Main + 2938
    frame #14: 0x0000000100000f14 python`start + 52

Here, the worker is trying to send the GET_TASK message to photon.

Takeaway: we should let the worker clean up its socket connections and gracefully self-terminate.
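
A minimal sketch of the worker-side SIGTERM handler being proposed, assuming the worker keeps the file descriptors of its scheduler and object-store connections around (the two fd variables here are placeholders):

#include <signal.h>
#include <unistd.h>

static int photon_conn_fd = -1; /* Socket to the local scheduler. */
static int plasma_conn_fd = -1; /* Socket to the object store. */

/* Close the worker's sockets and exit cleanly instead of aborting later on a
 * half-closed connection. Only async-signal-safe calls are used here. */
static void worker_handle_sigterm(int signum) {
  (void) signum;
  if (photon_conn_fd != -1) {
    close(photon_conn_fd);
  }
  if (plasma_conn_fd != -1) {
    close(plasma_conn_fd);
  }
  _exit(0);
}

void install_worker_sigterm_handler(void) {
  struct sigaction sa;
  sa.sa_handler = worker_handle_sigterm;
  sigemptyset(&sa.sa_mask);
  sa.sa_flags = 0;
  sigaction(SIGTERM, &sa, NULL);
}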


/* Clean up the task in progress. */
if (worker->task_in_progress) {
/* TODO(swang): Update the task table to mark the task as lost. */
Contributor

So, this will stall all the downstream tasks that depend on any objects produced by this task_in_progress. We should really have a mechanism for re-scheduling the task in flight. One possibility is to just update the task table entry for this task with status = TASK_STATUS_KILLED and subscribe to this task status in the global scheduler. The GS handler would then re-schedule the task. Without this, we are likely to see hangs.

Contributor

also, this task will never report done back to photon, so we need to do resource accounting for this task. Otherwise we will leak dynamic resources.

Contributor

basically, you need this code in this if block:

      task_spec *spec = task_task_spec(worker->task_in_progress);
      /* Return dynamic resources back for the task in progress. */
      for (int i = 0; i < MAX_RESOURCE_INDEX; i++) {
        state->dynamic_resources[i] += task_spec_get_required_resource(spec, i);
        /* Sanity-check resource vector boundary conditions. */
        CHECK(state->dynamic_resources[i] <= state->static_resources[i]);
      }

Contributor Author

Yes, this PR is a little incomplete right now, since workers are only killed during cleanup, when reporting task death doesn't matter anymore. I'm hoping to implement reporting task death properly in a separate PR for task preemption.

Contributor

the resource accounting bit should still be addressed. I'm happy to do it in a separate PR, along with photon_tests updates that would expose and test this condition.

@robertnishihara merged commit 2b8e648 into ray-project:master on Feb 10, 2017
@robertnishihara deleted the start-workers branch on February 10, 2017 20:46
* ignored.
* @return Void.
*/
void kill_worker(local_scheduler_client *worker, bool wait) {
Collaborator

When we kill a worker, we probably need to let the scheduling algorithm do some clean up as well (in particular, the scheduling algorithm needs to remove the worker struct from its list of available workers, otherwise it will try to assign a task to that worker).
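
A generic sketch of that cleanup; the array layout and names are assumptions, since the real scheduling algorithm state is not shown in this diff:

typedef struct local_scheduler_client local_scheduler_client;

/* Stand-in for the part of the algorithm state that tracks idle workers. */
typedef struct {
  local_scheduler_client **available_workers;
  int num_available_workers;
} algorithm_state_sketch;

/* Forget a worker that has been killed so no task is ever assigned to a
 * dangling pointer: swap in the last entry and shrink the array. */
void remove_available_worker(algorithm_state_sketch *algo,
                             local_scheduler_client *worker) {
  for (int i = 0; i < algo->num_available_workers; ++i) {
    if (algo->available_workers[i] == worker) {
      algo->available_workers[i] =
          algo->available_workers[algo->num_available_workers - 1];
      algo->num_available_workers -= 1;
      return;
    }
  }
}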
