Raylet task dispatch and throttling worker startup #1912
Conversation
…h locally available resources
…extraneous workers
Test FAILed.
Looks good! I left a few low-level comments.
Just so we can start thinking about it, does it make sense to put ScheduleTasks on a timer in the future?
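For context on this suggestion, here is a minimal, hypothetical sketch of what putting a scheduling pass on a timer could look like with boost::asio (the 100 ms period, the ScheduleOnTimer helper, and the wiring are assumptions for illustration, not the raylet's actual event-loop code):

```cpp
#include <boost/asio.hpp>

#include <functional>
#include <iostream>

// Re-arm a periodic timer that invokes the supplied scheduling callback.
// Sketch only: the period and the helper name are made up.
void ScheduleOnTimer(boost::asio::deadline_timer &timer,
                     const std::function<void()> &schedule_tasks) {
  timer.expires_from_now(boost::posix_time::milliseconds(100));
  timer.async_wait([&timer, schedule_tasks](const boost::system::error_code &ec) {
    if (ec) {
      return;  // Timer was cancelled; stop re-arming.
    }
    schedule_tasks();                        // e.g., a NodeManager scheduling pass
    ScheduleOnTimer(timer, schedule_tasks);  // re-arm for the next period
  });
}

int main() {
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer(io_service);
  ScheduleOnTimer(timer, [] { std::cout << "scheduling pass\n"; });
  io_service.run();  // Runs until the timer is cancelled (never, in this sketch).
  return 0;
}
```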
@@ -306,7 +306,7 @@ def stop():
     subprocess.call(
         [
             "killall global_scheduler plasma_store plasma_manager "
-            "local_scheduler raylet"
+            "local_scheduler raylet raylet_monitor"
Oops, thanks :)
src/ray/raylet/node_manager.cc
Outdated
    return;
  }
  // Early return if there are no resources available.
  const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
Does it make sense to move this inside the for loop, in case the resources go to zero after assignment of a task?
src/ray/raylet/node_manager.cc
Outdated
@@ -386,7 +404,8 @@ void NodeManager::ScheduleTasks() {

   // Extract decision for this local scheduler.
   std::unordered_set<TaskID, UniqueIDHasher> local_task_ids;
-  // Iterate over (taskid, clientid) pairs, extract tasks to run on the local client.
+  // Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
+  // TODO(atumanov): move the assigned tasks to scheduled and call DispatchTasks().
Is this TODO still valid?
src/ray/raylet/worker_pool.cc
Outdated
  return static_cast<uint32_t>(actor_pool_.size() + pool_.size());
}

void WorkerPool::StartWorker(bool force_start) {
The force_start parameter doesn't seem to be used. Can we remove this?
It's not used yet, but I think it will be useful soon. The idea here is to let the caller decide if they want to start a worker no matter what (i.e., if we want the raylet to pre-start num_cpus workers without considering the in-flight worker pool). Otherwise, starting num_cpus workers will be serialized.
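To make the intended throttling concrete, here is a small, self-contained sketch of the idea (the class name, fork-based startup, and RegisterWorker hook are placeholders for illustration, not the PR's actual worker_pool.cc):

```cpp
#include <sys/types.h>
#include <unistd.h>

#include <unordered_set>

class WorkerPoolSketch {
 public:
  // force_start: start a worker even if previously started workers have not
  // yet registered (e.g., when pre-starting num_cpus workers up front).
  void StartWorker(bool force_start = false) {
    // Throttle: unless forced, wait for in-flight workers to register first.
    if (!started_worker_pids_.empty() && !force_start) {
      return;
    }
    pid_t pid = fork();
    if (pid == 0) {
      // Child process: exec the worker command here (omitted in this sketch).
      _exit(0);
    }
    started_worker_pids_.insert(pid);
  }

  // Called once a started worker registers; it is no longer in flight, so the
  // next non-forced StartWorker() call may start another worker.
  void RegisterWorker(pid_t pid) { started_worker_pids_.erase(pid); }

 private:
  std::unordered_set<pid_t> started_worker_pids_;
};
```

Under this scheme, pre-starting k workers without force_start would serialize on registration, which is the trade-off discussed below.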
btw, to be clear, the implementation does use this flag.
I see. Couldn't we just do the check for started_worker_pids.empty() outside of WorkerPool::Start() though, through your modified WorkerPool::Size()? And in the pre-start case, we can just call StartWorker without checking the worker pool size at all.
I thought about it and decided against it, because I'd prefer to encapsulate this as an implementation detail and not expose the in-flight worker status through the public interface of the worker pool. I just think it'd be preferable for us to keep thinking about the worker pool as an opaque, self-managing container of workers.
I know that this is how we do it in the local_scheduler, but we're in the proper OO land now :) If I had to choose, I'd rather drop the flag than expose the in-flight worker status. The disadvantage of dropping the flag is that all calls to StartWorker will serialize starting k workers.
Ah, I see. Okay, I'm fine with this. Can you document the force_start parameter in the header file though?
By the way, with the latest change, force_start is now used in the constructor to speed up starting num_workers workers.
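A possible way to document the parameter in the header (the wording is only a suggestion, not the PR's actual worker_pool.h comment):

```cpp
/// Start a new worker process, subject to throttling.
///
/// \param force_start Start a worker even if previously started workers have
/// not yet registered with the raylet. Used by the constructor to pre-start
/// num_workers workers without serializing on registration; when false, the
/// call is a no-op while any started worker is still unregistered.
void StartWorker(bool force_start);
```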
src/ray/raylet/node_manager.cc
Outdated
    }
    // We have enough resources for this task. Assign task.
    // TODO(atumanov): perform the task state/queue transition inside AssignTask.
    auto scheduled_tasks =
Can we call this something other than scheduled_tasks? It's a little confusing given the other scheduled_tasks local variable here.
retest this please
src/ray/raylet/node_manager.cc
Outdated
      cluster_resource_map_[my_client_id].GetAvailableResources();
  if (local_resources.IsEmpty()) {
    // Early return if there are no resources available.
    return;
Hmm, I just realized there's an issue here where a task that requires zero resources (e.g., an actor task) might not get scheduled. Was that why you had this check outside of the loop at first? Although even having it outside of the loop wouldn't fix this issue...
Not sure what the best approach here is. Should we just always loop through all the tasks for now?
Honestly, I think zero-resource tasks are just wrong... How can you have an active thread of execution running without having it acquire at least some CPU resource?
Yes, this is why I was asking about actor tasks, and I think we have no choice but to drop the early termination check.
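For illustration, here is a standalone sketch of dispatching without the early return, using placeholder types rather than the raylet's actual ResourceSet and task queues; each task's demand is checked individually, so a task with an empty demand is always dispatchable:

```cpp
#include <map>
#include <string>
#include <vector>

using ResourceDemand = std::map<std::string, double>;

// An empty demand always fits, so zero-resource (e.g., actor) tasks pass.
bool Fits(const ResourceDemand &demand, const ResourceDemand &available) {
  for (const auto &entry : demand) {
    auto it = available.find(entry.first);
    if (it == available.end() || it->second < entry.second) {
      return false;
    }
  }
  return true;
}

// Loop over every queued task instead of returning early when the node looks
// out of resources; only tasks whose demand fits are assigned.
void DispatchAll(const std::vector<ResourceDemand> &task_demands,
                 ResourceDemand &available) {
  for (const auto &demand : task_demands) {
    if (!Fits(demand, available)) {
      continue;  // Not enough resources for this task; try the next one.
    }
    for (const auto &entry : demand) {
      available[entry.first] -= entry.second;
    }
    // Assign the task to a worker here (omitted in this sketch).
  }
}
```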
Test PASSed.
Test PASSed.
Test FAILed.
Test PASSed.
Test PASSed.
Test FAILed.
@@ -16,27 +16,46 @@ WorkerPool::WorkerPool(int num_workers, const std::vector<std::string> &worker_c
   // become zombies instead of dying gracefully.
   signal(SIGCHLD, SIG_IGN);
   for (int i = 0; i < num_workers; i++) {
-    StartWorker();
+    // Force-start num_workers workers.
+    StartWorker(true);
I learned this tip from Zongheng to write this as StartWorker(/*force_start=*/true) to make it clear to the reader what the parameter is. :)
Doesn't this introduce C-style comments? I'd rather not; if there's a cleaner, Pythonic way, that'd be great.
src/ray/raylet/worker_pool.cc
Outdated
WorkerPool::~WorkerPool() {
  // Kill all registered workers. NOTE(swang): This assumes that the registered
  // workers were started by the pool.
  // TODO(atumanov): remove killed workers from the pool.
Does this TODO mean we should also kill all the PIDs in started_worker_pids_? Either way, we should probably do that here in this PR.
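For reference, a minimal sketch of tearing down both registered and in-flight workers (the struct and field names are placeholders, and whether SIGKILL is the right signal here is an assumption, not the PR's decision):

```cpp
#include <signal.h>
#include <sys/types.h>

#include <unordered_set>

struct WorkerTeardownSketch {
  std::unordered_set<pid_t> registered_worker_pids;  // workers that registered
  std::unordered_set<pid_t> started_worker_pids;     // forked, not yet registered

  // On pool teardown, signal both groups so no forked process is leaked.
  ~WorkerTeardownSketch() {
    for (pid_t pid : registered_worker_pids) {
      kill(pid, SIGKILL);
    }
    for (pid_t pid : started_worker_pids) {
      kill(pid, SIGKILL);
    }
  }
};
```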
src/ray/raylet/worker_pool_test.cc
Outdated
      : WorkerPool(worker_command) {}

  void StartWorker(pid_t pid, bool force_start = false) {
    AddStartedWorker(pid);
We probably just want to keep the second call to AddStartedWorker, right?
Test PASSed.
Can you remove the TODO that you fixed? Looks good!
Test PASSed.
Test PASSed.
Test PASSed.
* master:
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
  [tune] Polishing docs (ray-project#1846)
  [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
  Preemptively push local arguments for actor tasks (ray-project#1901)
  [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
  Multithreading refactor for ObjectManager. (ray-project#1911)
  Add slice functionality (ray-project#1832)
  [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
  Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
  Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
  [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
  Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
  Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
  Allow multiple raylets to be started on a single machine. (ray-project#1904)

# Conflicts:
#   python/ray/rllib/__init__.py
#   python/ray/rllib/dqn/dqn.py
* master:
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
What do these changes do?
There are two parts to this PR: raylet task dispatch against locally available resources, and throttling of worker startup so that extraneous workers are not started.
Minor: ray stop now also kills the raylet_monitor process.