Dynamically grow worker pool to partially solve hanging workloads #286
Conversation
Can one of the admins verify this patch?
      utarray_erase(algorithm_state->executing_workers, i, 1);
      break;
    }
  }
probably worth adding a check that we actually removed it for now
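For concreteness, a minimal sketch of what that check could look like. Only utarray_erase, executing_workers, and CHECK come from the diff above; the loop shape and the worker_matches helper are hypothetical stand-ins for however the surrounding code identifies the worker, not the PR's actual code:

/* Sketch: remember whether the erase actually happened and fail loudly if
 * the worker was never found in executing_workers. */
bool removed = false;
for (int i = 0; i < utarray_len(algorithm_state->executing_workers); ++i) {
  /* worker_matches is a hypothetical helper standing in for the real
   * comparison against the blocked worker. */
  if (worker_matches(algorithm_state, i, worker)) {
    utarray_erase(algorithm_state->executing_workers, i, 1);
    removed = true;
    break;
  }
}
CHECK(removed);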
test/runtest.py
Outdated
@@ -1059,6 +1059,21 @@ def run_nested2():

    ray.worker.cleanup()

  def testWorkerDeadlock(self):
Doesn't the test need to call ray.init at some point?
It does (line 1072)
Oh somehow I missed that :)
python/ray/worker.py
Outdated
@@ -504,6 +504,7 @@ def get_object(self, object_ids):
    # their original index in the object_ids argument.
    unready_ids = dict((object_id, i) for (i, (object_id, val)) in
                       enumerate(final_results) if val is None)
    was_blocked = (unready_ids > 0)
Should this be len(unready_ids)?
Oh oops, nice catch!
 * available. Else, it will take from the dynamic resources available.
 * @return Void.
 */
void update_dynamic_resources(local_scheduler_state *state,
As @atumanov mentioned, we should call this method in kill_worker to reclaim the resources from the task that was running on that worker.
Ah, yes, there were a couple of things missing from kill_worker, like updating the task table. I want to make that a separate PR because I think that functionality should have more testing independent of this one. I can add a TODO though.
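For reference, a rough sketch of what reclaiming the resources in kill_worker could look like. The worker struct's type and its task_in_progress field, as well as update_dynamic_resources's trailing parameters, are assumptions made for illustration, not the PR's actual signatures:

void kill_worker(local_scheduler_state *state, local_scheduler_client *worker) {
  /* If the worker was in the middle of a task, hand its resources back so the
   * scheduler's dynamic resource accounting stays consistent. */
  if (worker->task_in_progress != NULL) {
    update_dynamic_resources(state, worker->task_in_progress,
                             /* return_resources: */ true);
  }
  /* ... existing cleanup (terminate the process, remove the worker from the
   * workers array); TODO (separate PR): also update the task table. ... */
}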
Let's add a test starting with 0 workers, e.g., ray.init(num_workers=0):

@ray.remote
def f():
    return 1

# Make sure we can call a remote function. This will require starting a new worker.
ray.get(f.remote())
ray.get([f.remote() for _ in range(100)])

Let's also add a test where a worker is blocked for a while, so that new workers have to be started while it is blocked:

import ray
import time

ray.init(num_workers=0, num_cpus=100)

@ray.remote
def f():
    time.sleep(3)

@ray.remote
def g():
    ray.get([f.remote() for _ in range(10)])

ray.get(g.remote())
force-pushed from dc3b9c2 to ed0776d
@@ -187,7 +190,9 @@ void free_local_scheduler(local_scheduler_state *state) {
 */
void start_worker(local_scheduler_state *state, actor_id actor_id) {
  /* We can't start a worker if we don't have the path to the worker script. */
  CHECK(state->config.start_worker_command != NULL);
  if (state->config.start_worker_command == NULL) {
maybe give another warning here (you gave one at the beginning, but it might be lost in the logs now)
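Something along these lines, for example. LOG_WARN stands in for whatever warning macro this codebase actually uses, and the message text is illustrative:

void start_worker(local_scheduler_state *state, actor_id actor_id) {
  /* We can't start a worker if we don't have the path to the worker script. */
  if (state->config.start_worker_command == NULL) {
    /* Warn again here; the warning printed at startup may be buried by now. */
    LOG_WARN("start_worker_command is not set, so no new worker will be started.");
    return;
  }
  /* ... fork and exec the worker process as before ... */
}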
force-pushed from 70ec4f5 to 5e6f9a7
This is a barebones policy that implements a worker pool: whenever a task can be assigned (there are enough resources, its inputs are ready, etc.) but the pool of available workers is empty, the local scheduler replenishes the pool with a new worker.
This pull request also accounts for workers that are blocked on an object that isn't available locally. The local scheduler counts these workers as having temporarily returned their resources, allowing other tasks to run in the meantime.
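In pseudocode-style C, the replenishment part of the policy amounts to something like the following sketch. All identifiers except start_worker and local_scheduler_state are hypothetical; this illustrates the idea, not the PR's implementation:

void maybe_replenish_worker_pool(local_scheduler_state *state,
                                 scheduling_algorithm_state *algorithm_state) {
  /* A task is ready to run (resources and inputs available) but there is no
   * idle worker to assign it to: grow the pool by one worker. */
  if (num_dispatchable_tasks(algorithm_state) > 0 &&
      num_available_workers(algorithm_state) == 0) {
    start_worker(state, NIL_ACTOR_ID);
  }

  /* Separately, a worker that blocks on an object that isn't locally
   * available is treated as having temporarily returned its resources, so
   * other tasks (possibly on newly started workers) can run meanwhile. */
}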