Plasma and worker node failure #347
Merged build finished. Test FAILed.
Merged build finished. Test FAILed.
Merged build finished. Test PASSed.
Force-pushed from 5ea0a48 to 2b54b3e.
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
Merged build finished. Test PASSed.
Force-pushed from aadf37d to 957d9b5.
Merged build finished. Test FAILed.
ok = self.redis.execute_command("RAY.TASK_TABLE_UPDATE",
                                task_id,
                                TASK_STATUS_LOST,
                                NIL_ID)
if ok != b"OK":
    log.warn("Failed to update lost task for dead scheduler.")

def cleanup_object_table(self):
Just to make sure I understand what's happening, the following scenario is possible, right?
- The monitor mistakenly marks a plasma manager as dead.
- The monitor removes the manager from the DB client table.
- The monitor removes the manager from all entries of the object table.
- The manager adds a new object to the object table.
That could lead to a get hanging, right?
Yeah, this will be an issue if the manager then dies after the add completes (if the manager is still alive, it will still be able to serve requests). Right now the failure scenario that we're handling is that components are actually dead when we time out their heartbeats.
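For concreteness, here is a toy in-memory walk-through of that ordering; the dicts and names below are illustrative stand-ins, not the actual Redis tables.

# Toy, in-memory model of the db_client and object tables (stand-ins only).
db_clients = {"mgr1": True}      # manager_id -> alive (True) / tombstoned (False)
object_locations = {}            # object_id -> set of manager_ids with a copy

# 1. The monitor mistakenly times out mgr1's heartbeats and tombstones it.
db_clients["mgr1"] = False
# 2. The monitor strips mgr1 from every existing location entry (none yet).
for locations in object_locations.values():
    locations.discard("mgr1")
# 3. The still-alive mgr1 then adds a new object to the object table.
object_locations["obj_a"] = {"mgr1"}
# 4. If mgr1 now really dies, nothing cleans up obj_a's entry (mgr1 is already
#    tombstoned), so a later get on obj_a can hang waiting on a dead manager.
assert object_locations["obj_a"] == {"mgr1"}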
self.client2.transfer("127.0.0.1", self.port1, object_id2)
# Transfer the buffer to the other Plasma store. There is a race
# condition on the create and transfer of the object, so keep trying
# until the object appears on the second Plasma store.
Oh, good catch.
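For reference, the retry pattern described in the new comment might look roughly like the sketch below; the contains method and the timeout values are assumptions, not necessarily the test's actual API.

import time

def transfer_until_present(src_client, dst_client, dst_port, object_id,
                           timeout_s=10.0, poll_interval_s=0.1):
    """Re-issue the transfer until the object shows up in the destination
    store, since the create/transfer ordering is racy."""
    deadline = time.time() + timeout_s
    while not dst_client.contains(object_id):      # assumed client method
        if time.time() > deadline:
            raise RuntimeError("Object never appeared on the second store.")
        src_client.transfer("127.0.0.1", dst_port, object_id)
        time.sleep(poll_interval_s)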
@@ -20,6 +20,14 @@ extern "C" {
}
#endif

/* The duration between heartbeats. These are sent by the plasma manager and
This should probably be

/** The duration ...
 * local scheduler. */

And similarly below.
@@ -44,6 +44,11 @@ typedef struct {
  table_fail_callback fail_callback;
} RetryInfo;

static const RetryInfo heartbeat_retry = {
This is not valid C++, right? How did this compile?
Also, we probably shouldn't retry heartbeats, right?
Oh I see that you're using these retries as the actual mechanism for sending heartbeats.
I think it's valid?
Yeah, it's a bit of a hack, but I thought it was better than having to allocate/deallocate the memory every time. Probably we should rethink this once we do redis.c redux. :)
Merged build finished. Test PASSed.
@@ -81,8 +81,13 @@ int64_t table_timeout_handler(event_loop *loop,

  CHECK(callback_data->retry.num_retries >= 0 ||
        callback_data->retry.num_retries == -1);
  LOG_WARN("retrying operation %s, retry_count = %d", callback_data->label,
           callback_data->retry.num_retries);
  if (callback_data->retry.timeout > HEARTBEAT_TIMEOUT_MILLISECONDS) {
Won't this suppress retry messages from things other than heartbeats?
Yes. If you have any ideas on how to do this better, I'm open. Maybe comparing the label string?
 * heartbeat contains this database client's ID. Heartbeats can be subscribed
 * to through the plasma_managers channel. Once called, this "retries" the
 * heartbeat operation forever, every HEARTBEAT_TIMEOUT_MILLISECONDS
 * milliseconds.
I see, all of this is to avoid a malloc? It feels cleaner to me to have the timer in the manager instead of in redis code. Especially since that's what we're doing with the local scheduler and we'll need to do it if we want to include load information or other information in the heartbeats.
Hmm, the original reason was actually that I wanted to use a RetryInfo with 0 retries, and I'd thought that I couldn't do that, but I think I can work around it now.
time.sleep(HEARTBEAT_TIMEOUT_MILLISECONDS * 1e-3)


if __name__ == '__main__':
"__main__"
# If the data was an integer, then the message was a response to an
# initial subscription request.
is_subscribe = int(data)
message_handler = self.subscribe_handler
Should we also add assert(not self.subscribed[channel]) here?
Or just get rid of the try/except and instead do:

if not self.subscribed[channel]:
    is_subscribe = int(data)
    ...
else:
    ...
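A rough sketch of the shape being suggested, with the try/except replaced by an explicit check on self.subscribed; the class and handler names here are hypothetical, not the monitor's actual ones.

class MonitorSketch:
    """Minimal stand-in for the monitor's pub/sub bookkeeping."""

    def __init__(self, subscribe_handler, notification_handler):
        self.subscribed = {}                    # channel -> bool
        self.subscribe_handler = subscribe_handler
        self.notification_handler = notification_handler

    def process_message(self, channel, data):
        if not self.subscribed.get(channel):
            # The first message on a channel is Redis's subscribe
            # acknowledgement; its data field is an integer count.
            is_subscribe = int(data)
            assert is_subscribe
            self.subscribed[channel] = True
            self.subscribe_handler(channel, data)
        else:
            # Every later message is an ordinary notification.
            self.notification_handler(channel, data)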
"""Handle a notification from the db_client table from Redis. | ||
|
||
This handler processes any notifications for deletions from the db_client | ||
table. Insertions are ignored. Cleanup of the associate state in the state |
associate -> associated
manager.
"""
# The first DB_CLIENT_ID_SIZE characters are the client ID.
db_client_id = data[:DB_CLIENT_ID_SIZE]
We should also assert that len(data) == DB_CLIENT_ID_SIZE, right?
I think I will fix this in another PR that switches to flatbuffers for plasma manager, like @pcmoritz suggested.
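In the meantime, the suggested check might look something like this sketch; the ID size value and the function name are assumptions for illustration.

DB_CLIENT_ID_SIZE = 20  # assumed ID width; the real constant is defined elsewhere

def handle_db_client_deletion(data):
    """Parse a db_client deletion notification (illustrative sketch only)."""
    # Until the message format moves to flatbuffers, at least check that the
    # payload is exactly one client ID long before slicing it.
    assert len(data) == DB_CLIENT_ID_SIZE, "unexpected db_client payload size"
    db_client_id = data[:DB_CLIENT_ID_SIZE]
    return db_client_id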
This provides fault tolerance in the case of a Plasma store or manager failure on a worker node. As a result, this also provides fault tolerance for a worker node as a whole.

To provide fault detection, Plasma managers send heartbeats to the Python monitoring process. Plasma managers that time out are deleted (using tombstones) from the db_client table by the monitor process. Once the deletion is published, the monitor process cleans up the object table by removing the dead Plasma manager from any location entries.

This also fixes a couple of potential bugs in the Plasma manager, including:
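As a rough illustration of the heartbeat-timeout and cleanup flow described above (not of the bug fixes), here is an in-memory sketch of one monitoring pass; the constants, dict-based tables, and names are assumptions for illustration, not the PR's actual code.

import time

HEARTBEAT_TIMEOUT_MILLISECONDS = 100   # assumed value, for illustration only
NUM_HEARTBEATS_TIMEOUT = 100           # assumed number of missed beats allowed

def check_heartbeats(last_heartbeat, db_clients, object_locations, now=None):
    """One monitoring pass over an in-memory model of the tables.

    last_heartbeat:   manager_id -> timestamp of the last heartbeat received
    db_clients:       manager_id -> True (live) or False (tombstoned)
    object_locations: object_id -> set of manager_ids holding a copy
    """
    now = time.time() if now is None else now
    timeout_s = HEARTBEAT_TIMEOUT_MILLISECONDS * NUM_HEARTBEATS_TIMEOUT / 1000.0
    for manager_id, last_seen in last_heartbeat.items():
        if db_clients.get(manager_id) and now - last_seen > timeout_s:
            # Tombstone the manager in the db_client table ...
            db_clients[manager_id] = False
            # ... then strip it from every object location entry.
            for locations in object_locations.values():
                locations.discard(manager_id)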