-
Notifications
You must be signed in to change notification settings - Fork 6.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Plasma and worker node failure. #373
Plasma and worker node failure. #373
Conversation
Merged build finished. Test FAILed. |
Test FAILed. |
python/ray/monitor.py
Outdated
table. Insertions are ignored. Cleanup of the associate state in the state | ||
tables should be handled by the caller. | ||
|
||
As documented in common/redis_module/ray_redis_module.c, the format for the |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment is outdated now!
@@ -73,6 +73,11 @@ flatbuffers::Offset<flatbuffers::String> RedisStringToFlatbuf( | |||
* Publish a notification to a client's notification channel about an insertion | |||
* or deletion to the db client table. | |||
* | |||
* The format for the published notification is: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TODO: think about flatbufferizing this!
if (!published) { | ||
return RedisModule_ReplyWithError(ctx, "PUBLISH unsuccessful"); | ||
if (!published) { | ||
RedisModule_CloseKey(db_client_table_key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move this out of the if (to the front) and combine it with the one below the if
@@ -159,14 +164,20 @@ int Connect_RedisCommand(RedisModuleCtx *ctx, | |||
RedisModuleKey *db_client_table_key = | |||
OpenPrefixedKey(ctx, DB_CLIENT_PREFIX, ray_client_id, REDISMODULE_WRITE); | |||
|
|||
if (RedisModule_KeyType(db_client_table_key) != REDISMODULE_KEYTYPE_EMPTY) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
RedisModule_HashGet(db_client_table_key, REDISMODULE_HASH_CFIELDS, "deleted", | ||
&deleted_string, NULL); | ||
long long deleted; | ||
int parsed = RedisModule_StringToLongLong(deleted_string, &deleted); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the feeling a bunch of this code could be improved if we move more flatbuffers. Do you want to do this as a followup PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good point! I'll leave a TODO and do it another PR.
src/plasma/plasma_manager.cc
Outdated
@@ -765,35 +779,27 @@ void process_transfer_request(event_loop *loop, | |||
return; | |||
} | |||
|
|||
/* Allocate and append the request to the transfer queue. */ | |||
ObjectBuffer obj_buffer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that we have the new PascalCase naming convention, let's try to move away from these abbreviations and write object_buffer instead!
src/plasma/plasma_manager.cc
Outdated
/* We pass in 0 to indicate that the command should return immediately. */ | ||
plasma_get(conn->manager_state->plasma_conn, &obj_id, 1, 0, &obj_buffer); | ||
if (obj_buffer.data_size == -1) { | ||
/* If the object wasn't locally available, exit immediately. If the object |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, I made some small comments.
37db5c4
to
d090c33
Compare
Build finished. Test FAILed. |
Test FAILed. |
src/common/state/table.h
Outdated
@@ -44,6 +44,11 @@ typedef struct { | |||
table_fail_callback fail_callback; | |||
} RetryInfo; | |||
|
|||
static const RetryInfo heartbeat_retry = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We aren't using this struct initialization anywhere else in the code base (because normally it doesn't compile in C++, I have no idea why it's compiling here).
E.g., http://stackoverflow.com/questions/5790534/static-structure-initialization-with-tags-in-c or http://stackoverflow.com/questions/11516657/c-structure-initialization. Obviously it's compiling so I'm mistaken about something, but it seems best to avoid this.
Can we just define it in plasma_manager_send_heartbeat
? Or better yet, just pass in NULL
to use the default retry info?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, we're using it in db_client_table.cc
, but yeah, we can define it there instead. I don't want to pass in NULL
since the default retry is to try things infinitely (we should just try heartbeats once).
src/plasma/plasma_manager.cc
Outdated
event_loop_add_file(manager_state->loop, manager_conn->fd, EVENT_LOOP_WRITE, | ||
send_queued_request, manager_conn); | ||
if (manager_conn->transfer_queue == NULL) { | ||
/* If we already have a connection to this manager and its inactive, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
its
-> it's
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test FAILed. |
Test FAILed. |
@@ -472,9 +471,10 @@ void process_plasma_notification(event_loop *loop, | |||
uint8_t *notification = read_message_async(loop, client_sock); | |||
if (!notification) { | |||
/* The store has closed the socket. */ | |||
LocalSchedulerState_free(state); | |||
LOG_FATAL( | |||
kill(getpid(), SIGTERM); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice job tracking this down!
This was a pretty nasty bug, so probably worth documentation exactly what was going on.
The problem is the following, right?
When a Python script finished, Ray would call cleanup
in services.py
, which would kill the plasma store. That would cause this code here to get run and the local scheduler would call LocalSchedulerState_free
and begin to clean up its state (e.g., killing its workers), but then services.py
would also send a kill signal to the local scheduler, which would then cause LocalSchedulerState_free
to start running again and then the array of workers was already freed so it was looking at invalid data and would try to call kill on PID 0, which cause problems (e.g., it looked like it caused the driver to die).
It's not completely clear to me that this solves the problem, but I think that you can't interrupt the SIGINT handler with another SIGINT https://www.gnu.org/software/libc/manual/html_node/Signals-in-Handler.html#Signals-in-Handler, but the SIGINT handler may still get run twice.
@@ -252,8 +252,7 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id) { | |||
execvp(start_actor_worker_command[0], | |||
(char *const *) start_actor_worker_command); | |||
free(start_actor_worker_command); | |||
LocalSchedulerState_free(state); | |||
LOG_FATAL("Failed to start worker"); | |||
kill(getpid(), SIGTERM); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth adding some description here (see comment below).
test/component_failures_test.py
Outdated
ray.services.all_processes[ray.services.PROCESS_TYPE_GLOBAL_SCHEDULER][0], | ||
]: | ||
process.terminate() | ||
process.wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Want to add another test does the same thing as this, but does all of the process.terminate()
calls and then all of the process.wait()
calls instead of serializing them?
src/common/state/db_client_table.cc
Outdated
void plasma_manager_send_heartbeat(DBHandle *db_handle) { | ||
RetryInfo heartbeat_retry = {.num_retries = 0, | ||
.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS, | ||
.fail_callback = NULL}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you mind replacing this with
RetryInfo heartbeat_retry;
heartbeat_retry.num_retries = 0;
heartbeat_retry.timeout = HEARTBEAT_TIMEOUT_MILLISECONDS;
heartbeat_retry.fail_callback = NULL;
Note that we use the default retry info for the local scheduler heartbeats. I agree retrying doesn't make sense (anyway, I'm not suggesting that we change it here).
Merged build finished. Test FAILed. |
Test FAILed. |
Merged build finished. Test FAILed. |
Test FAILed. |
test/component_failures_test.py
Outdated
]: | ||
process.terminate() | ||
time.sleep(0.1) | ||
process.kill() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was it necessary to add the process.kill()
? why wasn't process.terminate()
enough?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In earlier commits, some of these tests were hanging (example logs). I wasn't able to confirm it locally, but I'm pretty sure it was hanging at one of the process.wait()
lines.
Merged build finished. Test PASSed. |
Test PASSed. |
This should address #264. |
This replaces #347.