
[xray] Workers blocked in a ray.get release their resources #1920


Merged · 11 commits · Apr 19, 2018
13 changes: 11 additions & 2 deletions python/ray/services.py
@@ -910,6 +910,7 @@ def start_raylet(redis_address,
plasma_store_name,
worker_path,
resources=None,
num_workers=0,
stdout_file=None,
stderr_file=None,
cleanup=True):
@@ -956,8 +957,15 @@ def start_raylet(redis_address,
plasma_store_name, raylet_name, redis_address))

command = [
RAYLET_EXECUTABLE, raylet_name, plasma_store_name, node_ip_address,
gcs_ip_address, gcs_port, start_worker_command, resource_argument
RAYLET_EXECUTABLE,
raylet_name,
plasma_store_name,
node_ip_address,
gcs_ip_address,
gcs_port,
str(num_workers),
start_worker_command,
resource_argument,
]
pid = subprocess.Popen(command, stdout=stdout_file, stderr=stderr_file)

@@ -1471,6 +1479,7 @@ def start_ray_processes(address_info=None,
object_store_addresses[i].name,
worker_path,
resources=resources[i],
num_workers=workers_per_local_scheduler[i],
stdout_file=raylet_stdout_file,
stderr_file=raylet_stderr_file,
cleanup=cleanup))
2 changes: 1 addition & 1 deletion python/ray/worker.py
@@ -504,7 +504,7 @@ def get_object(self, object_ids):

# If there were objects that we weren't able to get locally, let the
# local scheduler know that we're now unblocked.
if was_blocked and not self.use_raylet:
if was_blocked:
self.local_scheduler_client.notify_unblocked()

assert len(final_results) == len(object_ids)
9 changes: 5 additions & 4 deletions src/ray/raylet/main.cc
@@ -6,15 +6,16 @@

#ifndef RAYLET_TEST
int main(int argc, char *argv[]) {
RAY_CHECK(argc == 8);
RAY_CHECK(argc == 9);

const std::string raylet_socket_name = std::string(argv[1]);
const std::string store_socket_name = std::string(argv[2]);
const std::string node_ip_address = std::string(argv[3]);
const std::string redis_address = std::string(argv[4]);
int redis_port = std::stoi(argv[5]);
const std::string worker_command = std::string(argv[6]);
const std::string static_resource_list = std::string(argv[7]);
int num_initial_workers = std::stoi(argv[6]);
const std::string worker_command = std::string(argv[7]);
const std::string static_resource_list = std::string(argv[8]);

// Configuration for the node manager.
ray::raylet::NodeManagerConfig node_manager_config;
@@ -33,7 +34,7 @@ int main(int argc, char *argv[]) {
ray::raylet::ResourceSet(std::move(static_resource_conf));
RAY_LOG(INFO) << "Starting raylet with static resource configuration: "
<< node_manager_config.resource_config.ToString();
node_manager_config.num_initial_workers = 0;
node_manager_config.num_initial_workers = num_initial_workers;
// Use a default worker that can execute empty tasks with dependencies.

std::stringstream worker_command_stream(worker_command);
59 changes: 59 additions & 0 deletions src/ray/raylet/node_manager.cc
@@ -344,6 +344,65 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl
ObjectID object_id = from_flatbuf(*message->object_id());
RAY_LOG(DEBUG) << "reconstructing object " << object_id;
RAY_CHECK_OK(object_manager_.Pull(object_id));

// If the blocked client is a worker, and the worker isn't already blocked,
// then release any CPU resources that it acquired for its assigned task
// while it is blocked. The resources will be acquired again once the
// worker is unblocked.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker && !worker->IsBlocked()) {
RAY_CHECK(!worker->GetAssignedTaskId().is_nil());
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};
// Release the CPU resources.
RAY_CHECK(
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
ResourceSet(cpu_resources)));
// Mark the task as blocked.
local_queues_.QueueBlockedTasks(tasks);
worker->MarkBlocked();

// Try to dispatch more tasks since the blocked worker released some
// resources.
DispatchTasks();
}
} break;
case protocol::MessageType_NotifyUnblocked: {
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
// Re-acquire the CPU resources for the task that was assigned to the
// unblocked worker.
if (worker) {
RAY_CHECK(worker->IsBlocked());
RAY_CHECK(!worker->GetAssignedTaskId().is_nil());

auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
[Review comment · Contributor] const ref: const auto &

double required_cpus = 0;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, &required_cpus));
const std::unordered_map<std::string, double> cpu_resources = {
{kCPU_ResourceLabel, required_cpus}};
// Acquire the CPU resources.
bool oversubscribed =
!cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
ResourceSet(cpu_resources));
if (oversubscribed) {
const SchedulingResources &local_resources =
cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()];
RAY_LOG(WARNING) << "Resources oversubscribed: "
<< local_resources.GetAvailableResources().ToString();
}
// Mark the task as running again.
local_queues_.QueueRunningTasks(tasks);
worker->MarkUnblocked();
}
} break;

default:
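To summarize the control flow these two handlers add, here is a minimal, standalone sketch of the bookkeeping they perform. FakeTask and FakeNode are illustrative stand-ins, not the raylet's Task, SchedulingQueue, or SchedulingResources classes; the point is only the pairing of release-on-block with reacquire-on-unblock.

#include <iostream>
#include <list>
#include <string>

// Illustrative stand-in types for a task and a node's local state.
struct FakeTask {
  std::string id;
  double required_cpus;
};

struct FakeNode {
  double available_cpus;
  std::list<FakeTask> running;
  std::list<FakeTask> blocked;
};

// Called when a running worker blocks in a ray.get: release its CPUs and
// move its task to the blocked queue so other tasks can be dispatched.
void OnWorkerBlocked(FakeNode &node) {
  FakeTask task = node.running.front();
  node.running.pop_front();
  node.available_cpus += task.required_cpus;
  node.blocked.push_back(task);
}

// Called on NotifyUnblocked: take the CPUs back (this may temporarily
// oversubscribe the node) and move the task back to the running queue.
void OnWorkerUnblocked(FakeNode &node) {
  FakeTask task = node.blocked.front();
  node.blocked.pop_front();
  node.available_cpus -= task.required_cpus;
  if (node.available_cpus < 0) {
    std::cout << "Resources oversubscribed: " << node.available_cpus << " CPUs\n";
  }
  node.running.push_back(task);
}

int main() {
  // One parent task is running and holds the node's single CPU.
  FakeNode node{/*available_cpus=*/0, {{"parent", 1.0}}, {}};
  OnWorkerBlocked(node);    // parent blocks; its CPU is freed for a child task
  OnWorkerUnblocked(node);  // dependency ready; parent reacquires its CPU
  std::cout << node.available_cpus << " CPUs available\n";  // prints 0
  return 0;
}

Because the freed CPUs may have been handed to another task in the meantime, the reacquire step can briefly drive availability negative, which is exactly the oversubscription case the handler logs as a WARNING.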
9 changes: 9 additions & 0 deletions src/ray/raylet/scheduling_queue.cc
@@ -26,6 +26,10 @@ const std::list<Task> &SchedulingQueue::GetRunningTasks() const {
return this->running_tasks_;
}

const std::list<Task> &SchedulingQueue::GetBlockedTasks() const {
return this->blocked_tasks_;
}

const std::list<Task> &SchedulingQueue::GetReadyMethods() const {
throw std::runtime_error("Method not implemented");
}
@@ -65,6 +69,7 @@ std::vector<Task> SchedulingQueue::RemoveTasks(
removeTasksFromQueue(ready_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(scheduled_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(running_tasks_, task_ids, removed_tasks);
removeTasksFromQueue(blocked_tasks_, task_ids, removed_tasks);
// TODO(swang): Remove from running methods.

RAY_CHECK(task_ids.size() == 0);
@@ -91,6 +96,10 @@ void SchedulingQueue::QueueRunningTasks(const std::vector<Task> &tasks) {
queueTasks(running_tasks_, tasks);
}

void SchedulingQueue::QueueBlockedTasks(const std::vector<Task> &tasks) {
queueTasks(blocked_tasks_, tasks);
}

} // namespace raylet

} // namespace ray
20 changes: 19 additions & 1 deletion src/ray/raylet/scheduling_queue.h
@@ -65,6 +65,13 @@ class SchedulingQueue {
/// executing on a worker.
const std::list<Task> &GetRunningTasks() const;

/// Get the tasks in the blocked state.
///
/// \return A const reference to the queue of tasks that have been dispatched
/// to a worker but are blocked on a data dependency discovered to be missing
/// at runtime.
const std::list<Task> &GetBlockedTasks() const;

/// Remove tasks from the task queue.
///
/// \param tasks The set of task IDs to remove from the queue. The
@@ -77,7 +84,8 @@
/// \param tasks The tasks to queue.
void QueueUncreatedActorMethods(const std::vector<Task> &tasks);

/// Queue tasks in the waiting state.
/// Queue tasks in the waiting state. These are tasks that cannot yet be
/// scheduled since they are blocked on a missing data dependency.
///
/// \param tasks The tasks to queue.
void QueueWaitingTasks(const std::vector<Task> &tasks);
@@ -97,6 +105,13 @@
/// \param tasks The tasks to queue.
void QueueRunningTasks(const std::vector<Task> &tasks);

/// Queue tasks in the blocked state. These are tasks that have been
/// dispatched to a worker but are blocked on a data dependency that was
/// discovered to be missing at runtime.
///
/// \param tasks The tasks to queue.
void QueueBlockedTasks(const std::vector<Task> &tasks);

private:
/// Tasks that are destined for actors that have not yet been created.
std::list<Task> uncreated_actor_methods_;
@@ -109,6 +124,9 @@
std::list<Task> scheduled_tasks_;
/// Tasks that are running on a worker.
std::list<Task> running_tasks_;
/// Tasks that were dispatched to a worker but are blocked on a data
/// dependency that was missing at runtime.
std::list<Task> blocked_tasks_;
};

} // namespace raylet
16 changes: 8 additions & 8 deletions src/ray/raylet/scheduling_resources.cc
@@ -73,19 +73,19 @@ bool ResourceSet::RemoveResource(const std::string &resource_name) {
throw std::runtime_error("Method not implemented");
}
bool ResourceSet::SubtractResources(const ResourceSet &other) {
// Return failure if attempting to perform vector subtraction with unknown labels.
// TODO(atumanov): make the implementation atomic. Currently, if false is returned
// the resource capacity may be partially mutated. To reverse, call AddResources.
// Subtract the resources and track whether a resource goes below zero.
bool oversubscribed = false;
for (const auto &resource_pair : other.GetResourceMap()) {
const std::string &resource_label = resource_pair.first;
const double &resource_capacity = resource_pair.second;
if (resource_capacity_.count(resource_label) == 0) {
return false;
} else {
resource_capacity_[resource_label] -= resource_capacity;
RAY_CHECK(resource_capacity_.count(resource_label) == 1)
<< "Attempt to acquire unknown resource: " << resource_label;
resource_capacity_[resource_label] -= resource_capacity;
if (resource_capacity_[resource_label] < 0) {
oversubscribed = true;
}
}
return true;
return !oversubscribed;
}

bool ResourceSet::AddResources(const ResourceSet &other) {
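A small, self-contained illustration of the new SubtractResources behavior, using a plain std::unordered_map as a stand-in for ResourceSet (the function and variable names here are illustrative, not Ray APIs): the capacity is always decremented, and the return value only reports whether any resource dipped below zero.

#include <iostream>
#include <string>
#include <unordered_map>

// Stand-in for ResourceSet::SubtractResources after this change:
// always subtract, and report whether any resource went negative.
bool SubtractResources(std::unordered_map<std::string, double> &capacity,
                       const std::unordered_map<std::string, double> &request) {
  bool oversubscribed = false;
  for (const auto &pair : request) {
    // The real implementation RAY_CHECKs that the resource label is known.
    capacity[pair.first] -= pair.second;
    if (capacity[pair.first] < 0) {
      oversubscribed = true;
    }
  }
  return !oversubscribed;
}

int main() {
  std::unordered_map<std::string, double> available = {{"CPU", 1}};
  // First acquisition succeeds and leaves 0 CPUs.
  std::cout << SubtractResources(available, {{"CPU", 1}}) << std::endl;  // prints 1
  // Second acquisition still subtracts, but reports oversubscription (-1 CPUs).
  std::cout << SubtractResources(available, {{"CPU", 1}}) << std::endl;  // prints 0
  return 0;
}

This is what allows the NotifyUnblocked handler above to acquire CPUs unconditionally and merely warn on oversubscription rather than fail.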
6 changes: 5 additions & 1 deletion src/ray/raylet/scheduling_resources.h
@@ -10,6 +10,8 @@ namespace ray {

namespace raylet {

const std::string kCPU_ResourceLabel = "CPU";

/// Resource availability status reports whether the resource requirement is
/// (1) infeasible, (2) feasible but currently unavailable, or (3) available.
typedef enum {
@@ -160,7 +162,9 @@ class SchedulingResources {
/// \brief Acquire the amount of resources specified.
///
/// \param resources: the amount of resources to be acquired.
/// \return True if resources were successfully acquired. False otherwise.
/// \return True if resources were acquired without oversubscription. If this
/// returns false, then the resources were still acquired, but we are now at
/// negative resources.
bool Acquire(const ResourceSet &resources);

private:
9 changes: 8 additions & 1 deletion src/ray/raylet/worker.cc
@@ -15,7 +15,14 @@ Worker::Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection)
: pid_(pid),
connection_(connection),
assigned_task_id_(TaskID::nil()),
actor_id_(ActorID::nil()) {}
actor_id_(ActorID::nil()),
blocked_(false) {}

void Worker::MarkBlocked() { blocked_ = true; }
[Review comment · Contributor] minor: ToggleBlocked(true/false) to save on methods. But we can clean it up if we decide to go with worker states later.


void Worker::MarkUnblocked() { blocked_ = false; }

bool Worker::IsBlocked() const { return blocked_; }

pid_t Worker::Pid() const { return pid_; }

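The review comment above suggests collapsing the two setters into one. A standalone sketch of that alternative, hypothetical and not what this PR implements:

// Sketch of the reviewer's ToggleBlocked suggestion. WorkerSketch is an
// illustrative stand-in, not the raylet's Worker class.
class WorkerSketch {
 public:
  // Replaces the separate MarkBlocked()/MarkUnblocked() methods.
  void ToggleBlocked(bool blocked) { blocked_ = blocked; }
  bool IsBlocked() const { return blocked_; }

 private:
  bool blocked_ = false;
};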
6 changes: 6 additions & 0 deletions src/ray/raylet/worker.h
@@ -19,6 +19,9 @@ class Worker {
Worker(pid_t pid, std::shared_ptr<LocalClientConnection> connection);
/// A destructor responsible for freeing all worker state.
~Worker() {}
void MarkBlocked();
void MarkUnblocked();
bool IsBlocked() const;
/// Return the worker's PID.
pid_t Pid() const;
void AssignTaskId(const TaskID &task_id);
@@ -37,6 +40,9 @@
TaskID assigned_task_id_;
/// The worker's actor ID. If this is nil, then the worker is not an actor.
ActorID actor_id_;
/// Whether the worker is blocked. Workers become blocked in a `ray.get` if
/// they require a data dependency while executing a task.
bool blocked_;
[Review comment · Contributor] ideally, we would make it enum WorkerState and transition the worker through its finite state diagram: STARTING -> REGISTERED -> READY -> EXECUTING -> BLOCKED. Blocked state is just one possibility. That said, I think the only use case this state is currently useful for is tracking if it's blocked or not.

};

} // namespace raylet
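The review comment above sketches a richer worker lifecycle. Below is a standalone, hypothetical sketch of that enum WorkerState idea; the PR itself only tracks a boolean blocked_ flag, and the state names and class here are illustrative, not Ray code.

#include <cassert>

// Hypothetical worker lifecycle from the review comment.
enum class WorkerState { STARTING, REGISTERED, READY, EXECUTING, BLOCKED };

class WorkerStateMachine {
 public:
  WorkerState State() const { return state_; }
  void OnRegistered() { state_ = WorkerState::REGISTERED; }
  void OnReady() { state_ = WorkerState::READY; }
  void OnTaskAssigned() { state_ = WorkerState::EXECUTING; }
  // Equivalent of MarkBlocked()/MarkUnblocked() in this PR.
  void OnBlocked() { state_ = WorkerState::BLOCKED; }
  void OnUnblocked() { state_ = WorkerState::EXECUTING; }
  bool IsBlocked() const { return state_ == WorkerState::BLOCKED; }

 private:
  WorkerState state_ = WorkerState::STARTING;
};

int main() {
  WorkerStateMachine worker;
  worker.OnRegistered();
  worker.OnReady();
  worker.OnTaskAssigned();
  worker.OnBlocked();    // worker enters a blocking ray.get
  assert(worker.IsBlocked());
  worker.OnUnblocked();  // the missing dependency became available
  assert(!worker.IsBlocked());
  return 0;
}

In that design, MarkBlocked()/MarkUnblocked() would become transitions into and out of the BLOCKED state, and invalid transitions could be checked in one place.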