
Local scheduler filters out dead clients during reconstruction #1182


Merged

Conversation

stephanie-wang
Contributor

@stephanie-wang stephanie-wang commented Nov 2, 2017

When there are many keys in Redis, monitor cleanup of the task and object tables is very slow. This leads to high latencies (minutes or more) during reconstruction.

This removes that latency by allowing the local scheduler to listen to notifications about dead clients from the db client table and filter out lost tasks/objects itself, instead of waiting for the monitor. This includes the following changes:

  1. Expose a call to the db_client_table that allows a client to build a cache of the other database clients.
  2. Add an option to specify a local scheduler ID in the task_table_test_and_update call. If this option is set, the task's current local scheduler ID is tested against it before the task update is applied.
  3. During reconstruction, if the local scheduler fails to update a task to TASK_STATUS_RECONSTRUCTING on the first attempt, it checks whether the currently assigned local scheduler is dead. If so, it retries the test_and_update, as sketched below.
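
For reference, a minimal sketch of the resulting reconstruction path. The identifiers Task_local_scheduler, db_client_table_cache_get, DBClientID_is_nil, and is_insertion come from the diffs discussed below; the callback wiring, the LocalSchedulerState cast, and the exact task_table_test_and_update arguments are illustrative assumptions, not the PR's code.

/* Sketch only: names not shown in the diffs below are assumptions. */
void reconstruct_task_update_callback(Task *task, void *user_context,
                                      bool updated) {
  LocalSchedulerState *state = (LocalSchedulerState *) user_context;
  if (updated) {
    /* We won the test-and-update, so resubmit the task as usual. */
    return;
  }
  /* The test-and-update failed. Check whether the task's currently assigned
   * local scheduler is dead, according to our cache of the db client table. */
  DBClientID current_local_scheduler_id = Task_local_scheduler(task);
  if (!DBClientID_is_nil(current_local_scheduler_id)) {
    DBClient current_local_scheduler =
        db_client_table_cache_get(state->db, current_local_scheduler_id);
    if (!current_local_scheduler.is_insertion) {
      /* The assigned local scheduler was removed from the client table, so
       * retry the test-and-update, this time testing against the dead
       * scheduler's ID so that the update only succeeds if the task is still
       * assigned to it. */
      /* task_table_test_and_update(state->db, ...,
       *                            current_local_scheduler_id, ...); */
    }
  }
}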

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2254/
Test FAILed.

@stephanie-wang stephanie-wang force-pushed the local-scheduler-dead-clients branch from 81a5a43 to 9924066 on November 3, 2017 00:16
@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2255/
Test FAILed.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2256/
Test FAILed.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2257/
Test PASSed.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2259/
Test PASSed.

Collaborator

@robertnishihara robertnishihara left a comment

Nice work here, I left some comments/questions. My primary question is the one about the two code paths for reconstruction.

Also, this PR does a lot more than what is stated in the PR description. Could you add some more details to the description, including why we need to change the task_table_test_and_update API?

CHECK(manager_ids.size() == 2);
const std::vector<std::string> managers =
db_client_table_get_ip_addresses(db, manager_ids);
if (sscanf(managers.at(0).c_str(), "%15[0-9.]:%5[0-9]", received_addr1,
Collaborator

Does it make sense to use this function?

int parse_ip_addr_port(const char *ip_addr_port, char *ip_addr, int *port) {

Contributor Author

Ah yes, thanks! I think this is legacy.

received_port1) != 2) {
CHECK(0);
}
if (sscanf(manager_vector.at(1).c_str(), "%15[0-9.]:%5[0-9]", received_addr2,
if (sscanf(managers.at(1).c_str(), "%15[0-9.]:%5[0-9]", received_addr2,
Collaborator

Same comment here.
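
As a side note, a minimal sketch of what the suggested parse_ip_addr_port call could look like in this test. The variable declarations here are new, the return value is left unchecked because its convention is not shown in this thread, and note that the port comes back as an int rather than the string that the sscanf pattern produces.

char received_addr1[16];
int received_port1 = 0;
/* Hypothetical replacement for the sscanf above; parses "ip:port" into the
 * address buffer and the integer port. */
parse_ip_addr_port(managers.at(0).c_str(), received_addr1, &received_port1);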

}

DBClient db_client_table_cache_get(DBHandle *db_handle, DBClientID client_id) {
CHECK(!ObjectID_is_nil(client_id));
Collaborator

This should be DBClientID_is_nil

Contributor Author

Thanks!
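
For reference, the corrected check at the top of db_client_table_cache_get would read:

/* Use the DBClientID-specific nil check, as suggested above. */
CHECK(!DBClientID_is_nil(client_id));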

@@ -464,7 +458,7 @@ PlasmaManagerState *PlasmaManagerState_init(const char *store_socket_name,
db_connect_args[1] = store_socket_name;
db_connect_args[2] = "manager_socket_name";
db_connect_args[3] = manager_socket_name;
db_connect_args[4] = "address";
db_connect_args[4] = "aux_address";
Collaborator

Why this terminology change?

This is the address of the plasma manager, so doesn't it make sense to call it address (or perhaps manager_address)?

Contributor Author

Mostly so that we can have just one field for both the local scheduler and the plasma manager, instead of two. I guess it's not totally necessary, but it makes the parsing in the Redis module a little easier if we only have to look for one string. I think calling it manager_address instead probably makes sense, though.

* avoid waiting for the heartbeat timeout. */
if (state->db != NULL) {
local_scheduler_table_disconnect(state->db);
}
Collaborator

At a high level, is there some danger that this change introduces two separate code paths? One that gets used when we kill the local schedulers manually and they exit gracefully, and one that gets used when things die unexpectedly and ungracefully? The first code path is what happens in all of our tests, and the second code path is what would happen in real-world usage?

Do you see a resolution to this? What are your thoughts about it?

Contributor Author

Yes, that's true. When I wrote the local_scheduler_table_disconnect call, I tried to match it to the current code path as much as possible, i.e. the global scheduler sets the last heartbeat to a time far in the past instead of deleting the local scheduler immediately. So it should be as if we just sped up the local scheduler's timeout, but it's hard to say whether we've covered all the corner cases.

Probably the resolution to this is to do some more thorough testing with a live cluster, where we abruptly kill nodes. This is on the backlog.

Collaborator

Is the only point of this to speed up the tests? If so, another alternative is to just use a smaller number of missed heartbeats to trigger cleanup in the tests. If you think that makes sense, we can do it in a separate PR.

Contributor Author

It's also for cases where the local scheduler doesn't exit fatally. I think it's useful to keep it here so that we can do things like scale down a live cluster without having to wait for the full heartbeat timeout.
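
For context, a sketch of the graceful-shutdown path this enables. Only the local_scheduler_table_disconnect call and the NULL guard come from the diff above; the surrounding cleanup function is an illustrative assumption.

void LocalSchedulerState_free(LocalSchedulerState *state) {
  /* Publish an explicit disconnect so other clients mark this local scheduler
   * as dead immediately, e.g. when scaling down a live cluster, instead of
   * waiting for the full heartbeat timeout. */
  if (state->db != NULL) {
    local_scheduler_table_disconnect(state->db);
  }
  /* ... free the rest of the local scheduler state ... */
}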

@@ -18,14 +18,14 @@
typedef void (*object_table_lookup_done_callback)(
ObjectID object_id,
bool never_created,
const std::vector<std::string> &manager_vector,
const std::vector<DBClientID> &manager_ids,
Collaborator

This is a great change.

/* The test-and-set failed. The task is either: (1) not finished yet, (2)
* lost, but not yet updated, or (3) already being reconstructed. */
DBClientID current_local_scheduler_id = Task_local_scheduler(task);
if (!ObjectID_is_nil(current_local_scheduler_id)) {
Collaborator

DBClientID_is_nil

/* The test-and-set failed. The task is either: (1) not finished yet, (2)
* lost, but not yet updated, or (3) already being reconstructed. */
DBClientID current_local_scheduler_id = Task_local_scheduler(task);
if (!ObjectID_is_nil(current_local_scheduler_id)) {
Collaborator

DBClientID_is_nil

if (!ObjectID_is_nil(current_local_scheduler_id)) {
DBClient current_local_scheduler =
db_client_table_cache_get(state->db, current_local_scheduler_id);
if (!current_local_scheduler.is_insertion) {
Collaborator

The is_insertion terminology doesn't really make sense in this context. E.g., it's a property of an update to the DBClient, not of the DBClient itself, right?

Contributor Author

Yeah, perhaps we can change it to is_alive?
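
For illustration, the call site above would then read as follows (a sketch of the proposed rename, not the merged code):

DBClient current_local_scheduler =
    db_client_table_cache_get(state->db, current_local_scheduler_id);
if (!current_local_scheduler.is_alive) {
  /* The assigned local scheduler is dead; retry the test-and-update. */
}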

/* A test local scheduler ID was provided. Test whether it is equal to
* the current local scheduler ID before performing the update. */
RedisModuleString *test_local_scheduler_id = argv[5];
if (RedisModule_StringCompare(current_local_scheduler_id,
Collaborator

Can we do this comparison before the if (current_state_integer & test_state_bitmask) { line? It seems a little confusing to start with update = false, then set update = true, and then set it back to update = false.

Contributor Author

Ah yes, let me clean up this section...
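
A sketch of the reordering being suggested, using the names from the snippet above (the argc guard and the surrounding Redis module code are assumptions):

/* Check the optional test local scheduler ID up front so that `update` is
 * only assigned once. */
bool local_scheduler_matches = true;
if (argc > 5) {
  RedisModuleString *test_local_scheduler_id = argv[5];
  local_scheduler_matches =
      (RedisModule_StringCompare(current_local_scheduler_id,
                                 test_local_scheduler_id) == 0);
}
bool update =
    local_scheduler_matches && (current_state_integer & test_state_bitmask);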

@robertnishihara
Collaborator

I introduced some conflicts in #1192 (which moved configuration constants into a separate file). I think it should just involve renaming constants.

@stephanie-wang stephanie-wang force-pushed the local-scheduler-dead-clients branch from 8faf14d to 4587c34 on November 9, 2017 06:08
@stephanie-wang stephanie-wang force-pushed the local-scheduler-dead-clients branch from 4587c34 to cfccebd on November 9, 2017 06:14
@AmplabJenkins

Build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2318/
Test PASSed.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2319/
Test FAILed.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2320/
Test FAILed.

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/2329/
Test PASSed.

@robertnishihara robertnishihara merged commit 07f0532 into ray-project:master Nov 10, 2017
@robertnishihara robertnishihara deleted the local-scheduler-dead-clients branch November 10, 2017 19:29