Dynamically grow worker pool to partially solve hanging workloads #286

Merged
merged 8 commits into ray-project:master from deadlock-policy on Feb 18, 2017

Conversation

stephanie-wang
Contributor

This is a barebones policy that implements a worker pool: Whenever a task can be assigned (there are enough resources, the inputs are ready, etc.), and the pool of available workers is empty, the local scheduler replenishes the pool with a new worker.

This pull request also accounts for workers blocked on an object that isn't locally available. The local scheduler counts these workers as having temporarily returned their resources, allowing other tasks to run.
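
The gist of the policy, sketched as a small self-contained Python model (the actual implementation lives in the C local scheduler; the class and helper names below are illustrative, not code from this PR):

class WorkerPoolModel(object):
    """Toy model of the scheduling policy described in this PR."""

    def __init__(self):
        self.available_workers = []   # Idle workers ready to take a task.
        self.blocked_workers = set()  # Workers blocked on a non-local object.
        self.num_workers_started = 0

    def start_worker(self):
        # Replenish the pool with a freshly started worker.
        self.num_workers_started += 1
        worker = "worker-{}".format(self.num_workers_started)
        self.available_workers.append(worker)
        return worker

    def assign_task(self, task):
        # The task is assignable (resources and inputs are ready), but no
        # worker is idle, so grow the pool instead of waiting.
        if not self.available_workers:
            self.start_worker()
        worker = self.available_workers.pop()
        return worker, task

    def mark_blocked(self, worker):
        # Record that this worker is blocked on an object that isn't locally
        # available. In the real scheduler, such a worker temporarily returns
        # its resources so other tasks can run.
        self.blocked_workers.add(worker)

    def mark_unblocked(self, worker):
        self.blocked_workers.discard(worker)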

@stephanie-wang changed the title from "First pass at a policy to solve deadlock" to "[WIP] First pass at a policy to solve deadlock" on Feb 16, 2017
@AmplabJenkins

Can one of the admins verify this patch?

utarray_erase(algorithm_state->executing_workers, i, 1);
break;
}
}
Collaborator

probably worth adding a check that we actually removed it for now

test/runtest.py Outdated
@@ -1059,6 +1059,21 @@ def run_nested2():

ray.worker.cleanup()

def testWorkerDeadlock(self):
Collaborator

Doesn't the test need to call ray.init at some point?

Contributor Author

It does (line 1072)

Collaborator

Oh somehow I missed that :)

@@ -504,6 +504,7 @@ def get_object(self, object_ids):
# their original index in the object_ids argument.
unready_ids = dict((object_id, i) for (i, (object_id, val)) in
enumerate(final_results) if val is None)
was_blocked = (unready_ids > 0)
Collaborator

Should this be len(unready_ids)?

Contributor Author

Oh oops, nice catch!
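
For the record, the agreed fix is to test the number of unready IDs rather than the dict itself; a sketch of the corrected line (not the final diff):

was_blocked = len(unready_ids) > 0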

* available. Else, it will take from the dynamic resources available.
* @return Void.
*/
void update_dynamic_resources(local_scheduler_state *state,
Collaborator

As @atumanov mentioned, we should call this method in kill_worker to reclaim the resources from the task that was running on that worker.

Contributor Author

Ah, yes, there were a couple things that were missing from kill_worker, like updating the task table. I want to make that a separate PR because I think that functionality should have more testing independent of this one. I can add a TODO though.
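
For readers following along, here is a rough Python model of the bookkeeping that update_dynamic_resources is described as doing above (the real function is C code in the local scheduler; the dict-based resource vector and the return_resources flag are illustrative assumptions, not the PR's exact signature):

def update_dynamic_resources_model(dynamic_resources, task_demand,
                                   return_resources):
    # If return_resources is True, hand the task's resource demand back to
    # the dynamically tracked pool (e.g. the task finished, its worker
    # blocked in ray.get, or the worker was killed). Otherwise, take the
    # demand out of the pool because the task is starting to execute.
    sign = 1 if return_resources else -1
    for resource, quantity in task_demand.items():
        dynamic_resources[resource] = (
            dynamic_resources.get(resource, 0) + sign * quantity)
    return dynamic_resources

Calling it with return_resources set for the killed worker's task is roughly what the kill_worker follow-up discussed above would amount to.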

@robertnishihara
Collaborator

Let's add a test starting with 0 workers, e.g.,

ray.init(num_workers=0)

@ray.remote
def f():
  return 1

# Make sure we can call a remote function. This will require starting a new worker.
ray.get(f.remote())

ray.get([f.remote() for _ in range(100)])

Let's also add a test where a worker is blocked for a while so that photon_client.notify_unblocked() gets called multiple times, e.g.,

import ray
import time

ray.init(num_workers=0, num_cpus=100)

@ray.remote
def f():
  time.sleep(3)

@ray.remote
def g():
  ray.get([f.remote() for _ in range(10)])

ray.get(g.remote())

@stephanie-wang changed the title from "[WIP] First pass at a policy to solve deadlock" to "Dynamically grown worker pool to partially solve hanging workloads" on Feb 16, 2017
@stephanie-wang changed the title from "Dynamically grown worker pool to partially solve hanging workloads" to "Dynamically grow worker pool to partially solve hanging workloads" on Feb 16, 2017
@@ -187,7 +190,9 @@ void free_local_scheduler(local_scheduler_state *state) {
 */
void start_worker(local_scheduler_state *state, actor_id actor_id) {
  /* We can't start a worker if we don't have the path to the worker script. */
- CHECK(state->config.start_worker_command != NULL);
+ if (state->config.start_worker_command == NULL) {
Contributor

maybe give another warning here (you gave one at the beginning, but it might be lost in the logs now)

@robertnishihara merged commit a0dd3a4 into ray-project:master on Feb 18, 2017
@robertnishihara deleted the deadlock-policy branch on February 18, 2017 01:08
Catch-Bull pushed a commit that referenced this pull request May 17, 2022