
Availability after local scheduler failure #329

Merged

Conversation

stephanie-wang
Contributor

Detect and recover from local scheduler failures. The global scheduler updates the db_clients table after a set number of missed heartbeats from a local scheduler. A monitoring script subscribes to these updates and is responsible for marking all tasks that the failed local scheduler was responsible for as TASK_STATUS_LOST.

This does not provide availability if the local scheduler that failed was the one that the driver was connected to.
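The cleanup step described above can be sketched as follows. This is a minimal illustration, not Ray's actual implementation: the in-memory `task_table` and the `TASK_STATUS_LOST` value are hypothetical stand-ins for the real Redis-backed task table.

```python
# Illustrative sketch of the monitor's cleanup step. The dict-based
# `task_table` and TASK_STATUS_LOST constant are hypothetical stand-ins
# for Ray's real Redis-backed task table.
TASK_STATUS_LOST = "LOST"

def mark_tasks_lost(task_table, dead_scheduler_id):
    """Relabel every task assigned to a dead local scheduler as lost.

    task_table maps task_id -> {"scheduler": scheduler_id, "status": status}.
    Returns the ids of the tasks that were relabeled.
    """
    lost = []
    for task_id, entry in task_table.items():
        if entry["scheduler"] == dead_scheduler_id:
            entry["status"] = TASK_STATUS_LOST
            lost.append(task_id)
    return lost
```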

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/143/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/144/

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/151/

@@ -34,13 +35,14 @@
# important because it determines the order in which these processes will be
# terminated when Ray exits, and certain orders will cause errors to be logged
# to the screen.
-all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []),
+all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []),
Collaborator

When I run ./scripts/start_ray.sh --head on this branch, the script raises an exception when it exits and prints

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/opt/conda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/home/ubuntu/ray/python/ray/worker.py", line 963, in cleanup
    assert(len(processes) == 0)
AssertionError
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/opt/conda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "/home/ubuntu/ray/python/ray/worker.py", line 963, in cleanup
    assert(len(processes) == 0)
AssertionError

Contributor Author

Oh yeah, this is fixed in the next commit.

@AmplabJenkins

Build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/155/

@stephanie-wang force-pushed the local-scheduler-failure branch from 9d2455a to 5f732a3 on March 1, 2017 19:59
@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/156/

"""Clean up global state for a failed local scheduler.

Args:
scheduler: The db client ID of the scheduler that failed.
Collaborator

The docstring is not accurate, right? This method cleans up tasks for all dead local schedulers, not just for a single dead local scheduler.

Contributor Author

Oops yeah.

Args:
scheduler: The db client ID of the scheduler that failed.
"""
task_ids = self.redis.keys("{prefix}*".format(prefix=TASK_PREFIX))
Collaborator

Using KEYS is considered bad practice because it can block the server for a long time, so SCAN is sometimes preferred. The downside of SCAN is that the set of keys can change while you're iterating over it, so maybe KEYS is what we want here.

Contributor Author

We actually can use scan!
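For reference, a SCAN-based variant might look like the sketch below. The `TASK_PREFIX` value is assumed, and the function works with any client exposing redis-py's `scan_iter` method; it is not the PR's actual code.

```python
TASK_PREFIX = "TT:"  # assumed task-key prefix, for illustration only

def iter_task_keys(redis_client, prefix=TASK_PREFIX, batch=100):
    """Iterate task keys with SCAN instead of KEYS.

    KEYS walks the whole keyspace in one blocking call; SCAN returns
    results incrementally in batches of roughly `batch` keys. The
    trade-off: keys added or removed mid-iteration may be seen zero or
    multiple times, so each task's state should be re-checked before
    acting on it.
    """
    # redis-py's scan_iter wraps the SCAN cursor loop for us.
    return redis_client.scan_iter(match=prefix + "*", count=batch)
```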

@@ -958,9 +958,13 @@ def cleanup(worker=global_worker):
{"end_time": time.time()})
services.cleanup()
else:
-    # If this is not a driver, make sure there are no orphan processes.
+    # If this is not a driver, make sure there are no orphan processes, besides
+    # possibly the worker itself.
Collaborator

Why would the worker case be different?

Contributor Author

This is mostly a sanity check that a worker process didn't start any other processes.

Collaborator

But why the <= 1, why not == 0?

/* Check for local schedulers that have missed a number of heartbeats. If any
* local schedulers have died, notify others so that the state can be cleaned
* up. */
/* TODO(swang): If the local scheduler hasn't actually died, then it should
Collaborator

Why put this in the global scheduler and not the monitor?

@@ -11,11 +11,15 @@
/* The frequency with which the global scheduler checks if there are any tasks
* that haven't been scheduled yet. */
#define GLOBAL_SCHEDULER_TASK_CLEANUP_MILLISECONDS 100
#define GLOBAL_SCHEDULER_HEARTBEAT_TIMEOUT 5
Collaborator

Can we add a comment saying that 5 refers to the number of missed heartbeats in a row that we're willing to tolerate?

Btw, I suspect this is much too low. This corresponds to a half second delay, and I wouldn't be surprised if that happens frequently.

Contributor Author

Sure. Yeah, I can increase this, say to 10 seconds.
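To make the arithmetic behind that concrete (assuming a 100 ms heartbeat period, which is an assumption here, not stated in the diff):

```python
HEARTBEAT_PERIOD_MS = 100  # assumed heartbeat interval

def detection_delay_seconds(missed_heartbeats, period_ms=HEARTBEAT_PERIOD_MS):
    """Wall-clock time before a scheduler is declared dead."""
    return missed_heartbeats * period_ms / 1000

# A timeout of 5 missed heartbeats is only half a second, which a
# transiently loaded machine can easily exceed; 100 missed heartbeats
# corresponds to the suggested 10 seconds.
```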

db_client_table_remove(state->db, local_scheduler_ptr->id, NULL, NULL,
NULL);
/* Remove the scheduler from the local state. */
remove_local_scheduler(state, i);
Collaborator

I wonder if we really want to always remove the local scheduler. What if we only have one local scheduler (e.g., in the single-node case). Then we should probably not remove it in case the missing heartbeats were a false positive.

Collaborator

More generally, if this is the local scheduler on the head node, maybe we don't want to remove it.

Contributor Author

Yes, we're not handling failure on the head node at all right now. In the future, we should be able to discern between head node and worker node components. It would be a lot to do this as part of this PR though, since handling head node failure means at minimum restarting certain components.

local_scheduler_ptr =
(LocalScheduler *) utarray_eltptr(state->local_schedulers, i);
if (local_scheduler_ptr->num_heartbeats_missed >=
GLOBAL_SCHEDULER_HEARTBEAT_TIMEOUT) {
Collaborator

This probably deserves a LOG_WARN or something, right?

@@ -75,6 +75,7 @@ void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
ObjectID object_id) {
write_message(conn->conn, RECONSTRUCT_OBJECT, sizeof(object_id),
(uint8_t *) &object_id);
/* TODO(swang): Propagate the error. */
Collaborator

We should probably put a fatal check around this (and all other local_scheduler_client methods). That can be a different PR though.

Contributor Author

Agreed. It just came up during testing, so I wanted to add the TODO so I didn't forget.

utarray_free(queue->object_notifications);
HASH_DEL(plasma_state->pending_notifications, queue);
free(queue);
}
Collaborator

Does it also make sense to close client_sock?

Contributor Author

Yes.

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/160/

@AmplabJenkins

Merged build finished. Test FAILed.

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/161/

@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/168/

# associated with a node as LOST during cleanup.
self.local_schedulers.add(NIL_ID)

def subscribe(self):
Contributor

Methods like this should eventually be collected into a Python "state" module (I started doing this in experimental/state). I hope that once we have full flatbuffer coverage, we can use the Python flatbuffer module to generate methods that read this state automatically (and get rid of code duplication like DBClientID_SIZE = 20).


# These variables must be kept in sync with the C codebase.
# common/common.h
DBClientID_SIZE = 20
Contributor

Can we change this to DB_CLIENT_ID_SIZE?

entry);
/* The hash entry is shared with the local_scheduler_plasma hashmap and
* will
* be freed there. */
Collaborator

this line can be combined with the previous


# Read messages from the subscription channel.
while True:
time.sleep(LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS / 1000.)
Collaborator

we can replace 1000. with 1000, since we are doing from __future__ import division at the top
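As a quick check of that suggestion (the constant's value is assumed here for illustration):

```python
LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS = 100  # assumed value

# With true division in effect (the default in Python 3, and enabled in
# Python 2 by `from __future__ import division`), an integer divided by
# 1000 already yields a float, so the trailing dot in `1000.` is redundant.
sleep_seconds = LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS / 1000
```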

@@ -972,6 +1048,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
return REDISMODULE_ERR;
}

if (RedisModule_CreateCommand(ctx, "ray.disconnect", Disconnect_RedisCommand,
"write", 0, 0, 0) == REDISMODULE_ERR) {
Collaborator

hmm, should both this (and ray.connect) be "write pubsub" instead of just "write"?

stephanie-wang and others added 7 commits March 2, 2017 13:54
First pass at a monitoring script - monitor can detect local scheduler death

Clean up task table upon local scheduler death in monitoring script

Don't schedule to dead local schedulers in global scheduler

Have global scheduler update the db clients table, monitor script cleans up state

Documentation

Monitor script should scan tables before beginning to read from subscription channel

Fix for python3

Redirect monitor output to redis logs, fix hanging in multinode tests
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/169/

}

RedisModuleString *client_type;
RedisModuleString *aux_address;
Collaborator

Do we need to call RedisModule_FreeString on these two strings?

@stephanie-wang force-pushed the local-scheduler-failure branch from 3334d47 to 9bf8519 on March 2, 2017 23:49
@AmplabJenkins

Merged build finished. Test PASSed.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/175/

@robertnishihara robertnishihara merged commit 41b8675 into ray-project:master Mar 3, 2017
@robertnishihara robertnishihara deleted the local-scheduler-failure branch March 3, 2017 03:51