Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Plasma and worker node failure. #373

Merged
merged 25 commits into from
Mar 18, 2017

Conversation

stephanie-wang
Copy link
Contributor

This replaces #347.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/306/
Test FAILed.

table. Insertions are ignored. Cleanup of the associate state in the state
tables should be handled by the caller.

As documented in common/redis_module/ray_redis_module.c, the format for the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment is outdated now!

@@ -73,6 +73,11 @@ flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf(
* Publish a notification to a client's notification channel about an insertion
* or deletion to the db client table.
*
* The format for the published notification is:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: think about flatbufferizing this!

if (!published) {
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful");
if (!published) {
RedisModule_CloseKey(db_client_table_key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move this out of the if (to the front) and combine it with the one below the if

@@ -159,14 +164,20 @@ int Connect_RedisCommand(RedisModuleCtx *ctx,
RedisModuleKey *db_client_table_key =
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE);

if (RedisModule_KeyType(db_client_table_key) != REDISMODULE_KEYTYPE_EMPTY) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS, "deleted",
&deleted_string, NULL);
long long deleted;
int parsed = RedisModule_StringToLongLong(deleted_string, &deleted);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the feeling a bunch of this code could be improved if we move more flatbuffers. Do you want to do this as a followup PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good point! I'll leave a TODO and do it another PR.

@@ -765,35 +779,27 @@ void process_transfer_request(event_loop *loop,
return;
}

/* Allocate and append the request to the transfer queue. */
ObjectBuffer obj_buffer;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have the new PascalCase naming convention, let's try to move away from these abbreviations and write object_buffer instead!

/* We pass in 0 to indicate that the command should return immediately. */
plasma_get(conn->manager_state->plasma_conn, &obj_id, 1, 0, &obj_buffer);
if (obj_buffer.data_size == -1) {
/* If the object wasn't locally available, exit immediately. If the object
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Contributor

@pcmoritz pcmoritz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great, I made some small comments.

@AmplabJenkins
Copy link

Build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/312/
Test FAILed.

@@ -44,6 +44,11 @@ typedef struct {
table_fail_callback fail_callback;
} RetryInfo;

static const RetryInfo heartbeat_retry = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We aren't using this struct initialization anywhere else in the code base (because normally it doesn't compile in C++, I have no idea why it's compiling here).

E.g., http://stackoverflow.com/questions/5790534/static-structure-initialization-with-tags-in-c or http://stackoverflow.com/questions/11516657/c-structure-initialization. Obviously it's compiling so I'm mistaken about something, but it seems best to avoid this.

Can we just define it in plasma_manager_send_heartbeat? Or better yet, just pass in NULL to use the default retry info?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, we're using it in db_client_table.cc, but yeah, we can define it there instead. I don't want to pass in NULL since the default retry is to try things infinitely (we should just try heartbeats once).

event_loop_add_file(manager_state->loop, manager_conn->fd, EVENT_LOOP_WRITE,
send_queued_request, manager_conn);
if (manager_conn->transfer_queue == NULL) {
/* If we already have a connection to this manager and its inactive,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its -> it's

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/324/
Test FAILed.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/325/
Test FAILed.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/326/
Test FAILed.

@@ -472,9 +471,10 @@ void process_plasma_notification(event_loop *loop,
uint8_t *notification = read_message_async(loop, client_sock);
if (!notification) {
/* The store has closed the socket. */
LocalSchedulerState_free(state);
LOG_FATAL(
kill(getpid(), SIGTERM);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice job tracking this down!

This was a pretty nasty bug, so probably worth documentation exactly what was going on.

The problem is the following, right?

When a Python script finished, Ray would call cleanup in services.py, which would kill the plasma store. That would cause this code here to get run and the local scheduler would call LocalSchedulerState_free and begin to clean up its state (e.g., killing its workers), but then services.py would also send a kill signal to the local scheduler, which would then cause LocalSchedulerState_free to start running again and then the array of workers was already freed so it was looking at invalid data and would try to call kill on PID 0, which cause problems (e.g., it looked like it caused the driver to die).

It's not completely clear to me that this solves the problem, but I think that you can't interrupt the SIGINT handler with another SIGINT https://www.gnu.org/software/libc/manual/html_node/Signals-in-Handler.html#Signals-in-Handler, but the SIGINT handler may still get run twice.

@@ -252,8 +252,7 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id) {
execvp(start_actor_worker_command[0],
(char *const *) start_actor_worker_command);
free(start_actor_worker_command);
LocalSchedulerState_free(state);
LOG_FATAL("Failed to start worker");
kill(getpid(), SIGTERM);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably worth adding some description here (see comment below).

ray.services.all_processes[ray.services.PROCESS_TYPE_GLOBAL_SCHEDULER][0],
]:
process.terminate()
process.wait()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Want to add another test does the same thing as this, but does all of the process.terminate() calls and then all of the process.wait() calls instead of serializing them?

void plasma_manager_send_heartbeat(DBHandle *db_handle) {
RetryInfo heartbeat_retry = {.num_retries = 0,
.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS,
.fail_callback = NULL};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind replacing this with

RetryInfo heartbeat_retry;
heartbeat_retry.num_retries = 0;
heartbeat_retry.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS;
heartbeat_retry.fail_callback = NULL;

Note that we use the default retry info for the local scheduler heartbeats. I agree retrying doesn't make sense (anyway, I'm not suggesting that we change it here).

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/327/
Test FAILed.

@AmplabJenkins
Copy link

Merged build finished. Test FAILed.

@AmplabJenkins
Copy link

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/328/
Test FAILed.

]:
process.terminate()
time.sleep(0.1)
process.kill()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why was it necessary to add the process.kill()? why wasn't process.terminate() enough?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In earlier commits, some of these tests were hanging (example logs). I wasn't able to confirm it locally, but I'm pretty sure it was hanging at one of the process.wait() lines.

@robertnishihara robertnishihara changed the title [WIP] Plasma and worker node failure Plasma and worker node failure. Mar 17, 2017
@AmplabJenkins
Copy link

Merged build finished. Test PASSed.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/330/
Test PASSed.

@robertnishihara robertnishihara merged commit 12c9618 into ray-project:master Mar 18, 2017
@robertnishihara robertnishihara deleted the node-failure branch March 18, 2017 00:04
@robertnishihara
Copy link
Collaborator

This should address #264.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants