Availability after local scheduler failure #329
Conversation
Test FAILed.
Merged build finished. Test FAILed.
@@ -34,13 +35,14 @@
# important because it determines the order in which these processes will be
# terminated when Ray exits, and certain orders will cause errors to be logged
# to the screen.
- all_processes = OrderedDict([(PROCESS_TYPE_WORKER, []),
+ all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []),
When I run `./scripts/start_ray.sh --head` on this branch, the script raises an exception when it exits and prints
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
File "/opt/conda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/home/ubuntu/ray/python/ray/worker.py", line 963, in cleanup
assert(len(processes) == 0)
AssertionError
Error in sys.exitfunc:
Traceback (most recent call last):
File "/opt/conda/lib/python2.7/atexit.py", line 24, in _run_exitfuncs
func(*targs, **kargs)
File "/home/ubuntu/ray/python/ray/worker.py", line 963, in cleanup
assert(len(processes) == 0)
AssertionError
Oh yeah, this is fixed in the next commit.
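For reference, a minimal sketch (with hypothetical stand-ins for the real process-type constants, not the actual services.py code) of why the insertion order in the OrderedDict above matters: cleanup walks the dict in insertion order, so the entry listed first is terminated first.

```python
from collections import OrderedDict

# Hypothetical stand-ins for the real process-type constants.
PROCESS_TYPE_MONITOR = "monitor"
PROCESS_TYPE_WORKER = "worker"

all_processes = OrderedDict([(PROCESS_TYPE_MONITOR, []),
                             (PROCESS_TYPE_WORKER, [])])

# Cleanup iterates in insertion order, so with this ordering the monitor's
# processes would be terminated before the workers'.
for process_type, processes in all_processes.items():
    print("Terminating", process_type, processes)
```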
Build finished. Test FAILed.
Force-pushed from 9d2455a to 5f732a3
Merged build finished. Test FAILed.
python/ray/monitor.py
Outdated
"""Clean up global state for a failed local scheduler. | ||
|
||
Args: | ||
scheduler: The db client ID of the scheduler that failed. |
The docstring is not accurate, right? This method cleans up tasks for all dead local schedulers and not just for a single dead local scheduler, right?
Oops yeah.
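As a sketch of the wording fix being agreed on here (the method name is hypothetical, not the actual monitor.py code), the docstring could read:

```python
def cleanup_dead_local_schedulers(self):
    """Clean up global state for any local schedulers that have died.

    This walks the task table once and handles tasks from all dead local
    schedulers, not just a single failed scheduler.
    """
```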
python/ray/monitor.py
Outdated
Args:
    scheduler: The db client ID of the scheduler that failed.
"""
task_ids = self.redis.keys("{prefix}*".format(prefix=TASK_PREFIX))
Using `keys` is considered bad because it can block the server for a long time, so sometimes `scan` is preferred, but the downside of `scan` is that the set of things can change while you're iterating over it, so maybe `keys` is what we want.
We actually can use `scan`!
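A minimal sketch of the suggested swap, assuming a placeholder value for TASK_PREFIX and a local Redis server; the real prefix and client setup live elsewhere in the codebase:

```python
import redis

TASK_PREFIX = "TT:"  # placeholder; the real prefix comes from the shared constants
redis_client = redis.StrictRedis()

# Single blocking call, as in the quoted code:
task_ids = redis_client.keys("{prefix}*".format(prefix=TASK_PREFIX))

# Incremental alternative: scan_iter walks the keyspace in small batches and
# does not block the server, at the cost of iterating over a keyspace that
# may change mid-scan.
task_ids = list(redis_client.scan_iter(match="{prefix}*".format(prefix=TASK_PREFIX)))
```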
@@ -958,9 +958,13 @@ def cleanup(worker=global_worker):
{"end_time": time.time()})
services.cleanup()
else:
- # If this is not a driver, make sure there are no orphan processes.
+ # If this is not a driver, make sure there are no orphan processes, besides
+ # possibly the worker itself.
Why would the worker case be different?
This is mostly a sanity check that a worker process didn't start any other processes.
But why the `<= 1`, why not `== 0`?
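A minimal sketch of the distinction being discussed (names and structure are assumptions, not the actual worker.py code):

```python
def check_no_orphan_processes(process_groups, is_driver):
    """Sanity check run at exit; process_groups maps a process type to the
    list of processes this Ray instance started."""
    remaining = sum(len(procs) for procs in process_groups.values())
    if is_driver:
        # The driver started every component, so all of them must be gone.
        assert remaining == 0
    else:
        # A worker should not have started anything else, but its own entry
        # may still be present, hence <= 1 rather than == 0.
        assert remaining <= 1
```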
/* Check for local schedulers that have missed a number of heartbeats. If any
 * local schedulers have died, notify others so that the state can be cleaned
 * up. */
/* TODO(swang): If the local scheduler hasn't actually died, then it should
Why put this in the global scheduler and not the monitor?
@@ -11,11 +11,15 @@
/* The frequency with which the global scheduler checks if there are any tasks
 * that haven't been scheduled yet. */
#define GLOBAL_SCHEDULER_TASK_CLEANUP_MILLISECONDS 100
+ #define GLOBAL_SCHEDULER_HEARTBEAT_TIMEOUT 5
Can we add a comment saying that 5 refers to the number of missed heartbeats in a row that we're willing to tolerate?
Btw, I suspect this is much too low. This corresponds to a half second delay, and I wouldn't be surprised if that happens frequently.
Sure. Yeah, I can increase this, say to 10 seconds.
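To make the arithmetic explicit (the 100 ms heartbeat period is inferred from the "half second for 5 missed heartbeats" remark above; the constant names here are hypothetical):

```python
HEARTBEAT_PERIOD_MILLISECONDS = 100  # assumed interval between heartbeats
NUM_HEARTBEATS_TIMEOUT = 100         # consecutive misses tolerated

# Time before a silent local scheduler is declared dead:
timeout_seconds = HEARTBEAT_PERIOD_MILLISECONDS * NUM_HEARTBEATS_TIMEOUT / 1000
print(timeout_seconds)  # 10 seconds; the original value of 5 misses is only ~0.5 s
```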
db_client_table_remove(state->db, local_scheduler_ptr->id, NULL, NULL,
                       NULL);
/* Remove the scheduler from the local state. */
remove_local_scheduler(state, i);
I wonder if we really want to always remove the local scheduler. What if we only have one local scheduler (e.g., in the single-node case)? Then we should probably not remove it in case the missing heartbeats were a false positive.
More generally, if this is the local scheduler on the head node, maybe we don't want to remove it.
Yes, we're not handling failure on the head node at all right now. In the future, we should be able to discern between head node and worker node components. It would be a lot to do this as part of this PR though, since handling head node failure means at minimum restarting certain components.
local_scheduler_ptr =
    (LocalScheduler *) utarray_eltptr(state->local_schedulers, i);
if (local_scheduler_ptr->num_heartbeats_missed >=
    GLOBAL_SCHEDULER_HEARTBEAT_TIMEOUT) {
This probably deserves a `LOG_WARN` or something, right?
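A Python analogue of the check above with the suggested warning added (the real code is C in the global scheduler; this is just a sketch with assumed names):

```python
import logging

def find_dead_local_schedulers(heartbeats_missed, num_heartbeats_timeout):
    """heartbeats_missed maps a local scheduler ID to its current count of
    consecutive missed heartbeats."""
    dead = []
    for scheduler_id, missed in heartbeats_missed.items():
        if missed >= num_heartbeats_timeout:
            logging.warning("Local scheduler %s missed %d heartbeats in a row; "
                            "marking it as dead.", scheduler_id, missed)
            dead.append(scheduler_id)
    return dead
```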
@@ -75,6 +75,7 @@ void local_scheduler_reconstruct_object(LocalSchedulerConnection *conn,
                                        ObjectID object_id) {
write_message(conn->conn, RECONSTRUCT_OBJECT, sizeof(object_id),
              (uint8_t *) &object_id);
+ /* TODO(swang): Propagate the error. */
We should probably put a fatal check around this (and all other `local_scheduler_client` methods). That can be a different PR though.
Agreed. It just came up during testing, so I wanted to add the TODO so I didn't forget.
src/plasma/plasma_store.cc
Outdated
utarray_free(queue->object_notifications);
HASH_DEL(plasma_state->pending_notifications, queue);
free(queue);
}
Does it also make sense to close `client_sock`?
Yes.
Merged build finished. Test FAILed.
Merged build finished. Test FAILed.
Merged build finished. Test PASSed.
# associated with a node as LOST during cleanup.
self.local_schedulers.add(NIL_ID)

def subscribe(self):
Methods like this should eventually be collected into a python "state" module (I started doing this in experimental/state); I hope once we have full flatbuffer coverage we can use the python flatbuffer module to generate methods that read this state automatically (and get rid of code duplication like DBClientID_SIZE = 20)
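As a rough illustration of the kind of helper that could live in such a state module (the key prefix, class name, and connection details are assumptions, not the actual experimental/state code):

```python
import redis

DB_CLIENT_ID_SIZE = 20  # must stay in sync with common/common.h

class GlobalStateSketch(object):
    def __init__(self, redis_host="127.0.0.1", redis_port=6379):
        self.redis = redis.StrictRedis(host=redis_host, port=redis_port)

    def db_client_ids(self):
        """Return the raw 20-byte IDs of all registered db clients."""
        keys = self.redis.keys("db_clients:*")  # prefix assumed for illustration
        return [key[-DB_CLIENT_ID_SIZE:] for key in keys]
```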
python/ray/monitor.py
Outdated
# These variables must be kept in sync with the C codebase.
# common/common.h
DBClientID_SIZE = 20
Can we change this to DB_CLIENT_ID_SIZE?
entry);
/* The hash entry is shared with the local_scheduler_plasma hashmap and
 * will
 * be freed there. */
this line can be combined with the previous
python/ray/monitor.py
Outdated
# Read messages from the subscription channel.
while True:
    time.sleep(LOCAL_SCHEDULER_HEARTBEAT_TIMEOUT_MILLISECONDS / 1000.)
We can replace `1000.` with `1000`, since we are doing `from __future__ import division` at the top.
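For reference, a two-line illustration of why the trailing dot is unnecessary once true division is enabled (the numbers are arbitrary):

```python
from __future__ import division  # a no-op on Python 3

print(90000 / 1000)   # 90.0 with true division, same as the float literal
print(90000 / 1000.)  # 90.0
```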
@@ -972,6 +1048,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx,
  return REDISMODULE_ERR;
}

+ if (RedisModule_CreateCommand(ctx, "ray.disconnect", Disconnect_RedisCommand,
+                               "write", 0, 0, 0) == REDISMODULE_ERR) {
Hmm, should both this (and `ray.connect`) be `"write pubsub"` instead of just `"write"`?
First pass at a monitoring script - monitor can detect local scheduler death
Clean up task table upon local scheduler death in monitoring script
Don't schedule to dead local schedulers in global scheduler
Have global scheduler update the db clients table, monitor script cleans up state
Documentation
Monitor script should scan tables before beginning to read from subscription channel
Fix for python3
Redirect monitor output to redis logs, fix hanging in multinode tests
Merged build finished. Test PASSed.
}

RedisModuleString *client_type;
RedisModuleString *aux_address;
Do we need to call `RedisModule_FreeString` on these two strings?
Force-pushed from 3334d47 to 9bf8519
Merged build finished. Test PASSed.
Detect and recover from local scheduler failures. The global scheduler updates the db_clients table after a set number of missed heartbeats from a local scheduler. A monitoring script subscribes to these updates and is responsible for marking all tasks that the failed local scheduler was responsible for as TASK_STATUS_LOST. This does not provide availability if the local scheduler that failed was the one that the driver was connected to.
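A condensed sketch of the monitor-side flow described above (the key prefix, field names, and the TASK_STATUS_LOST value are placeholders, not the real Ray schema):

```python
import redis

TASK_STATUS_LOST = 32  # placeholder; the real constant is defined in the C code

def mark_tasks_lost(redis_client, dead_scheduler_id, task_prefix=b"TT:"):
    """Scan the task table and mark every task that was assigned to the dead
    local scheduler as LOST so that it can later be rescheduled or
    reconstructed."""
    for key in redis_client.scan_iter(match=task_prefix + b"*"):
        task = redis_client.hgetall(key)
        if task.get(b"local_scheduler_id") == dead_scheduler_id:
            redis_client.hset(key, b"state", TASK_STATUS_LOST)
```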