Skip to content

Commit

Permalink
Remove idle actor from worker pool. (ray-project#13523)
Browse files Browse the repository at this point in the history
  • Loading branch information
jovany-wang authored Jan 23, 2021
1 parent 01d74af commit 8ef835f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 56 deletions.
32 changes: 11 additions & 21 deletions src/ray/raylet/worker_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,8 @@ Process WorkerPool::StartWorkerProcess(
return Process();
}
// Either there are no workers pending registration or the worker start is being forced.
RAY_LOG(DEBUG) << "Starting new worker process, current pool has "
<< state.idle_actor.size() << " actor workers, and " << state.idle.size()
<< " non-actor workers";
RAY_LOG(DEBUG) << "Starting new worker process, current pool has " << state.idle.size()
<< " workers";

int workers_to_start = 1;
if (dynamic_options.empty()) {
Expand Down Expand Up @@ -625,15 +624,11 @@ void WorkerPool::PushWorker(const std::shared_ptr<WorkerInterface> &worker) {
state.idle_dedicated_workers[task_id] = worker;
} else {
// The worker is not used for the actor creation task with dynamic options.
// Put the worker to the corresponding idle pool.
if (worker->GetActorId().IsNil()) {
state.idle.insert(worker);
int64_t now = current_time_ms();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
} else {
state.idle_actor[worker->GetActorId()] = worker;
}
// Put the worker to the idle pool.
state.idle.insert(worker);
int64_t now = current_time_ms();
idle_of_all_languages_.emplace_back(worker, now);
idle_of_all_languages_map_[worker] = now;
}
}

Expand Down Expand Up @@ -787,7 +782,10 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
state.tasks_to_dedicated_workers[task_spec.TaskId()] = proc;
}
}
} else if (!task_spec.IsActorTask()) {
} else if (task_spec.IsActorTask()) {
// Code path of actor task.
RAY_CHECK(false) << "Direct call shouldn't reach here.";
} else {
// Code path of normal task or actor creation task without dynamic worker options.
// Find an available worker which is already assigned to this job.
// Try to pop the most recently pushed worker.
Expand All @@ -812,14 +810,6 @@ std::shared_ptr<WorkerInterface> WorkerPool::PopWorker(
proc = StartWorkerProcess(task_spec.GetLanguage(), rpc::WorkerType::WORKER,
task_spec.JobId());
}
} else {
// Code path of actor task.
const auto &actor_id = task_spec.ActorId();
auto actor_entry = state.idle_actor.find(actor_id);
if (actor_entry != state.idle_actor.end()) {
worker = std::move(actor_entry->second);
state.idle_actor.erase(actor_entry);
}
}

if (worker == nullptr && proc.IsValid()) {
Expand Down
2 changes: 0 additions & 2 deletions src/ray/raylet/worker_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,6 @@ class WorkerPool : public WorkerPoolInterface, public IOWorkerPoolInterface {
std::unordered_map<TaskID, std::shared_ptr<WorkerInterface>> idle_dedicated_workers;
/// The pool of idle non-actor workers.
std::unordered_set<std::shared_ptr<WorkerInterface>> idle;
/// The pool of idle actor workers.
std::unordered_map<ActorID, std::shared_ptr<WorkerInterface>> idle_actor;
// States for io workers used for spilling objects.
IOWorkerState spill_io_worker_state;
// States for io workers used for restoring objects.
Expand Down
38 changes: 5 additions & 33 deletions src/ray/raylet/worker_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,28 +343,6 @@ TEST_F(WorkerPoolTest, HandleWorkerPushPop) {
ASSERT_EQ(popped_worker, nullptr);
}

TEST_F(WorkerPoolTest, PopActorWorker) {
// Create a worker.
auto worker = CreateWorker(Process::CreateNewDummy());
// Add the worker to the pool.
worker_pool_->PushWorker(worker);

// Assign an actor ID to the worker.
const auto task_spec = ExampleTaskSpec();
auto actor = worker_pool_->PopWorker(task_spec);
auto actor_id = ActorID::Of(JOB_ID, TaskID::ForDriverTask(JOB_ID), 1);
actor->AssignActorId(actor_id);
worker_pool_->PushWorker(actor);

// Check that there are no more non-actor workers.
ASSERT_EQ(worker_pool_->PopWorker(task_spec), nullptr);
// Check that we can pop the actor worker.
const auto actor_task_spec = ExampleTaskSpec(actor_id);
actor = worker_pool_->PopWorker(actor_task_spec);
ASSERT_EQ(actor, worker);
ASSERT_EQ(actor->GetActorId(), actor_id);
}

TEST_F(WorkerPoolTest, PopWorkersOfMultipleLanguages) {
// Create a Python Worker, and add it to the pool
auto py_worker = CreateWorker(Process::CreateNewDummy(), Language::PYTHON);
Expand Down Expand Up @@ -428,25 +406,19 @@ TEST_F(WorkerPoolTest, PopWorkerMultiTenancy) {
worker_pool_->PushWorker(worker);
}
}

std::unordered_set<WorkerID> worker_ids;
for (int round = 0; round < 2; round++) {
std::vector<std::shared_ptr<WorkerInterface>> workers;

// Pop workers for actor (creation) tasks.
// Pop workers for actor.
for (auto job_id : job_ids) {
auto actor_id = ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1);
// For the first round, we pop for actor creation tasks.
// For the second round, we pop for actor tasks.
auto task_spec =
ExampleTaskSpec(round == 0 ? ActorID::Nil() : actor_id, Language::PYTHON,
job_id, round == 0 ? actor_id : ActorID::Nil());
auto actor_creation_id = ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1);
// Pop workers for actor creation tasks.
auto task_spec = ExampleTaskSpec(/*actor_id=*/ActorID::Nil(), Language::PYTHON,
job_id, actor_creation_id);
auto worker = worker_pool_->PopWorker(task_spec);
ASSERT_TRUE(worker);
ASSERT_EQ(worker->GetAssignedJobId(), job_id);
if (round == 0) {
worker->AssignActorId(actor_id);
}
workers.push_back(worker);
}

Expand Down

0 comments on commit 8ef835f

Please sign in to comment.