[xray] Workers blocked in a ray.get release their resources #1920


Merged
atumanov merged 11 commits into ray-project:master from blocked-workers on Apr 19, 2018

Conversation

stephanie-wang (Contributor)

What do these changes do?

A worker that is assigned a task and blocked in a ray.get should release its resources so that another task can run. Without this change, a task like the following cannot recurse deeper than the number of cores available:

import ray

@ray.remote
def recurse(i):
  if i == 0:
    return i
  return ray.get(recurse.remote(i - 1))

Related issue number

This is similar to #286, but for the Raylet.

}
}
return true;
return not oversubscribed;
Collaborator

can you do that? don't you need to do !oversubscribed?

Contributor Author

Oh, I didn't even realize I did that...apparently you can: http://en.cppreference.com/w/cpp/keyword/not.
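For reference, a minimal standalone sketch (not from the PR) showing that `not` is a standard C++ alternative token for `!`:

// Minimal sketch (not from the PR): `not` is a standard C++ alternative token
// for `!`, so the two forms below are equivalent.
#include <iostream>

int main() {
  bool oversubscribed = false;
  std::cout << (not oversubscribed) << " " << (!oversubscribed) << std::endl;  // prints "1 1"
  return 0;
}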

Collaborator

Can we change it to !oversubscribed? I think that will be a lot more familiar to people.

const auto &task = tasks.front();
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
task.GetTaskSpecification().GetRequiredResources()));
Collaborator

The behavior in legacy Ray is to release only CPU resources. I think we probably want to preserve that behavior.
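A rough sketch of restricting the release to CPUs only, assuming the ResourceSet constructor and GetResource signature used elsewhere in this diff:

// Sketch only: release just the task's CPU requirement rather than its full
// resource set (the map-based ResourceSet constructor is assumed from this diff).
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
const std::unordered_map<std::string, double> cpu_resources = {
    {kCPU_ResourceLabel, required_cpus}};
RAY_CHECK(
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
        ResourceSet(cpu_resources)));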


auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
bool ok =
Collaborator

instead of ok, can we call this not_oversubscribed?

// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
if (resource_capacity_.count(resource_label) < 0) {
Collaborator

Shouldn't this be resource_capacity_[resource_label] < 0?

Contributor Author

Nice catch!

resource_capacity_[resource_label] -= resource_capacity;
// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
Collaborator

We need to handle the case where resource_capacity_.count(resource_label) == 0 (that is, the local scheduler is not aware of this resource). Or is that case impossible?

Contributor Author

Hmm yeah, it seems like Alexey left that as a TODO, but I can just put a check here. I think we should just do a fatal check for that for now...
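A sketch of the fatal check being discussed; the exact form here is an assumption, not necessarily what was committed:

// Sketch (assumed form): fail hard on an unknown resource label before
// subtracting, and guard against the capacity going negative.
RAY_CHECK(resource_capacity_.count(resource_label) == 1);
resource_capacity_[resource_label] -= resource_capacity;
RAY_CHECK(resource_capacity_[resource_label] >= 0);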

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4986/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4987/

// it acquired for its assigned task while it is blocked. The resources
// will be acquired again once the worker is unblocked.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker) {
Contributor

you can combine these two if statements into one: if (worker && !worker->IsBlocked()). I personally would take it a step further and use the opposite logic with an early exit: if (!worker || worker->IsBlocked()) break;

Contributor Author

I prefer not to use break; statements whenever possible, since it is usually less error-prone if you return in only one place.

Contributor

I always prefer structuring my code with early termination logic. That way you don't have to scroll through code searching for any elses or any other code paths executed for all cases. Early termination logic is easier to follow. It also saves on indentation and is more assembly-like.
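For illustration, the two shapes being compared (sketch only; the release logic itself is elided):

// Alternative (a): combined condition, single exit point.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker && !worker->IsBlocked()) {
  // ... release the resources acquired for the worker's assigned task ...
}

// Alternative (b): early exit, as suggested above.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (!worker || worker->IsBlocked()) {
  return;
}
// ... release the resources acquired for the worker's assigned task ...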

// Get the CPU resources required by the running task.
const auto required_resources =
task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
Contributor

initialize required_cpus to zero. Valgrind will legitimately complain when you pass a pointer to uninitialized memory.

auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources =
Contributor

you probably want a const ref here: const auto &required_resources

Contributor Author

I can't do that because GetRequiredResources() returns a ResourceSet and not a ResourceSet &, since the data structure is built on the fly.
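A side note (sketch, not from the PR): the reference form does compile, it just does not avoid constructing the ResourceSet:

// Side note (sketch): a const reference can bind to the returned temporary and
// C++ extends the temporary's lifetime, but the ResourceSet is still built on
// the fly, so the reference saves nothing here.
const auto &required_resources = task.GetTaskSpecification().GetRequiredResources();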

auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
Contributor

const ref: const auto &

// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, required_cpus));
Contributor

oh, bad, &required_cpus!
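Putting the two comments above together, the corrected lines would presumably read:

// Presumed corrected form (per the two review comments above): initialize the
// output so Valgrind stays quiet, and pass its address.
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));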

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4988/

@AmplabJenkins

Test FAILed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4990/

@@ -37,6 +40,9 @@ class Worker {
TaskID assigned_task_id_;
/// The worker's actor ID. If this is nil, then the worker is not an actor.
ActorID actor_id_;
/// Whether the worker is blocked. Workers become blocked in a `ray.get`, if
/// they require a data dependency while executing a task.
bool blocked_;
Contributor

ideally, we would make it an enum WorkerState and transition the worker through its finite state diagram: STARTING -> REGISTERED -> READY -> EXECUTING -> BLOCKED. The blocked state is just one possibility. That said, I think the only use case this state is currently useful for is tracking whether it's blocked or not.
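A sketch of the suggested state enum (hypothetical; this PR keeps the boolean):

// Hypothetical sketch of the suggested worker state machine; this PR keeps
// the simpler `blocked_` boolean instead.
enum class WorkerState {
  STARTING,
  REGISTERED,
  READY,
  EXECUTING,
  BLOCKED,
};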

bool not_oversubscribed =
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
ResourceSet(cpu_resources));
if (!not_oversubscribed) {
Contributor

minor (ok to ignore) : would be good to get rid of the double negative, by using oversubscribed variable instead (and changing the logic to match).
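The suggested flip would look roughly like this (sketch):

// Sketch of the suggested rename: track `oversubscribed` directly and invert
// the branch, avoiding the double negative.
bool oversubscribed =
    !cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
        ResourceSet(cpu_resources));
if (oversubscribed) {
  // ... handle the oversubscribed case ...
}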

@@ -97,6 +103,12 @@ class SchedulingQueue {
/// \param tasks The tasks to queue.
void QueueRunningTasks(const std::vector<Task> &tasks);

/// Queue tasks in the blocked state. These are tasks that have been
/// dispatched to a worker but are blocked on a missing data dependency.
Contributor

minor: this description sounds like a pending task (pending missing data dependencies). Maybe qualify the description by saying "but are blocked on a missing data dependency discovered at runtime"?

actor_id_(ActorID::nil()),
blocked_(false) {}

void Worker::MarkBlocked() { blocked_ = true; }
Contributor

minor: ToggleBlocked(true/false) to save on methods. But we can clean it up if we decide to go with worker states later.
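The single setter being suggested would look roughly like this (name taken from the comment above; not what the PR implements):

// Hypothetical single setter suggested above, replacing separate mark methods.
void Worker::ToggleBlocked(bool blocked) { blocked_ = blocked; }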

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4991/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4992/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4995/

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/Ray-PRB/4996/

@atumanov atumanov merged commit aa07f1c into ray-project:master Apr 19, 2018
@atumanov atumanov deleted the blocked-workers branch April 19, 2018 04:00
royf added a commit to royf/ray that referenced this pull request Apr 22, 2018
* master:
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)
  [tune] Polishing docs (ray-project#1846)
  [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
  Preemptively push local arguments for actor tasks (ray-project#1901)
  [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
  Multithreading refactor for ObjectManager. (ray-project#1911)
  Add slice functionality (ray-project#1832)
  [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
  Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
  Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
  [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
  Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
  Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
  Allow multiple raylets to be started on a single machine. (ray-project#1904)

# Conflicts:
#	python/ray/rllib/__init__.py
#	python/ray/rllib/dqn/dqn.py
alok added a commit to alok/ray that referenced this pull request Apr 28, 2018
* master:
  updates (ray-project#1958)
  Pin Cython in autoscaler development example. (ray-project#1951)
  Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
  [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
  Remove smart_open install. (ray-project#1943)
  [DataFrame] Fully implement append, concat and join (ray-project#1932)
  [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
  [DataFrame] Implementing write methods (ray-project#1918)
  [rllib] arr[end] was excluded when end is not None (ray-project#1931)
  [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
  Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
  [DataFrame] Adding read methods and tests (ray-project#1712)
  Allow task_table_update to fail when tasks are finished. (ray-project#1927)
  [rllib] Contribute DDPG to RLlib (ray-project#1877)
  [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
  Raylet task dispatch and throttling worker startup (ray-project#1912)
  [DataFrame] Eval fix (ray-project#1903)