Raylet task dispatch and throttling worker startup #1912

Merged: 12 commits into ray-project:master from raylet_task_dispatch, Apr 18, 2018

Conversation

@atumanov (Contributor) commented on Apr 16, 2018:

What do these changes do?

There are two parts to this PR:

  • separation of task placement and local task dispatch decisions in the scheduler
  • worker startup throttling, by keeping track of workers that are in the process of being started (see the sketch after this list).

Minor:

  • kill raylet_monitor on ray stop
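
As a rough sketch of the throttling idea (not the PR's exact code; the class and member names below, such as started_worker_pids_ and OnWorkerRegistered, are illustrative), a worker start is skipped while another start is still in flight unless the caller forces it:

#include <unistd.h>

#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

class ThrottledWorkerPool {
 public:
  explicit ThrottledWorkerPool(std::vector<std::string> worker_command)
      : worker_command_(std::move(worker_command)) {}

  void StartWorker(bool force_start = false) {
    // Throttle: if a worker is already being started and we were not forced,
    // do nothing; the in-flight worker will register soon.
    if (!force_start && !started_worker_pids_.empty()) {
      return;
    }
    pid_t pid = fork();
    if (pid == 0) {
      // Child process: exec the worker command.
      std::vector<char *> argv;
      for (auto &arg : worker_command_) {
        argv.push_back(&arg[0]);
      }
      argv.push_back(nullptr);
      execvp(argv[0], argv.data());
      _exit(1);  // Only reached if exec fails.
    }
    // Parent: remember the in-flight worker until it registers.
    started_worker_pids_.insert(pid);
  }

  // Called once the new worker connects back and registers with the raylet.
  void OnWorkerRegistered(pid_t pid) { started_worker_pids_.erase(pid); }

 private:
  std::vector<std::string> worker_command_;
  std::unordered_set<pid_t> started_worker_pids_;
};

The force_start escape hatch matters when pre-starting several workers at once, which the review discussion below goes into.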

@atumanov requested a review from @stephanie-wang on April 16, 2018 at 21:42.
@AmplabJenkins
Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4952/

@stephanie-wang (Contributor) left a comment:

Looks good! I left a few low-level comments.

Just so we can start thinking about it, does it make sense to put ScheduleTasks on a timer in the future?
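
For the record, a minimal sketch of what a timer-driven ScheduleTasks could look like, assuming a boost::asio::io_service event loop is available to the node manager; all names here are hypothetical:

#include <boost/asio.hpp>

#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical helper that re-arms a deadline_timer to invoke a callback
// (e.g., a scheduling pass) every period_ms milliseconds.
class PeriodicScheduler {
 public:
  PeriodicScheduler(boost::asio::io_service &io_service,
                    std::function<void()> schedule_tasks, int64_t period_ms)
      : timer_(io_service),
        schedule_tasks_(std::move(schedule_tasks)),
        period_ms_(period_ms) {}

  void Start() { Arm(); }

 private:
  void Arm() {
    timer_.expires_from_now(boost::posix_time::milliseconds(period_ms_));
    timer_.async_wait([this](const boost::system::error_code &error) {
      if (!error) {
        schedule_tasks_();  // Run one scheduling pass.
        Arm();              // Re-arm for the next period.
      }
    });
  }

  boost::asio::deadline_timer timer_;
  std::function<void()> schedule_tasks_;
  int64_t period_ms_;
};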

@@ -306,7 +306,7 @@ def stop():
subprocess.call(
[
"killall global_scheduler plasma_store plasma_manager "
"local_scheduler raylet"
"local_scheduler raylet raylet_monitor"

Contributor:
Oops, thanks :)

return;
}
// Early return if there are no resources available.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();

Contributor:
Does it make sense to move this inside the for loop, in case the resources go to zero after assignment of a task?

@@ -386,7 +404,8 @@ void NodeManager::ScheduleTasks() {

// Extract decision for this local scheduler.
std::unordered_set<TaskID, UniqueIDHasher> local_task_ids;
// Iterate over (taskid, clientid) pairs, extract tasks to run on the local client.
// Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
// TODO(atumanov): move the assigned tasks to scheduled and call DispatchTasks().

Contributor:
Is this TODO still valid?
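
For context, a minimal sketch (illustrative names, not the raylet's actual classes) of the placement/dispatch separation this PR describes: ScheduleTasks makes placement decisions, and DispatchTasks separately matches locally placed tasks to idle workers.

#include <deque>
#include <string>
#include <vector>

struct TaskStub {
  int id;
};

class NodeManagerSketch {
 public:
  // Placement: decide, per task, whether it runs locally or is forwarded.
  void ScheduleTasks(const std::vector<TaskStub> &placeable,
                     const std::string &local_node) {
    for (const auto &task : placeable) {
      if (PickNode(task) == local_node) {
        local_queue_.push_back(task);  // Stays here; handled by dispatch.
      } else {
        ForwardTask(task);  // Placement decision for a remote node.
      }
    }
    // Separate step: try to hand locally placed tasks to workers.
    DispatchTasks();
  }

  // Dispatch: assign queued local tasks to idle workers while any remain.
  void DispatchTasks() {
    while (!local_queue_.empty() && idle_workers_ > 0) {
      local_queue_.pop_front();
      --idle_workers_;
    }
  }

 private:
  std::string PickNode(const TaskStub &) { return "local"; }  // Placeholder policy.
  void ForwardTask(const TaskStub &) {}
  std::deque<TaskStub> local_queue_;
  int idle_workers_ = 2;
};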

return static_cast<uint32_t>(actor_pool_.size() + pool_.size());
}

void WorkerPool::StartWorker(bool force_start) {

Contributor:
The force_start parameter doesn't seem to be used. Can we remove this?

@atumanov (author):
It's not used yet, but I think it will be useful soon. The idea here is to let the caller decide whether to start a worker no matter what (e.g., if we want the raylet to pre-start num_cpus workers without considering the in-flight worker pool). Otherwise, starting num_cpus workers will be serialized.

@atumanov (author):
btw, to be clear, the implementation does use this flag.


Contributor:
I see. Couldn't we just do the check for started_worker_pids.empty() outside of WorkerPool::Start() though, through your modified WorkerPool::Size()? And in the pre-start case, we can just call StartWorker without checking the worker pool size at all.

@atumanov (author):
I thought about it and decided against it, because I'd prefer to encapsulate this as an implementation detail and not expose the in-flight worker status through the public interface of the worker pool. I just think it'd be preferable for us to keep thinking about the worker pool as an opaque, self-managing container of workers.

@atumanov (author):
I know that this is how we do it in the local_scheduler, but we're in proper OO land now :) If I had to choose, I'd rather drop the flag than expose the in-flight worker status. The disadvantage of dropping the flag is that all calls to StartWorker would serialize starting k workers.


Contributor:
Ah, I see. Okay, I'm fine with this. Can you document the force_start parameter in the header file though?

@atumanov (author):
By the way, with the latest change, force_start is now used in the constructor to speed up starting num_workers workers.

}
// We have enough resources for this task. Assign task.
// TODO(atumanov): perform the task state/queue transition inside AssignTask.
auto scheduled_tasks =

Contributor:
Can we call this something other than scheduled_tasks? It's a little confusing given the other scheduled_tasks local variable here.

@atumanov (author):

retest this please

cluster_resource_map_[my_client_id].GetAvailableResources();
if (local_resources.IsEmpty()) {
// Early return if there are no resources available.
return;

Contributor:
Hmm, I just realized there's an issue here where a task that requires zero resources (e.g., an actor task) might not get scheduled. Was that why you had this check outside of the loop at first? Although even having it outside of the loop wouldn't fix this issue...

Not sure what the best approach here is. Should we just always loop through all the tasks for now?

@atumanov (author):
Honestly, I think zero-resource tasks are just wrong... How can you have an active thread of execution without it acquiring at least some CPU resource?
Yes, this is why I was asking about actor tasks. I think we have no choice but to drop the early-termination check.
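
A minimal, self-contained sketch of the loop shape being discussed (illustrative types, not the PR's code): with the early return dropped, every queued task is examined, so a zero-resource request still matches even when the remaining resources are empty.

#include <map>
#include <string>
#include <vector>

struct ResourceSet {
  std::map<std::string, double> amounts;

  bool IsSubsetOf(const ResourceSet &other) const {
    for (const auto &kv : amounts) {
      auto it = other.amounts.find(kv.first);
      const double available = (it == other.amounts.end()) ? 0.0 : it->second;
      if (kv.second > available) {
        return false;
      }
    }
    return true;  // An empty (zero-resource) request is a subset of anything.
  }
};

struct QueuedTask {
  int id;
  ResourceSet required;
};

// Returns the IDs of tasks that can be dispatched right now.
std::vector<int> DispatchAll(const std::vector<QueuedTask> &queue,
                             ResourceSet available) {
  std::vector<int> dispatched;
  for (const auto &task : queue) {
    // No early return on empty resources: a zero-resource task still matches.
    if (task.required.IsSubsetOf(available)) {
      dispatched.push_back(task.id);
      for (const auto &kv : task.required.amounts) {
        available.amounts[kv.first] -= kv.second;  // Acquire the resources.
      }
    }
  }
  return dispatched;
}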

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4963/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4964/

@AmplabJenkins
Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4965/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4966/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4967/

@AmplabJenkins
Test FAILed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4973/

@@ -16,27 +16,46 @@ WorkerPool::WorkerPool(int num_workers, const std::vector<std::string> &worker_c
// become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < num_workers; i++) {
StartWorker();
// Force-start num_workers workers.
StartWorker(true);

Contributor:
I learned this tip from Zongheng: write this as StartWorker(/*force_start=*/true) to make it clear to the reader what the parameter is. :)

@atumanov (author):
Doesn't this introduce C-style comments? I'd rather not; if there's a cleaner, more Pythonic way, that'd be great.
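
For reference, a tiny self-contained illustration of the commented-argument style mentioned above; the StartWorker here is a hypothetical free function, not the real WorkerPool method.

#include <iostream>

// Hypothetical stand-in for WorkerPool::StartWorker, only to show the
// call-site convention from the review comment.
void StartWorker(bool force_start = false) {
  std::cout << "force_start=" << std::boolalpha << force_start << std::endl;
}

int main() {
  // The block comment names the boolean at the call site; it is just a
  // comment, not a change to the function signature.
  StartWorker(/*force_start=*/true);
  StartWorker(/*force_start=*/false);
  return 0;
}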

WorkerPool::~WorkerPool() {
// Kill all registered workers. NOTE(swang): This assumes that the registered
// workers were started by the pool.
// TODO(atumanov): remove killed workers from the pool.

Contributor:
Does this TODO mean we should also kill all the PIDs in started_worker_pids_? Either way, we should probably do that here in this PR.
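
A rough sketch of the cleanup being suggested, assuming illustrative member names (registered_worker_pids_, started_worker_pids_); this is not the PR's implementation.

#include <signal.h>
#include <sys/types.h>

#include <unordered_set>

class PoolCleanupSketch {
 public:
  ~PoolCleanupSketch() {
    // Workers that already registered with the raylet.
    for (pid_t pid : registered_worker_pids_) {
      kill(pid, SIGKILL);
    }
    // In-flight workers that were started but never registered.
    for (pid_t pid : started_worker_pids_) {
      kill(pid, SIGKILL);
    }
  }

 private:
  std::unordered_set<pid_t> registered_worker_pids_;
  std::unordered_set<pid_t> started_worker_pids_;
};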

: WorkerPool(worker_command) {}

void StartWorker(pid_t pid, bool force_start = false) {
AddStartedWorker(pid);

Contributor:
We probably just want to keep the second call to AddStartedWorker, right?

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4971/

@stephanie-wang (Contributor) left a comment:

Can you remove the TODO that you fixed? Looks good!

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4975/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4977/

@AmplabJenkins
Test PASSed. Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4984/

@stephanie-wang merged commit 1c965fc into ray-project:master on Apr 18, 2018.
@stephanie-wang deleted the raylet_task_dispatch branch on April 18, 2018 at 17:58.
royf added a commit to royf/ray that referenced this pull request Apr 22, 2018
* master:
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
  [tune] Polishing docs (ray-project#1846)
  [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
  Preemptively push local arguments for actor tasks (ray-project#1901)
  [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
  Multithreading refactor for ObjectManager. (ray-project#1911)
  Add slice functionality (ray-project#1832)
  [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
  Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
  Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
  [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
  Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
  Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
  Allow multiple raylets to be started on a single machine. (ray-project#1904)

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
alok added a commit to alok/ray that referenced this pull request Apr 28, 2018
* master:
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)