Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raylet task dispatch and throttling worker startup #1912

Merged
merged 12 commits into from
Apr 18, 2018
2 changes: 1 addition & 1 deletion python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ def stop():
subprocess.call(
[
"killall global_scheduler plasma_store plasma_manager "
"local_scheduler raylet"
"local_scheduler raylet raylet_monitor"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, thanks :)

],
shell=True)

Expand Down
63 changes: 38 additions & 25 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,31 @@ void NodeManager::ProcessNewClient(std::shared_ptr<LocalClientConnection> client
client->ProcessMessages();
}

void NodeManager::DispatchTasks() {
// Work with a copy of scheduled tasks.
auto scheduled_tasks = local_queues_.GetScheduledTasks();
// Return if there are no tasks to schedule.
if (scheduled_tasks.empty()) {
return;
}
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();

for (const auto &task : scheduled_tasks) {
const auto &local_resources =
cluster_resource_map_[my_client_id].GetAvailableResources();
const auto &task_resources = task.GetTaskSpecification().GetRequiredResources();
if (!task_resources.IsSubset(local_resources)) {
// Not enough local resources for this task right now, skip this task.
continue;
}
// We have enough resources for this task. Assign task.
// TODO(atumanov): perform the task state/queue transition inside AssignTask.
auto dispatched_task =
local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()});
AssignTask(dispatched_task.front());
}
}

void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> client,
int64_t message_type,
const uint8_t *message_data) {
Expand All @@ -285,21 +310,9 @@ void NodeManager::ProcessClientMessage(std::shared_ptr<LocalClientConnection> cl
}
// Return the worker to the idle pool.
worker_pool_.PushWorker(worker);
// Check if there is a scheduled task that can now be assigned to the newly
// idle worker.
auto scheduled_tasks = local_queues_.GetScheduledTasks();
if (!scheduled_tasks.empty()) {
// Find a scheduled task that whose actor ID matches that of the newly
// idle worker.
auto worker_actor_id = worker->GetActorId();
for (const auto &task : scheduled_tasks) {
if (task.GetTaskSpecification().ActorId() == worker_actor_id) {
auto scheduled_tasks =
local_queues_.RemoveTasks({task.GetTaskSpecification().TaskId()});
AssignTask(scheduled_tasks.front());
}
}
}
// Call task dispatch to assign work to the new worker.
DispatchTasks();

} break;
case protocol::MessageType_DisconnectClient: {
// Remove the dead worker from the pool and stop listening for messages.
Expand Down Expand Up @@ -374,6 +387,7 @@ void NodeManager::HandleWaitingTaskReady(const TaskID &task_id) {
}

void NodeManager::ScheduleTasks() {
// This method performs the transition of tasks from PENDING to SCHEDULED.
auto policy_decision = scheduling_policy_.Schedule(
cluster_resource_map_, gcs_client_->client_table().GetLocalClientId(),
remote_clients_);
Expand All @@ -386,7 +400,7 @@ void NodeManager::ScheduleTasks() {

// Extract decision for this local scheduler.
std::unordered_set<TaskID, UniqueIDHasher> local_task_ids;
// Iterate over (taskid, clientid) pairs, extract tasks to run on the local client.
// Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node.
for (const auto &task_schedule : policy_decision) {
TaskID task_id = task_schedule.first;
ClientID client_id = task_schedule.second;
Expand All @@ -402,11 +416,10 @@ void NodeManager::ScheduleTasks() {
}
}

// Assign the tasks to workers.
// Transition locally scheduled tasks to SCHEDULED and dispatch scheduled tasks.
std::vector<Task> tasks = local_queues_.RemoveTasks(local_task_ids);
for (auto &task : tasks) {
AssignTask(task);
}
local_queues_.QueueScheduledTasks(tasks);
DispatchTasks();
}

void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineage) {
Expand Down Expand Up @@ -481,11 +494,6 @@ void NodeManager::AssignTask(Task &task) {
}
}

// Resource accounting: acquire resources for the scheduled task.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_CHECK(
this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));

// Try to get an idle worker that can execute this task.
std::shared_ptr<Worker> worker = worker_pool_.PopWorker(spec.ActorId());
if (worker == nullptr) {
Expand All @@ -509,6 +517,11 @@ void NodeManager::AssignTask(Task &task) {
auto status = worker->Connection()->WriteMessage(protocol::MessageType_ExecuteTask,
fbb.GetSize(), fbb.GetBufferPointer());
if (status.ok()) {
// Resource accounting: acquire resources for the assigned task.
const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId();
RAY_CHECK(
this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources()));

// We successfully assigned the task to the worker.
worker->AssignTaskId(spec.TaskId());
// If the task was an actor task, then record this execution to guarantee
Expand Down
3 changes: 3 additions & 0 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class NodeManager {
/// Handler for a heartbeat notification from the GCS.
void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id,
const HeartbeatTableDataT &data);
/// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to
/// "running" task state.
void DispatchTasks();

boost::asio::io_service &io_service_;
ObjectManager &object_manager_;
Expand Down
11 changes: 11 additions & 0 deletions src/ray/raylet/scheduling_resources.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ bool ResourceSet::operator==(const ResourceSet &rhs) const {
return (this->IsSubset(rhs) && rhs.IsSubset(*this));
}

bool ResourceSet::IsEmpty() const {
// Check whether the capacity of each resource type is zero. Exit early if not.
if (resource_capacity_.empty()) return true;
for (const auto &resource_pair : resource_capacity_) {
if (resource_pair.second > 0) {
return false;
}
}
return true;
}

bool ResourceSet::IsSubset(const ResourceSet &other) const {
// Check to make sure all keys of this are in other.
for (const auto &resource_pair : resource_capacity_) {
Expand Down
5 changes: 5 additions & 0 deletions src/ray/raylet/scheduling_resources.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class ResourceSet {
/// False otherwise.
bool GetResource(const std::string &resource_name, double *value) const;

/// Return true if the resource set is empty. False otherwise.
///
/// \return True if the resource capacity is zero. False otherwise.
bool IsEmpty() const;

// TODO(atumanov): implement const_iterator class for the ResourceSet container.
const std::unordered_map<std::string, double> &GetResourceMap() const;

Expand Down
40 changes: 38 additions & 2 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,15 @@ WorkerPool::WorkerPool(int num_workers, const std::vector<std::string> &worker_c
// become zombies instead of dying gracefully.
signal(SIGCHLD, SIG_IGN);
for (int i = 0; i < num_workers; i++) {
StartWorker();
// Force-start num_workers workers.
StartWorker(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I learned this tip from Zongheng to write this as StartWorker(/*force_start*/=true) to make it clear to the reader what the parameter is. :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this introduce C-style comments? I'd rather not, if there's a cleaner, pythonic way, that'd be great.

}
}

/// A constructor that initializes an empty worker pool with zero workers.
WorkerPool::WorkerPool(const std::vector<std::string> &worker_command)
: worker_command_(worker_command) {}

WorkerPool::~WorkerPool() {
// Kill all registered workers. NOTE(swang): This assumes that the registered
// workers were started by the pool.
Expand All @@ -28,15 +33,39 @@ WorkerPool::~WorkerPool() {
kill(worker->Pid(), SIGKILL);
waitpid(worker->Pid(), NULL, 0);
}
// Kill all the workers that have been started but not registered.
for (const auto &pid : started_worker_pids_) {
RAY_CHECK(pid > 0);
kill(pid, SIGKILL);
waitpid(pid, NULL, 0);
}

pool_.clear();
actor_pool_.clear();
registered_workers_.clear();
started_worker_pids_.clear();
}

uint32_t WorkerPool::Size() const {
return static_cast<uint32_t>(actor_pool_.size() + pool_.size());
}

void WorkerPool::StartWorker() {
void WorkerPool::StartWorker(bool force_start) {
RAY_CHECK(!worker_command_.empty()) << "No worker command provided";
if (!started_worker_pids_.empty() && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << started_worker_pids_.size() << " workers pending registration";
return;
}
// Either there are no workers pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "starting worker, actor pool " << actor_pool_.size() << " task pool "
<< pool_.size();

// Launch the process to create the worker.
pid_t pid = fork();
if (pid != 0) {
RAY_LOG(DEBUG) << "Started worker with pid " << pid;
started_worker_pids_.insert(pid);
return;
}

Expand All @@ -60,6 +89,8 @@ void WorkerPool::StartWorker() {
void WorkerPool::RegisterWorker(std::shared_ptr<Worker> worker) {
RAY_LOG(DEBUG) << "Registering worker with pid " << worker->Pid();
registered_workers_.push_back(worker);
RAY_CHECK(started_worker_pids_.count(worker->Pid()) > 0);
started_worker_pids_.erase(worker->Pid());
}

std::shared_ptr<Worker> WorkerPool::GetRegisteredWorker(
Expand Down Expand Up @@ -119,6 +150,11 @@ bool WorkerPool::DisconnectWorker(std::shared_ptr<Worker> worker) {
return removeWorker(pool_, worker);
}

// Protected WorkerPool methods.
void WorkerPool::AddStartedWorker(pid_t pid) { started_worker_pids_.insert(pid); }

uint32_t WorkerPool::NumStartedWorkers() const { return started_worker_pids_.size(); }

} // namespace raylet

} // namespace ray
33 changes: 31 additions & 2 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <inttypes.h>
#include <list>
#include <unordered_map>
#include <unordered_set>

#include "ray/common/client_connection.h"
#include "ray/raylet/worker.h"
Expand All @@ -26,16 +27,26 @@ class WorkerPool {
/// pool.
///
/// \param num_workers The number of workers to start.
/// \param worker_command The command used to start the worker process.
WorkerPool(int num_workers, const std::vector<std::string> &worker_command);

/// Create a pool with zero workers.
///
/// \param num_workers The number of workers to start.
/// \param worker_command The command used to start the worker process.
WorkerPool(const std::vector<std::string> &worker_command);

/// Destructor responsible for freeing a set of workers owned by this class.
~WorkerPool();
virtual ~WorkerPool();

/// Asynchronously start a new worker process. Once the worker process has
/// registered with an external server, the process should create and
/// register a new Worker, then add itself to the pool. Failure to start
/// the worker process is a fatal error.
void StartWorker();
///
/// \param force_start Controls whether to force starting a worker regardless of any
/// workers that have already been started but not yet registered.
void StartWorker(bool force_start = false);

/// Register a new worker. The Worker should be added by the caller to the
/// pool after it becomes idle (e.g., requests a work assignment).
Expand Down Expand Up @@ -70,6 +81,23 @@ class WorkerPool {
/// such worker exists.
std::shared_ptr<Worker> PopWorker(const ActorID &actor_id);

/// Return the current size of the worker pool. Counts only the workers that registered
/// and requested a task.
///
/// \return The total count of all workers (actor and non-actor) in the pool.
uint32_t Size() const;

protected:
/// Add started worker PID to the internal list of started workers (for testing).
///
/// \param pid A process identifier for the worker being started.
void AddStartedWorker(pid_t pid);

/// Return a number of workers currently started but not registered.
///
/// \return The number of worker PIDs stored for started workers.
uint32_t NumStartedWorkers() const;

private:
std::vector<std::string> worker_command_;
/// The pool of idle workers.
Expand All @@ -80,6 +108,7 @@ class WorkerPool {
/// idle and executing.
// TODO(swang): Make this a map to make GetRegisteredWorker faster.
std::list<std::shared_ptr<Worker>> registered_workers_;
std::unordered_set<pid_t> started_worker_pids_;
};

} // namespace raylet
Expand Down
22 changes: 20 additions & 2 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,26 @@ namespace ray {

namespace raylet {

class WorkerPoolMock : public WorkerPool {
public:
WorkerPoolMock(const std::vector<std::string> &worker_command)
: WorkerPool(worker_command) {}

void StartWorker(pid_t pid, bool force_start = false) {
if (NumStartedWorkers() > 0 && !force_start) {
// Workers have been started, but not registered. Force start disabled -- returning.
RAY_LOG(DEBUG) << NumStartedWorkers() << " workers pending registration";
return;
}
// Either no workers are pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "starting worker, worker pool size " << Size();
AddStartedWorker(pid);
}
};

class WorkerPoolTest : public ::testing::Test {
public:
WorkerPoolTest() : worker_pool_(0, {}), io_service_() {}
WorkerPoolTest() : worker_pool_({}), io_service_() {}

std::shared_ptr<Worker> CreateWorker(pid_t pid) {
std::function<void(std::shared_ptr<LocalClientConnection>)> client_handler = [this](
Expand All @@ -23,11 +40,12 @@ class WorkerPoolTest : public ::testing::Test {
boost::asio::local::stream_protocol::socket socket(io_service_);
auto client =
LocalClientConnection::Create(client_handler, message_handler, std::move(socket));
worker_pool_.StartWorker(pid);
return std::shared_ptr<Worker>(new Worker(pid, client));
}

protected:
WorkerPool worker_pool_;
WorkerPoolMock worker_pool_;
boost::asio::io_service io_service_;

private:
Expand Down