[xray] Workers blocked in a `ray.get` release their resources #1920
Force-pushed from 29e8077 to 9192d9c.
}
}
return true;
return not oversubscribed;
can you do that? don't you need to do `!oversubscribed`?
Oh, I didn't even realize I did that...apparently you can: http://en.cppreference.com/w/cpp/keyword/not.
Can we change it to `!oversubscribed`? I think that will be a lot more familiar to people.
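For reference, a tiny standalone sketch (not code from the PR) showing that `not` is a standard alternative token for `!` in C++, alongside the more conventional spelling suggested above:

```cpp
#include <iostream>

int main() {
  bool oversubscribed = false;
  // `not` is an alternative token for `!` (see the cppreference link above),
  // so this compiles with any conforming C++ compiler...
  std::cout << (not oversubscribed) << std::endl;  // prints 1
  // ...but this spelling is more familiar to most readers.
  std::cout << (!oversubscribed) << std::endl;     // prints 1
  return 0;
}
```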
src/ray/raylet/node_manager.cc (outdated)
const auto &task = tasks.front();
RAY_CHECK(
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
        task.GetTaskSpecification().GetRequiredResources()));
The behavior in legacy Ray is to release only CPU resources. I think we probably want to preserve that behavior.
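A rough sketch of what releasing only the CPU portion could look like, reusing identifiers that appear elsewhere in this diff (`GetRequiredResources`, `GetResource`, `kCPU_ResourceLabel`, `ResourceSet`, `Release`); the map type for `cpu_resources` and the exact `ResourceSet` constructor are assumptions here, not the PR's final code:

```cpp
// Sketch only: extract just the CPU requirement from the blocked task and
// release that, rather than the task's full resource set.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
// Assumed shape: a one-entry {label -> quantity} map wrapped in a ResourceSet.
const std::unordered_map<std::string, double> cpu_resources = {
    {kCPU_ResourceLabel, required_cpus}};
RAY_CHECK(
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
        ResourceSet(cpu_resources)));
```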
src/ray/raylet/node_manager.cc (outdated)
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
bool ok =
instead of `ok`, can we call this `not_oversubscribed`?
// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
if (resource_capacity_.count(resource_label) < 0) {
Shouldn't this be `resource_capacity_[resource_label] < 0`?
Nice catch!
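A self-contained illustration of the bug (toy code, not from the PR): `std::unordered_map::count` returns an unsigned count, so `count(label) < 0` can never be true, whereas indexing the map checks the actual remaining capacity:

```cpp
#include <iostream>
#include <string>
#include <unordered_map>

int main() {
  std::unordered_map<std::string, double> resource_capacity_{{"CPU", 1.0}};
  const std::string resource_label = "CPU";

  resource_capacity_[resource_label] -= 4.0;  // oversubscribe the resource

  // Always false: count() returns a size_t (0 or 1), never a negative value.
  std::cout << (resource_capacity_.count(resource_label) < 0) << std::endl;  // 0
  // The intended check: has the remaining capacity gone negative?
  std::cout << (resource_capacity_[resource_label] < 0) << std::endl;        // 1
  return 0;
}
```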
// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
We need to handle the case where `resource_capacity_.count(resource_label) == 0` (that is, the local scheduler is not aware of this resource). Or is that case impossible?
Hmm yeah, it seemed like Alexey had left that as a todo, but I can just put a check here for now. I think we should just do a fatal check for that for now...
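A minimal sketch of the fatal check discussed here, written against the identifiers shown in the diff above (a sketch, not necessarily the exact change that landed):

```cpp
// Fail loudly if the local scheduler has never heard of this resource label,
// instead of letting operator[] silently create a zero-capacity entry.
RAY_CHECK(resource_capacity_.count(resource_label) == 1);
resource_capacity_[resource_label] -= resource_capacity;
```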
Test PASSed.
Test FAILed.
src/ray/raylet/node_manager.cc (outdated)
// it acquired for its assigned task while it is blocked. The resources
// will be acquired again once the worker is unblocked.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker) {
you can combine these two if statements into one: `if (worker && !worker->IsBlocked())`. I personally would take it a step further and use the opposite logic with early exit: `if (!worker || worker->IsBlocked()) break;`
I prefer not to use `break;` statements whenever possible, since it is usually less error-prone if you return in only one place.
I always prefer structuring my code with early termination logic. That way you don't have to scroll through code searching for any `else`s or any other code paths executed for all cases. Early termination logic is easier to follow. It also saves on indentation and is more assembly-like.
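A toy side-by-side of the two shapes being debated (names are illustrative, not the actual `NodeManager` control flow):

```cpp
#include <memory>

struct Worker {
  bool IsBlocked() const { return blocked; }
  bool blocked = false;
};

// Style 1: one combined condition, single fall-through exit.
void HandleBlockedNested(const std::shared_ptr<Worker> &worker) {
  if (worker && !worker->IsBlocked()) {
    // ... release the resources held by the worker's assigned task ...
  }
}

// Style 2: early termination; the "interesting" path stays unindented.
void HandleBlockedEarlyExit(const std::shared_ptr<Worker> &worker) {
  if (!worker || worker->IsBlocked()) {
    return;  // nothing to do for unknown or already-blocked workers
  }
  // ... release the resources held by the worker's assigned task ...
}

int main() { return 0; }
```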
src/ray/raylet/node_manager.cc (outdated)
// Get the CPU resources required by the running task.
const auto required_resources =
    task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
initialize `required_cpus` to zero. Valgrind will legitimately complain when you pass a pointer to uninitialized memory.
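A standalone illustration of the point, using a toy out-parameter that stands in for `ResourceSet::GetResource` (whose real signature lives in the Ray codebase):

```cpp
#include <string>

// Toy stand-in for an out-parameter lookup like GetResource(label, &value).
bool LookupResource(const std::string &label, double *value) {
  if (label == "CPU") {
    *value = 4.0;
    return true;
  }
  return false;  // on failure, *value is never written
}

int main() {
  // Initialize to zero: if the lookup ever fails (or the check around it is
  // removed), the variable still holds a defined value and tools like
  // Valgrind have nothing to complain about.
  double required_cpus = 0.0;
  LookupResource("GPU", &required_cpus);
  return static_cast<int>(required_cpus);
}
```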
src/ray/raylet/node_manager.cc (outdated)
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources =
you probably want a const ref here: `const auto &required_resources`
I can't do that because `GetRequiredResources()` returns a `ResourceSet` and not a `ResourceSet &`, since the data structure is built on the fly.
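For context, a standalone illustration with a toy type (not Ray's `ResourceSet`): binding a `const` reference to a by-value return is legal C++ (the temporary's lifetime is extended to that of the reference), but it does not avoid constructing the object, so capturing by value as done here is equivalent:

```cpp
#include <string>

struct ToyResourceSet {
  std::string encoded;
};

// Built on the fly and returned by value, like GetRequiredResources() above.
ToyResourceSet MakeRequiredResources() { return ToyResourceSet{"CPU:1"}; }

int main() {
  // Capture by value, as in the PR.
  const auto by_value = MakeRequiredResources();
  // Also valid: a const reference bound to the returned temporary (its
  // lifetime is extended), though the object is constructed either way.
  const auto &by_ref = MakeRequiredResources();
  return by_value.encoded.size() == by_ref.encoded.size() ? 0 : 1;
}
```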
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
const ref: const auto &
src/ray/raylet/node_manager.cc (outdated)
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, required_cpus));
oh, bad, `&required_cpus`!
Test PASSed.
Test FAILed.
@@ -37,6 +40,9 @@ class Worker {
  TaskID assigned_task_id_;
  /// The worker's actor ID. If this is nil, then the worker is not an actor.
  ActorID actor_id_;
  /// Whether the worker is blocked. Workers become blocked in a `ray.get`, if
  /// they require a data dependency while executing a task.
  bool blocked_;
ideally, we would make it `enum WorkerState` and transition the worker through its finite state diagram: STARTING -> REGISTERED -> READY -> EXECUTING -> BLOCKED. Blocked state is just one possibility. That said, I think the only use case this state is currently useful for is tracking if it's blocked or not.
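A sketch of the state machine floated here (just the idea from this comment; the PR itself keeps a single `bool blocked_`):

```cpp
// Possible worker lifecycle, as suggested above. Blocked is one state among
// several rather than a standalone boolean flag.
enum class WorkerState {
  STARTING,    // process launched, not yet registered with the raylet
  REGISTERED,  // registered with the raylet, not yet ready for tasks
  READY,       // idle, waiting for a task assignment
  EXECUTING,   // running an assigned task
  BLOCKED,     // executing, but blocked in a `ray.get` on a missing dependency
};
```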
src/ray/raylet/node_manager.cc (outdated)
bool not_oversubscribed =
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
        ResourceSet(cpu_resources));
if (!not_oversubscribed) {
minor (ok to ignore): would be good to get rid of the double negative by using an `oversubscribed` variable instead (and changing the logic to match).
src/ray/raylet/scheduling_queue.h (outdated)
@@ -97,6 +103,12 @@ class SchedulingQueue {
  /// \param tasks The tasks to queue.
  void QueueRunningTasks(const std::vector<Task> &tasks);

  /// Queue tasks in the blocked state. These are tasks that have been
  /// dispatched to a worker but are blocked on a missing data dependency.
minor: this description sounds like a pending task (pending missing data dependencies). Maybe qualify the description by saying "but are blocked on a missing data dependency discovered at runtime"?
actor_id_(ActorID::nil()),
blocked_(false) {}

void Worker::MarkBlocked() { blocked_ = true; }
minor: `ToggleBlocked(true/false)` to save on methods. But we can clean it up if we decide to go with worker states later.
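What that single setter might look like (hypothetical alternative to the `MarkBlocked()` shown in the diff above; not what the PR actually adds):

```cpp
// Hypothetical single setter along the lines suggested above, instead of one
// method per transition; easy to swap for a WorkerState enum later.
void Worker::ToggleBlocked(bool blocked) { blocked_ = blocked; }
```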
Test PASSed.
Test PASSed.
Test PASSed.
Test PASSed.
* master:
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
  [tune] Polishing docs (ray-project#1846)
  [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
  Preemptively push local arguments for actor tasks (ray-project#1901)
  [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
  Multithreading refactor for ObjectManager. (ray-project#1911)
  Add slice functionality (ray-project#1832)
  [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
  Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
  Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
  [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
  Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
  Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
  Allow multiple raylets to be started on a single machine. (ray-project#1904)

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
* master:
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
What do these changes do?
A worker that is assigned a task and blocked in a `ray.get` should release its resources to allow another task to run. Without this, a task that recursively calls itself and waits on the result with `ray.get` cannot recurse deeper than the number of cores available.

Related issue number
This is similar to #286, but for the Raylet.