
Commit

[Core/Autoscaler] Properly clean up resource backlog from (ray-projec…
Alex Wu authored Jan 27, 2021
1 parent 3644df4 commit c0fe816
Showing 2 changed files with 62 additions and 30 deletions.
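In short, the fix adds RemoveFromBacklogTracker(task) calls on the dispatch and spillback paths, so the per-shape backlog that FillResourceUsage reports to the autoscaler stops counting tasks that have already left the scheduling queues. The tracker's implementation is not part of this diff; the following is a minimal, self-contained sketch of that kind of bookkeeping under assumed names (BacklogTracker, Add, Remove, Total, and an int scheduling-class key), not Ray's actual code.

// Minimal, self-contained sketch of the backlog bookkeeping this commit fixes.
// All names here are assumptions for illustration, not ClusterTaskManager code.
#include <cstdint>
#include <iostream>
#include <unordered_map>

class BacklogTracker {
 public:
  // Called when a task is queued, along with the backlog size it reported.
  void Add(int scheduling_class, int64_t backlog_size) {
    backlog_per_class_[scheduling_class] += backlog_size;
  }

  // Called when a task is dispatched, spilled back, or cancelled, so the
  // autoscaler no longer sees demand for work that has already left the queues.
  void Remove(int scheduling_class, int64_t backlog_size) {
    auto it = backlog_per_class_.find(scheduling_class);
    if (it == backlog_per_class_.end()) return;
    it->second -= backlog_size;
    if (it->second <= 0) backlog_per_class_.erase(it);
  }

  int64_t Total() const {
    int64_t total = 0;
    for (const auto &entry : backlog_per_class_) total += entry.second;
    return total;
  }

 private:
  std::unordered_map<int, int64_t> backlog_per_class_;
};

int main() {
  BacklogTracker tracker;
  // Ten tasks of one shape report backlogs 10, 9, ..., 1 (as in the test below).
  for (int i = 0; i < 10; i++) tracker.Add(0, 10 - i);
  std::cout << tracker.Total() << "\n";  // 55
  // Dispatching the first task must also remove its backlog contribution.
  tracker.Remove(0, 10);
  std::cout << tracker.Total() << "\n";  // 45
  return 0;
}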
21 changes: 16 additions & 5 deletions src/ray/raylet/scheduling/cluster_task_manager.cc
@@ -618,12 +618,21 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending,
std::string ClusterTaskManager::DebugStr() const {
// TODO(Shanly): This method will be replaced with `DebugString` once we remove the
// legacy scheduler.
auto accumulator = [](int state, const std::pair<int, std::deque<Work>> &pair) {
return state + pair.second.size();
};
int num_infeasible_tasks =
std::accumulate(infeasible_tasks_.begin(), infeasible_tasks_.end(), 0, accumulator);
int num_tasks_to_schedule = std::accumulate(tasks_to_schedule_.begin(),
tasks_to_schedule_.end(), 0, accumulator);
int num_tasks_to_dispatch = std::accumulate(tasks_to_dispatch_.begin(),
tasks_to_dispatch_.end(), 0, accumulator);
std::stringstream buffer;
buffer << "========== Node: " << self_node_id_ << " =================\n";
buffer << "Schedule queue length: " << tasks_to_schedule_.size() << "\n";
buffer << "Dispatch queue length: " << tasks_to_dispatch_.size() << "\n";
buffer << "Infeasible queue length: " << num_infeasible_tasks << "\n";
buffer << "Schedule queue length: " << num_tasks_to_schedule << "\n";
buffer << "Dispatch queue length: " << num_tasks_to_dispatch << "\n";
buffer << "Waiting tasks size: " << waiting_tasks_.size() << "\n";
buffer << "infeasible queue length size: " << infeasible_tasks_.size() << "\n";
buffer << "cluster_resource_scheduler state: "
<< cluster_resource_scheduler_->DebugString() << "\n";
buffer << "==================================================";
@@ -673,7 +682,6 @@ void ClusterTaskManager::Dispatch(
const Task &task, rpc::RequestWorkerLeaseReply *reply,
std::function<void(void)> send_reply_callback) {
const auto &task_spec = task.GetTaskSpecification();
RAY_LOG(DEBUG) << "Dispatching task " << task_spec.TaskId();
// Pass the contact info of the worker to use.
reply->set_worker_pid(worker->GetProcess().GetId());
reply->mutable_worker_address()->set_ip_address(worker->IpAddress());
@@ -683,6 +691,7 @@ void ClusterTaskManager::Dispatch(

RAY_CHECK(leased_workers.find(worker->WorkerId()) == leased_workers.end());
leased_workers[worker->WorkerId()] = worker;
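// The task is now being granted to a worker, so its backlog should no longer
// be reported to the autoscaler.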
RemoveFromBacklogTracker(task);

// Update our internal view of the cluster state.
std::shared_ptr<TaskResourceInstances> allocated_resources;
@@ -734,7 +743,9 @@ void ClusterTaskManager::Dispatch(
}

void ClusterTaskManager::Spillback(const NodeID &spillback_to, const Work &work) {
const auto &task_spec = std::get<0>(work).GetTaskSpecification();
const auto &task = std::get<0>(work);
const auto &task_spec = task.GetTaskSpecification();
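// The task is leaving this node via spillback, so drop its backlog contribution too.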
RemoveFromBacklogTracker(task);
RAY_LOG(DEBUG) << "Spilling task " << task_spec.TaskId() << " to node " << spillback_to;

if (!cluster_resource_scheduler_->AllocateRemoteTaskResources(
71 changes: 46 additions & 25 deletions src/ray/raylet/scheduling/cluster_task_manager_test.cc
@@ -554,48 +554,69 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) {
*callback_occurred_ptr = true;
};

std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(std::dynamic_pointer_cast<WorkerInterface>(worker));

std::vector<TaskID> to_cancel;

for (int i = 0; i < 10; i++) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}});
task.SetBacklogSize(i);
// Don't add this first task to `to_cancel`.
for (int i = 0; i < 1; i++) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 8}});
task.SetBacklogSize(10 - i);
task_manager_.QueueAndScheduleTask(task, &reply, callback);
}

for (int i = 1; i < 10; i++) {
Task task = CreateTask({{ray::kCPU_ResourceLabel, 8}});
task.SetBacklogSize(10 - i);
task_manager_.QueueAndScheduleTask(task, &reply, callback);
to_cancel.push_back(task.GetTaskSpecification().TaskId());
}

ASSERT_FALSE(callback_occurred);
ASSERT_EQ(leased_workers_.size(), 0);
ASSERT_EQ(pool_.workers.size(), 1);
ASSERT_EQ(pool_.workers.size(), 0);
ASSERT_EQ(node_info_calls_, 0);

auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(data);
{ // No tasks can run because the worker pool is empty.
auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];
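// The ten queued tasks reported backlogs of 10 + 9 + ... + 1 = 55, and none of them has run yet.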

ASSERT_EQ(shape1.backlog_size(), 55);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0);
ASSERT_EQ(shape1.num_ready_requests_queued(), 10);
}

// Push a worker so the first task can run.
std::shared_ptr<MockWorker> worker =
std::make_shared<MockWorker>(WorkerID::FromRandom(), 1234);
pool_.PushWorker(worker);
task_manager_.ScheduleAndDispatchTasks();

auto resource_load_by_shape = data->resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];
{
auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
auto shape1 = resource_load_by_shape.resource_demands()[0];

ASSERT_EQ(shape1.backlog_size(), 45);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 10);
ASSERT_EQ(shape1.num_ready_requests_queued(), 0);
ASSERT_TRUE(callback_occurred);
ASSERT_EQ(shape1.backlog_size(), 45);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0);
ASSERT_EQ(shape1.num_ready_requests_queued(), 9);
}

// Cancel the rest.
for (auto &task_id : to_cancel) {
ASSERT_TRUE(task_manager_.CancelTask(task_id));
}
RAY_LOG(ERROR) << "Finished cancelling tasks";

data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(data);

resource_load_by_shape = data->resource_load_by_shape();
shape1 = resource_load_by_shape.resource_demands()[0];

ASSERT_EQ(shape1.backlog_size(), 0);
ASSERT_EQ(shape1.num_infeasible_requests_queued(), 0);
ASSERT_EQ(shape1.num_ready_requests_queued(), 0);
AssertNoLeaks();
{
auto data = std::make_shared<rpc::ResourcesData>();
task_manager_.FillResourceUsage(data);
auto resource_load_by_shape = data->resource_load_by_shape();
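// Every remaining task was cancelled, so no per-shape demand (or backlog) should be reported.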
ASSERT_EQ(resource_load_by_shape.resource_demands().size(), 0);
AssertNoLeaks();
}
}

TEST_F(ClusterTaskManagerTest, OwnerDeadTest) {
