diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 4df7a65b58b6..ce00ae20f831 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -141,13 +141,7 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self config.max_worker_port, config.worker_ports, gcs_client_, config.worker_commands, config.raylet_config, /*starting_worker_timeout_callback=*/ - [this]() { - if (RayConfig::instance().new_scheduler_enabled()) { - ScheduleAndDispatch(); - } else { - this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); - } - }), + [this] { cluster_task_manager_->ScheduleAndDispatchTasks(); }), scheduling_policy_(local_queues_), reconstruction_policy_( io_service_, @@ -177,7 +171,6 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self /*local_only=*/false); }, is_plasma_object_spillable), - new_scheduler_enabled_(RayConfig::instance().new_scheduler_enabled()), report_worker_backlog_(RayConfig::instance().report_worker_backlog()), last_local_gc_ns_(absl::GetCurrentTimeNanos()), local_gc_interval_ns_(RayConfig::instance().local_gc_interval_s() * 1e9), @@ -197,9 +190,9 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self RAY_CHECK_OK(object_manager_.SubscribeObjDeleted( [this](const ObjectID &object_id) { HandleObjectMissing(object_id); })); - if (new_scheduler_enabled_) { + if (RayConfig::instance().new_scheduler_enabled()) { SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; - new_resource_scheduler_ = + cluster_resource_scheduler_ = std::shared_ptr(new ClusterResourceScheduler( self_node_id_.Binary(), local_resources.GetTotalResources().GetResourceMap())); @@ -216,11 +209,23 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self PublishInfeasibleTaskError(task); }; cluster_task_manager_ = std::shared_ptr(new ClusterTaskManager( - self_node_id_, new_resource_scheduler_, dependency_manager_, is_owner_alive, - get_node_info_func, announce_infeasible_task)); + self_node_id_, + std::dynamic_pointer_cast(cluster_resource_scheduler_), + dependency_manager_, is_owner_alive, get_node_info_func, announce_infeasible_task, + worker_pool_, leased_workers_)); placement_group_resource_manager_ = - std::make_shared(new_resource_scheduler_); + std::make_shared( + std::dynamic_pointer_cast( + cluster_resource_scheduler_)); } else { + cluster_resource_scheduler_ = std::make_shared( + self_node_id_, local_available_resources_, cluster_resource_map_, + gcs_client_->NodeResources().GetLastResourceUsage()); + cluster_task_manager_ = std::shared_ptr(this, [](NodeManager *) { + // If new scheduler is not enabled, the `cluster_task_manager_` is this NodeManager + // itself. So we create a `shared_ptr` that points to `this` and the deleter does + // nothing. + }); placement_group_resource_manager_ = std::make_shared( local_available_resources_, cluster_resource_map_, self_node_id_); @@ -362,11 +367,7 @@ void NodeManager::HandleJobStarted(const JobID &job_id, const JobTableData &job_ // Tasks of this job may already arrived but failed to pop a worker because the job // config is not local yet. So we trigger dispatching again here to try to // reschedule these tasks. 
- if (new_scheduler_enabled_) { - ScheduleAndDispatch(); - } else { - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + cluster_task_manager_->ScheduleAndDispatchTasks(); } void NodeManager::HandleJobFinished(const JobID &job_id, const JobTableData &job_data) { @@ -439,65 +440,13 @@ void NodeManager::Heartbeat() { void NodeManager::ReportResourceUsage() { auto resources_data = std::make_shared(); - SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; resources_data->set_node_id(self_node_id_.Binary()); - - if (new_scheduler_enabled_) { - // Update local chche from gcs remote cache, this is needed when gcs restart. - // We should always keep the cache view consistent. - new_resource_scheduler_->UpdateLastResourceUsage( - gcs_client_->NodeResources().GetLastResourceUsage()); - new_resource_scheduler_->FillResourceUsage(resources_data); - cluster_task_manager_->FillResourceUsage(resources_data); - } else { - // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet - // directly. - // TODO(atumanov): implement a ResourceSet const_iterator. - // We only set fileds that changed. - auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); - if (!last_heartbeat_resources->GetTotalResources().IsEqual( - local_resources.GetTotalResources())) { - for (const auto &resource_pair : - local_resources.GetTotalResources().GetResourceMap()) { - (*resources_data->mutable_resources_total())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetTotalResources( - ResourceSet(local_resources.GetTotalResources())); - } - - if (!last_heartbeat_resources->GetAvailableResources().IsEqual( - local_resources.GetAvailableResources())) { - resources_data->set_resources_available_changed(true); - for (const auto &resource_pair : - local_resources.GetAvailableResources().GetResourceMap()) { - (*resources_data->mutable_resources_available())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetAvailableResources( - ResourceSet(local_resources.GetAvailableResources())); - } - - local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); - if (!last_heartbeat_resources->GetLoadResources().IsEqual( - local_resources.GetLoadResources())) { - resources_data->set_resource_load_changed(true); - for (const auto &resource_pair : - local_resources.GetLoadResources().GetResourceMap()) { - (*resources_data->mutable_resource_load())[resource_pair.first] = - resource_pair.second; - } - last_heartbeat_resources->SetLoadResources( - ResourceSet(local_resources.GetLoadResources())); - } - } - - if (!new_scheduler_enabled_) { - // Add resource load by shape. This will be used by the new autoscaler. - auto resource_load = local_queues_.GetResourceLoadByShape( - RayConfig::instance().max_resource_shapes_per_load_report()); - resources_data->mutable_resource_load_by_shape()->Swap(&resource_load); - } + // Update local chche from gcs remote cache, this is needed when gcs restart. + // We should always keep the cache view consistent. + cluster_resource_scheduler_->UpdateLastResourceUsage( + gcs_client_->NodeResources().GetLastResourceUsage()); + cluster_resource_scheduler_->FillResourceUsage(resources_data); + cluster_task_manager_->FillResourceUsage(resources_data); // Set the global gc bit on the outgoing heartbeat message. 
if (should_global_gc_) { @@ -627,32 +576,14 @@ void NodeManager::WarnResourceDeadlock() { } } - if (new_scheduler_enabled_) { - // Check if any tasks are blocked on resource acquisition. - if (!cluster_task_manager_->AnyPendingTasks( - &exemplar, &any_pending, &pending_actor_creations, &pending_tasks)) { - // No pending tasks, no need to warn. - resource_deadlock_warned_ = 0; - return; - } - available_resources = new_resource_scheduler_->GetLocalResourceViewString(); - } else { - // See if any tasks are blocked trying to acquire resources. - for (const auto &task : local_queues_.GetTasks(TaskState::READY)) { - const TaskSpecification &spec = task.GetTaskSpecification(); - if (spec.IsActorCreationTask()) { - pending_actor_creations += 1; - } else { - pending_tasks += 1; - } - if (!any_pending) { - exemplar = task; - any_pending = true; - } - } - SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; - available_resources = local_resources.GetAvailableResources().ToString(); + // Check if any tasks are blocked on resource acquisition. + if (!cluster_task_manager_->AnyPendingTasks(&exemplar, &any_pending, + &pending_actor_creations, &pending_tasks)) { + // No pending tasks, no need to warn. + resource_deadlock_warned_ = 0; + return; } + available_resources = cluster_resource_scheduler_->GetLocalResourceViewString(); // Push an warning to the driver that a task is blocked trying to acquire resources. // To avoid spurious triggers, only take action starting with the second time. @@ -767,21 +698,12 @@ void NodeManager::NodeRemoved(const GcsNodeInfo &node_info) { // not be necessary. // Remove the node from the resource map. - if (0 == cluster_resource_map_.erase(node_id)) { + if (!cluster_resource_scheduler_->RemoveNode(node_id.Binary())) { RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id << "."; return; } - // Remove the node from the resource map. - if (new_scheduler_enabled_) { - if (!new_resource_scheduler_->RemoveNode(node_id.Binary())) { - RAY_LOG(DEBUG) << "Received NodeRemoved callback for an unknown node: " << node_id - << "."; - return; - } - } - // Remove the node manager address. const auto node_entry = remote_node_manager_addresses_.find(node_id); if (node_entry != remote_node_manager_addresses_.end()) { @@ -849,30 +771,19 @@ void NodeManager::ResourceCreateUpdated(const NodeID &node_id, << " with created or updated resources: " << createUpdatedResources.ToString() << ". Updating resource map."; - SchedulingResources &cluster_schedres = cluster_resource_map_[node_id]; - // Update local_available_resources_ and SchedulingResources for (const auto &resource_pair : createUpdatedResources.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &new_resource_capacity = resource_pair.second; - - cluster_schedres.UpdateResourceCapacity(resource_label, new_resource_capacity); - if (node_id == self_node_id_) { - local_available_resources_.AddOrUpdateResource(resource_label, - new_resource_capacity); - } - if (new_scheduler_enabled_) { - new_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label, - new_resource_capacity); - } + cluster_resource_scheduler_->UpdateResourceCapacity(node_id.Binary(), resource_label, + new_resource_capacity); } RAY_LOG(DEBUG) << "[ResourceCreateUpdated] Updated cluster_resource_map."; if (node_id == self_node_id_) { // The resource update is on the local node, check if we can reschedule tasks. 
- TryLocalInfeasibleTaskScheduling(); + cluster_task_manager_->ScheduleInfeasibleTasks(); } - return; } void NodeManager::ResourceDeleted(const NodeID &node_id, @@ -887,17 +798,9 @@ void NodeManager::ResourceDeleted(const NodeID &node_id, << ". Updating resource map."; } - SchedulingResources &cluster_schedres = cluster_resource_map_[node_id]; - // Update local_available_resources_ and SchedulingResources for (const auto &resource_label : resource_names) { - cluster_schedres.DeleteResource(resource_label); - if (node_id == self_node_id_) { - local_available_resources_.DeleteResource(resource_label); - } - if (new_scheduler_enabled_) { - new_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label); - } + cluster_resource_scheduler_->DeleteResource(node_id.Binary(), resource_label); } return; } @@ -925,71 +828,21 @@ void NodeManager::TryLocalInfeasibleTaskScheduling() { void NodeManager::UpdateResourceUsage(const NodeID &node_id, const rpc::ResourcesData &resource_data) { - // Locate the node id in remote node table and update available resources based on - // the received resource usage information. - auto it = cluster_resource_map_.find(node_id); - if (it == cluster_resource_map_.end()) { - // Haven't received the node registration for this node yet, skip this message. + if (!cluster_resource_scheduler_->UpdateNode(node_id.Binary(), resource_data)) { RAY_LOG(INFO) << "[UpdateResourceUsage]: received resource usage from unknown node id " << node_id; return; } + // Trigger local GC at the next heartbeat interval. if (resource_data.should_global_gc()) { should_local_gc_ = true; } - SchedulingResources &remote_resources = it->second; - - // We update remote resources only when related - // resources map in message changed. - if (resource_data.resources_total_size() > 0) { - ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); - remote_resources.SetTotalResources(std::move(remote_total)); - } - if (resource_data.resources_available_changed()) { - ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); - remote_resources.SetAvailableResources(std::move(remote_available)); - } - if (resource_data.resource_load_changed()) { - ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); - // Extract the load information and save it locally. - remote_resources.SetLoadResources(std::move(remote_load)); - } - - if (new_scheduler_enabled_ && node_id != self_node_id_) { - new_resource_scheduler_->AddOrUpdateNode( - node_id.Binary(), remote_resources.GetTotalResources().GetResourceMap(), - remote_resources.GetAvailableResources().GetResourceMap()); - // TODO(swang): We could probably call this once per batch instead of once - // per node in the batch. - ScheduleAndDispatch(); - return; - } - - // Extract decision for this raylet. - auto decision = scheduling_policy_.SpillOver(remote_resources, - cluster_resource_map_[self_node_id_]); - std::unordered_set local_task_ids; - for (const auto &task_id : decision) { - // (See design_docs/task_states.rst for the state transition diagram.) - Task task; - TaskState state; - if (!local_queues_.RemoveTask(task_id, &task, &state)) { - return; - } - // Since we are spilling back from the ready and waiting queues, we need - // to unsubscribe the dependencies. - if (state != TaskState::INFEASIBLE) { - // Don't unsubscribe for infeasible tasks because we never subscribed in - // the first place. - dependency_manager_.RemoveTaskDependencies(task_id); - } - // Attempt to forward the task. 
If this fails to forward the task, - // the task will be resubmit locally. - ForwardTaskOrResubmit(task, node_id); - } + // If light resource usage report enabled, we update remote resources only when related + // resources map in heartbeat is not empty. + cluster_task_manager_->OnNodeResourceUsageUpdated(node_id, resource_data); } void NodeManager::ResourceUsageBatchReceived( @@ -1012,8 +865,8 @@ void NodeManager::ProcessNewClient(ClientConnection &client) { // A helper function to create a mapping from task scheduling class to // tasks with that class from a given list of tasks. -std::unordered_map> MakeTasksByClass( - const std::vector &tasks) { +std::unordered_map> NodeManager::MakeTasksByClass( + const std::vector &tasks) const { std::unordered_map> result; for (const auto &task : tasks) { auto spec = task.GetTaskSpecification(); @@ -1028,7 +881,6 @@ void NodeManager::DispatchTasks( // one class of tasks become stuck behind others in the queue, causing Ray to start // many workers. See #3644 for a more detailed description of this issue. std::vector> *> fair_order; - RAY_CHECK(new_scheduler_enabled_ == false); for (auto &it : tasks_by_class) { fair_order.emplace_back(&it); } @@ -1231,7 +1083,7 @@ void NodeManager::ProcessRegisterClientRequestMessage( // If the worker failed to register to Raylet, trigger task dispatching here to // allow new worker processes to be started (if capped by // maximum_startup_concurrency). - DispatchTasks(local_queues_.GetReadyTasksByClass()); + cluster_task_manager_->ScheduleAndDispatchTasks(); } } else { // Register the new driver. @@ -1306,14 +1158,10 @@ void NodeManager::HandleWorkerAvailable(const std::shared_ptr & } // Local resource availability changed: invoke scheduling policy for local node. - if (new_scheduler_enabled_) { - ScheduleAndDispatch(); - } else { - cluster_resource_map_[self_node_id_].SetLoadResources( - local_queues_.GetTotalResourceLoad()); - // Call task dispatch to assign work to the new worker. - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + cluster_resource_map_[self_node_id_].SetLoadResources( + local_queues_.GetTotalResourceLoad()); + + cluster_task_manager_->ScheduleAndDispatchTasks(); } void NodeManager::ProcessDisconnectClientMessage( @@ -1396,32 +1244,10 @@ void NodeManager::ProcessDisconnectClientMessage( worker_pool_.DisconnectWorker(worker); // Return the resources that were being used by this worker. - if (new_scheduler_enabled_) { - new_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances()); - worker->ClearAllocatedInstances(); - new_resource_scheduler_->FreeLocalTaskResources( - worker->GetLifetimeAllocatedInstances()); - worker->ClearLifetimeAllocatedInstances(); - } else { - auto const &task_resources = worker->GetTaskResourceIds(); - local_available_resources_.ReleaseConstrained( - task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - worker->ResetTaskResourceIds(); - - auto const &lifetime_resources = worker->GetLifetimeResourceIds(); - local_available_resources_.ReleaseConstrained( - lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); - worker->ResetLifetimeResourceIds(); - } + cluster_task_manager_->ReleaseWorkerResources(worker); // Since some resources may have been released, we can try to dispatch more tasks. 
- if (new_scheduler_enabled_) { - ScheduleAndDispatch(); - } else { - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + cluster_task_manager_->ScheduleAndDispatchTasks(); } else if (is_driver) { // The client is a driver. const auto job_id = worker->GetAssignedJobId(); @@ -1596,12 +1422,6 @@ void NodeManager::ProcessSubmitTaskMessage(const uint8_t *message_data) { SubmitTask(Task(task_message)); } -void NodeManager::ScheduleAndDispatch() { - RAY_CHECK(new_scheduler_enabled_); - cluster_task_manager_->SchedulePendingTasks(); - cluster_task_manager_->DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_); -} - void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest &request, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -1632,77 +1452,7 @@ void NodeManager::HandleRequestWorkerLease(const rpc::RequestWorkerLeaseRequest worker_pool_.PrestartWorkers(task_spec, request.backlog_size()); } - if (new_scheduler_enabled_) { - auto task_spec = task.GetTaskSpecification(); - cluster_task_manager_->QueueTask(task, reply, [send_reply_callback]() { - send_reply_callback(Status::OK(), nullptr, nullptr); - }); - ScheduleAndDispatch(); - return; - } - - // Override the task dispatch to call back to the client instead of executing the - // task directly on the worker. - TaskID task_id = task.GetTaskSpecification().TaskId(); - rpc::Address owner_address = task.GetTaskSpecification().CallerAddress(); - task.OnDispatchInstead( - [this, owner_address, reply, send_reply_callback]( - const std::shared_ptr granted, const std::string &address, int port, - const WorkerID &worker_id, const ResourceIdSet &resource_ids) { - auto worker = std::static_pointer_cast(granted); - uint32_t worker_pid = static_cast(worker->GetProcess().GetId()); - - reply->mutable_worker_address()->set_ip_address(address); - reply->mutable_worker_address()->set_port(port); - reply->mutable_worker_address()->set_worker_id(worker_id.Binary()); - reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary()); - reply->set_worker_pid(worker_pid); - for (const auto &mapping : resource_ids.AvailableResources()) { - auto resource = reply->add_resource_mapping(); - resource->set_name(mapping.first); - for (const auto &id : mapping.second.WholeIds()) { - auto rid = resource->add_resource_ids(); - rid->set_index(id); - rid->set_quantity(1.0); - } - for (const auto &id : mapping.second.FractionalIds()) { - auto rid = resource->add_resource_ids(); - rid->set_index(id.first); - rid->set_quantity(id.second.ToDouble()); - } - } - - auto reply_failure_handler = [this, worker_id]() { - RAY_LOG(WARNING) - << "Failed to reply to GCS server, because it might have restarted.
GCS " - "cannot obtain the information of the leased worker, so we need to " - "release the leased worker to avoid leakage."; - leased_workers_.erase(worker_id); - metrics_num_task_executed_ -= 1; - }; - metrics_num_task_executed_ += 1; - send_reply_callback(Status::OK(), nullptr, reply_failure_handler); - RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) - << "Worker is already leased out " << worker_id; - - leased_workers_[worker_id] = worker; - }); - task.OnSpillbackInstead( - [this, reply, task_id, send_reply_callback](const NodeID &spillback_to, - const std::string &address, int port) { - RAY_LOG(DEBUG) << "Worker lease request SPILLBACK " << task_id; - reply->mutable_retry_at_raylet_address()->set_ip_address(address); - reply->mutable_retry_at_raylet_address()->set_port(port); - reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); - metrics_num_task_spilled_back_ += 1; - send_reply_callback(Status::OK(), nullptr, nullptr); - }); - task.OnCancellationInstead([reply, task_id, send_reply_callback]() { - RAY_LOG(DEBUG) << "Task lease request canceled " << task_id; - reply->set_canceled(true); - send_reply_callback(Status::OK(), nullptr, nullptr); - }); - SubmitTask(task); + cluster_task_manager_->QueueAndScheduleTask(task, reply, send_reply_callback); } void NodeManager::HandlePrepareBundleResources( @@ -1715,12 +1465,6 @@ void NodeManager::HandlePrepareBundleResources( auto prepared = placement_group_resource_manager_->PrepareBundle(bundle_spec); reply->set_success(prepared); send_reply_callback(Status::OK(), nullptr, nullptr); - - if (!new_scheduler_enabled_) { - // Call task dispatch to assign work to the new group. - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } } void NodeManager::HandleCommitBundleResources( @@ -1732,15 +1476,8 @@ void NodeManager::HandleCommitBundleResources( placement_group_resource_manager_->CommitBundle(bundle_spec); send_reply_callback(Status::OK(), nullptr, nullptr); - if (new_scheduler_enabled_) { - // Schedule in case a lease request for this placement group arrived before the commit - // message. - ScheduleAndDispatch(); - } else { - // Call task dispatch to assign work to the new group. - TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } + cluster_task_manager_->ScheduleInfeasibleTasks(); + cluster_task_manager_->ScheduleAndDispatchTasks(); } void NodeManager::HandleCancelResourceReserve( @@ -1774,17 +1511,8 @@ void NodeManager::HandleCancelResourceReserve( // Return bundle resources. placement_group_resource_manager_->ReturnBundle(bundle_spec); - - if (new_scheduler_enabled_) { - // Schedule in case a lease request for this placement group arrived before the commit - // message. - ScheduleAndDispatch(); - } else { - // Call task dispatch to assign work to the new group. 
- TryLocalInfeasibleTaskScheduling(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); - } - + cluster_task_manager_->ScheduleInfeasibleTasks(); + cluster_task_manager_->ScheduleAndDispatchTasks(); send_reply_callback(Status::OK(), nullptr, nullptr); } @@ -1807,9 +1535,7 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, if (worker->IsBlocked()) { HandleDirectCallTaskUnblocked(worker); } - if (new_scheduler_enabled_) { - cluster_task_manager_->HandleTaskFinished(worker); - } + cluster_task_manager_->ReturnWorkerResources(worker); HandleWorkerAvailable(worker); } } else { @@ -1847,38 +1573,7 @@ void NodeManager::HandleCancelWorkerLease(const rpc::CancelWorkerLeaseRequest &r rpc::CancelWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { const TaskID task_id = TaskID::FromBinary(request.task_id()); - Task removed_task; - TaskState removed_task_state; - bool canceled; - if (new_scheduler_enabled_) { - canceled = cluster_task_manager_->CancelTask(task_id); - if (!canceled) { - // There are 2 cases here. - // 1. We haven't received the lease request yet. It's the caller's job to - // retry the cancellation once we've received the request. - // 2. We have already granted the lease. The caller is now responsible - // for returning the lease, not cancelling it. - } - } else { - canceled = local_queues_.RemoveTask(task_id, &removed_task, &removed_task_state); - if (!canceled) { - // We do not have the task. This could be because we haven't received the - // lease request yet, or because we already granted the lease request and - // it has already been returned. - } else { - if (removed_task.OnDispatch()) { - // We have not yet granted the worker lease. Cancel it now. - removed_task.OnCancellation()(); - if (removed_task_state == TaskState::WAITING) { - dependency_manager_.RemoveTaskDependencies(task_id); - } - } else { - // We already granted the worker lease and sent the reply. Re-queue the - // task and wait for the requester to return the leased worker. - local_queues_.QueueTasks({removed_task}, removed_task_state); - } - } - } + bool canceled = cluster_task_manager_->CancelTask(task_id); // The task cancellation failed if we did not have the task queued, since // this means that we may not have received the task request yet. It is // successful if we did have the task queued, since we have now replied to @@ -2061,36 +1756,15 @@ void NodeManager::SubmitTask(const Task &task) { void NodeManager::HandleDirectCallTaskBlocked( const std::shared_ptr &worker, bool release_resources) { - if (new_scheduler_enabled_) { - if (!worker || worker->IsBlocked() || !release_resources) { - return; - } - std::vector cpu_instances; - if (worker->GetAllocatedInstances() != nullptr) { - cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); - } - if (cpu_instances.size() > 0) { - std::vector overflow_cpu_instances = - new_resource_scheduler_->AddCPUResourceInstances(cpu_instances); - for (unsigned int i = 0; i < overflow_cpu_instances.size(); i++) { - RAY_CHECK(overflow_cpu_instances[i] == 0) << "Should not be overflow"; - } - worker->MarkBlocked(); - } - ScheduleAndDispatch(); - return; - } - - if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked() || + if (!worker || worker->IsBlocked() || worker->GetAssignedTaskId().IsNil() || !release_resources) { return; // The worker may have died or is no longer processing the task. 
} - auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); - local_available_resources_.Release(cpu_resource_ids); - cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet()); - worker->MarkBlocked(); - DispatchTasks(local_queues_.GetReadyTasksByClass()); + if (cluster_task_manager_->ReleaseCpuResourcesFromUnblockedWorker(worker)) { + worker->MarkBlocked(); + } + cluster_task_manager_->ScheduleAndDispatchTasks(); } void NodeManager::HandleDirectCallTaskUnblocked( @@ -2098,57 +1772,17 @@ void NodeManager::HandleDirectCallTaskUnblocked( if (!worker || worker->GetAssignedTaskId().IsNil()) { return; // The worker may have died or is no longer processing the task. } - TaskID task_id = worker->GetAssignedTaskId(); // First, always release task dependencies. This ensures we don't leak resources even // if we don't need to unblock the worker below. dependency_manager_.CancelGetRequest(worker->WorkerId()); - if (new_scheduler_enabled_) { - // Important: avoid double unblocking if the unblock RPC finishes after task end. - if (!worker || !worker->IsBlocked()) { - return; - } - std::vector cpu_instances; - if (worker->GetAllocatedInstances() != nullptr) { - cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); - } - if (cpu_instances.size() > 0) { - // Important: we allow going negative here, since otherwise you can use infinite - // CPU resources by repeatedly blocking / unblocking a task. By allowing it to go - // negative, at most one task can "borrow" this worker's resources. - new_resource_scheduler_->SubtractCPUResourceInstances( - cpu_instances, /*allow_going_negative=*/true); + if (worker->IsBlocked()) { + if (cluster_task_manager_->ReturnCpuResourcesToBlockedWorker(worker)) { worker->MarkUnblocked(); } - ScheduleAndDispatch(); - return; + cluster_task_manager_->ScheduleAndDispatchTasks(); } - - if (!worker->IsBlocked()) { - return; // Don't need to unblock the worker. - } - - Task task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); - const auto required_resources = task.GetTaskSpecification().GetRequiredResources(); - const ResourceSet cpu_resources = required_resources.GetNumCpus(); - bool oversubscribed = !local_available_resources_.Contains(cpu_resources); - if (!oversubscribed) { - // Reacquire the CPU resources for the worker. Note that care needs to be - // taken if the user is using the specific CPU IDs since the IDs that we - // reacquire here may be different from the ones that the task started with. - auto const resource_ids = local_available_resources_.Acquire(cpu_resources); - worker->AcquireTaskCpuResources(resource_ids); - cluster_resource_map_[self_node_id_].Acquire(cpu_resources); - } else { - // In this case, we simply don't reacquire the CPU resources for the worker. - // The worker can keep running and when the task finishes, it will simply - // not have any CPU resources to release. - RAY_LOG(WARNING) - << "Resources oversubscribed: " - << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString(); - } - worker->MarkUnblocked(); } void NodeManager::AsyncResolveObjects( @@ -2348,23 +1982,7 @@ bool NodeManager::FinishAssignedTask(const std::shared_ptr &wor RAY_LOG(DEBUG) << "Finished task " << task_id; Task task; - if (new_scheduler_enabled_) { - task = worker.GetAssignedTask(); - // leased_workers_.erase(worker.WorkerId()); // Maybe RAY_CHECK ? 
- if (worker.GetAllocatedInstances() != nullptr) { - cluster_task_manager_->HandleTaskFinished(worker_ptr); - } - } else { - // (See design_docs/task_states.rst for the state transition diagram.) - RAY_CHECK(local_queues_.RemoveTask(task_id, &task)) << task_id; - - // Release task's resources. The worker's lifetime resources are still held. - auto const &task_resources = worker.GetTaskResourceIds(); - local_available_resources_.ReleaseConstrained( - task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); - cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - worker.ResetTaskResourceIds(); - } + cluster_task_manager_->TaskFinished(worker_ptr, &task); const auto &spec = task.GetTaskSpecification(); // if ((spec.IsActorCreationTask())) { @@ -2479,36 +2097,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { RAY_LOG(DEBUG) << "Object local " << object_id << ", " << " on " << self_node_id_ << ", " << ready_task_ids.size() << " tasks ready"; - // Transition the tasks whose dependencies are now fulfilled to the ready state. - if (new_scheduler_enabled_) { - cluster_task_manager_->TasksUnblocked(ready_task_ids); - ScheduleAndDispatch(); - } else { - if (ready_task_ids.size() > 0) { - std::unordered_set ready_task_id_set(ready_task_ids.begin(), - ready_task_ids.end()); - - // First filter out the tasks that should not be moved to READY. - local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED); - local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING); - local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER); - - // Make sure that the remaining tasks are all WAITING or direct call - // actors. - auto ready_task_id_set_copy = ready_task_id_set; - local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING); - // Filter out direct call actors. These are not tracked by the raylet and - // their assigned task ID is the actor ID. - for (const auto &id : ready_task_id_set_copy) { - ready_task_id_set.erase(id); - } - - // Queue and dispatch the tasks that are ready to run (i.e., WAITING). - auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set); - local_queues_.QueueTasks(ready_tasks, TaskState::READY); - DispatchTasks(MakeTasksByClass(ready_tasks)); - } - } + cluster_task_manager_->TasksUnblocked(ready_task_ids); } bool NodeManager::IsActorCreationTask(const TaskID &task_id) { @@ -2536,50 +2125,7 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) { } RAY_LOG(DEBUG) << result.str(); - // We don't need to do anything if the new scheduler is enabled because tasks - // will get moved back to waiting once they reach the front of the dispatch - // queue. - if (!new_scheduler_enabled_) { - // Transition any tasks that were in the runnable state and are dependent on - // this object to the waiting state. - if (!waiting_task_ids.empty()) { - std::unordered_set waiting_task_id_set(waiting_task_ids.begin(), - waiting_task_ids.end()); - - // NOTE(zhijunfu): For direct actors, the worker is initially assigned actor - // creation task ID, which will not be reset after the task finishes. And later - // tasks of this actor will reuse this task ID to require objects from plasma with - // FetchOrReconstruct, since direct actor task IDs are not known to raylet. - // To support actor reconstruction for direct actor, raylet marks actor creation - // task as completed and removes it from `local_queues_` when it receives `TaskDone` - // message from worker. 
This is necessary because the actor creation task will be - // re-submitted during reconstruction, if the task is not removed previously, the - // new submitted task will be marked as duplicate and thus ignored. So here we check - // for direct actor creation task explicitly to allow this case. - auto iter = waiting_task_id_set.begin(); - while (iter != waiting_task_id_set.end()) { - if (IsActorCreationTask(*iter)) { - RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter - << " when handling object missing for " << object_id; - iter = waiting_task_id_set.erase(iter); - } else { - ++iter; - } - } - - // First filter out any tasks that can't be transitioned to READY. These - // are running workers or drivers, now blocked in a get. - local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING); - local_queues_.FilterState(waiting_task_id_set, TaskState::DRIVER); - // Transition the tasks back to the waiting state. They will be made - // runnable once the deleted object becomes available again. - local_queues_.MoveTasks(waiting_task_id_set, TaskState::READY, TaskState::WAITING); - RAY_CHECK(waiting_task_id_set.empty()); - // Moving ready tasks to waiting may have changed the load, making space for placing - // new tasks locally. - ScheduleTasks(cluster_resource_map_); - } - } + cluster_task_manager_->OnObjectMissing(object_id, waiting_task_ids); } void NodeManager::ForwardTaskOrResubmit(const Task &task, const NodeID &node_manager_id) { @@ -2750,7 +2296,7 @@ std::string NodeManager::DebugString() const { result << "\nInitialConfigResources: " << initial_config_.resource_config.ToString(); if (cluster_task_manager_ != nullptr) { result << "\nClusterTaskManager:\n"; - result << cluster_task_manager_->DebugString(); + result << cluster_task_manager_->DebugStr(); } result << "\nClusterResources:"; for (auto &pair : cluster_resource_map_) { @@ -2838,9 +2384,8 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { - if (new_scheduler_enabled_) { - cluster_task_manager_->FillPendingActorInfo(reply); - } + cluster_task_manager_->FillPendingActorInfo(reply); + for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { if (task.GetTaskSpecification().IsActorCreationTask()) { auto infeasible_task = reply->add_infeasible_tasks(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 402c11c1e9b5..4b9b98d19880 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -32,6 +32,8 @@ #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/raylet/scheduling/cluster_resource_scheduler.h" #include "ray/raylet/scheduling/cluster_task_manager.h" +#include "ray/raylet/scheduling/old_cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/cluster_task_manager_interface.h" #include "ray/raylet/scheduling_policy.h" #include "ray/raylet/scheduling_queue.h" #include "ray/raylet/reconstruction_policy.h" @@ -110,7 +112,8 @@ struct NodeManagerConfig { int64_t min_spilling_size; }; -class NodeManager : public rpc::NodeManagerServiceHandler { +class NodeManager : public rpc::NodeManagerServiceHandler, + public ClusterTaskManagerInterface { public: /// Create a node manager. /// @@ -613,13 +616,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Dispatch tasks to available workers. 
void DispatchScheduledTasksToWorkers(); - /// For the pending task at the head of tasks_to_schedule_, return a node - /// in the system (local or remote) that has enough resources available to - /// run the task, if any such node exist. - /// Repeat the process as long as we can schedule a task. - /// NEW SCHEDULER_FUNCTION - void ScheduleAndDispatch(); - /// Whether a task is an actor creation task. bool IsActorCreationTask(const TaskID &task_id); @@ -635,6 +631,122 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// \param task Task that is infeasible void PublishInfeasibleTaskError(const Task &task) const; + std::unordered_map> MakeTasksByClass( + const std::vector &tasks) const; + + /////////////////////////////////////////////////////////////////////////////////////// + //////////////////// Begin of the override methods of ClusterTaskManager ////////////// + // The following methods are defined in node_manager.task.cc instead of node_manager.cc. + + /// Return the resources that were being used by this worker. + void ReleaseWorkerResources(std::shared_ptr worker) override; + + /// When a task is blocked in ray.get or ray.wait, the worker who is executing the task + /// should give up the CPU resources allocated for the running task for the time being + /// and the worker itself should also be marked as blocked. + /// + /// \param worker The worker who will give up the CPU resources. + /// \return true if the cpu resources of the specified worker are released successfully, + /// else false. + bool ReleaseCpuResourcesFromUnblockedWorker( + std::shared_ptr worker) override; + + /// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that + /// the worker gave up should be returned to it. + /// + /// \param worker The blocked worker. + /// \return true if the cpu resources are returned back to the specified worker, else + /// false. + bool ReturnCpuResourcesToBlockedWorker( + std::shared_ptr worker) override; + + // Schedule and dispatch tasks. + void ScheduleAndDispatchTasks() override; + + /// Move tasks from waiting to ready for dispatch. Called when a task's + /// dependencies are resolved. + /// + /// \param ready_ids: The tasks which are now ready to be dispatched. + void TasksUnblocked(const std::vector &ready_ids) override; + + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resource_load and resource_load_by_shape. + /// + /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only + /// fields used. + void FillResourceUsage(std::shared_ptr data) override; + + /// Populate the list of pending or infeasible actor tasks for node stats. + /// + /// \param Output parameter. + void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const override; + + /// Return the finished task and release the worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. + /// + /// \param worker: The worker which was running the task. + /// \param task: Output parameter. + void TaskFinished(std::shared_ptr worker, Task *task) override; + + /// Return worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. + /// + /// \param worker: The worker which was running the task. + void ReturnWorkerResources(std::shared_ptr worker) override; + + /// Attempt to cancel an already queued task. + /// + /// \param task_id: The id of the task to remove. + /// + /// \return True if task was successfully removed. This function will return + /// false if the task is already running. + bool CancelTask(const TaskID &task_id) override; + + /// Queue task and schedule. This happens when processing the worker lease request. + /// + /// \param task: The incoming task to be queued and scheduled. + /// \param reply: The reply of the lease request. + /// \param send_reply_callback: The function used during dispatching. + void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Schedule infeasible tasks. + void ScheduleInfeasibleTasks() override; + + /// Return if any tasks are pending resource acquisition. + /// + /// \param[out] exemplar An example task that is deadlocking. + /// \param[out] any_pending True if there's any pending exemplar. + /// \param[out] num_pending_actor_creation Number of pending actor creation tasks. + /// \param[out] num_pending_tasks Number of pending tasks. + /// \return True if any tasks are pending. + bool AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation, + int *num_pending_tasks) const override; + + /// Handle the resource usage updated event of the specified node. + /// + /// \param node_id ID of the node which resources are updated. + /// \param resource_data The node resources. + void OnNodeResourceUsageUpdated(const NodeID &node_id, + const rpc::ResourcesData &resource_data) override; + + /// Handle the object missing event. + /// + /// \param object_id ID of the missing object. + /// \param waiting_task_ids IDs of tasks that are waiting for the specified missing + /// object. + void OnObjectMissing(const ObjectID &object_id, + const std::vector &waiting_task_ids) override; + + /// The helper to dump the debug state of the cluster task manager. + std::string DebugStr() const override; + + //////////////////// End of the override methods of ClusterTaskManager //////////////// + /////////////////////////////////////////////////////////////////////////////////////// + /// ID of this node. NodeID self_node_id_; boost::asio::io_service &io_service_; @@ -732,9 +844,6 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// lease on. absl::flat_hash_map> leased_workers_by_owner_; - /// Whether new schedule is enabled. - const bool new_scheduler_enabled_; - /// Whether to report the worker's backlog size in the GCS heartbeat. const bool report_worker_backlog_; @@ -760,8 +869,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// responsible for maintaining a view of the cluster state w.r.t resource /// usage. ClusterTaskManager is responsible for queuing, spilling back, and /// dispatching tasks. - std::shared_ptr new_resource_scheduler_; - std::shared_ptr cluster_task_manager_; + std::shared_ptr cluster_resource_scheduler_; + std::shared_ptr cluster_task_manager_; absl::flat_hash_map> pinned_objects_; diff --git a/src/ray/raylet/node_manager.task.cc b/src/ray/raylet/node_manager.task.cc new file mode 100644 index 000000000000..150ecb02d2ba --- /dev/null +++ b/src/ray/raylet/node_manager.task.cc @@ -0,0 +1,357 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ray/raylet/node_manager.h" + +namespace ray { +namespace raylet { + +void NodeManager::ReleaseWorkerResources(std::shared_ptr worker) { + auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); + + auto const &lifetime_resources = worker->GetLifetimeResourceIds(); + local_available_resources_.ReleaseConstrained( + lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); + worker->ResetLifetimeResourceIds(); +} + +bool NodeManager::ReleaseCpuResourcesFromUnblockedWorker( + std::shared_ptr worker) { + if (!worker || worker->IsBlocked()) { + return false; + } + + auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); + local_available_resources_.Release(cpu_resource_ids); + cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet()); + return true; +} + +bool NodeManager::ReturnCpuResourcesToBlockedWorker( + std::shared_ptr worker) { + if (!worker || !worker->IsBlocked()) { + return false; + } + + const TaskID &task_id = worker->GetAssignedTaskId(); + const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); + const auto &required_resources = task.GetTaskSpecification().GetRequiredResources(); + const ResourceSet cpu_resources = required_resources.GetNumCpus(); + bool oversubscribed = !local_available_resources_.Contains(cpu_resources); + if (!oversubscribed) { + // Reacquire the CPU resources for the worker. Note that care needs to be + // taken if the user is using the specific CPU IDs since the IDs that we + // reacquire here may be different from the ones that the task started with. + auto const resource_ids = local_available_resources_.Acquire(cpu_resources); + worker->AcquireTaskCpuResources(resource_ids); + cluster_resource_map_[self_node_id_].Acquire(cpu_resources); + } else { + // In this case, we simply don't reacquire the CPU resources for the worker. + // The worker can keep running and when the task finishes, it will simply + // not have any CPU resources to release. + RAY_LOG(WARNING) + << "Resources oversubscribed: " + << cluster_resource_map_[self_node_id_].GetAvailableResources().ToString(); + } + return true; +} + +void NodeManager::ScheduleAndDispatchTasks() { + DispatchTasks(local_queues_.GetReadyTasksByClass()); +} + +void NodeManager::TasksUnblocked(const std::vector &ready_ids) { + if (ready_ids.empty()) { + return; + } + + std::unordered_set ready_task_id_set(ready_ids.begin(), ready_ids.end()); + + // First filter out the tasks that should not be moved to READY. + local_queues_.FilterState(ready_task_id_set, TaskState::BLOCKED); + local_queues_.FilterState(ready_task_id_set, TaskState::RUNNING); + local_queues_.FilterState(ready_task_id_set, TaskState::DRIVER); + + // Make sure that the remaining tasks are all WAITING or direct call + // actors. 
+ auto ready_task_id_set_copy = ready_task_id_set; + local_queues_.FilterState(ready_task_id_set_copy, TaskState::WAITING); + // Filter out direct call actors. These are not tracked by the raylet and + // their assigned task ID is the actor ID. + for (const auto &id : ready_task_id_set_copy) { + ready_task_id_set.erase(id); + } + + // Queue and dispatch the tasks that are ready to run (i.e., WAITING). + auto ready_tasks = local_queues_.RemoveTasks(ready_task_id_set); + local_queues_.QueueTasks(ready_tasks, TaskState::READY); + DispatchTasks(MakeTasksByClass(ready_tasks)); +} + +void NodeManager::FillResourceUsage(std::shared_ptr resources_data) { + SchedulingResources &local_resources = cluster_resource_map_[self_node_id_]; + local_resources.SetLoadResources(local_queues_.GetTotalResourceLoad()); + auto last_heartbeat_resources = gcs_client_->NodeResources().GetLastResourceUsage(); + if (!last_heartbeat_resources->GetLoadResources().IsEqual( + local_resources.GetLoadResources())) { + resources_data->set_resource_load_changed(true); + for (const auto &resource_pair : + local_resources.GetLoadResources().GetResourceMap()) { + (*resources_data->mutable_resource_load())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources->SetLoadResources( + ResourceSet(local_resources.GetLoadResources())); + } + + // Add resource load by shape. This will be used by the new autoscaler. + auto resource_load = local_queues_.GetResourceLoadByShape( + RayConfig::instance().max_resource_shapes_per_load_report()); + resources_data->mutable_resource_load_by_shape()->Swap(&resource_load); +} + +void NodeManager::TaskFinished(std::shared_ptr worker, Task *task) { + RAY_CHECK(worker != nullptr && task != nullptr); + const auto &task_id = worker->GetAssignedTaskId(); + // (See design_docs/task_states.rst for the state transition diagram.) + RAY_CHECK(local_queues_.RemoveTask(task_id, task)) << task_id; + + // Release task's resources. The worker's lifetime resources are still held. + auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); +} + +void NodeManager::ReturnWorkerResources(std::shared_ptr worker) { + // Do nothing. +} + +bool NodeManager::CancelTask(const TaskID &task_id) { + Task removed_task; + TaskState removed_task_state; + bool canceled = local_queues_.RemoveTask(task_id, &removed_task, &removed_task_state); + if (!canceled) { + // We do not have the task. This could be because we haven't received the + // lease request yet, or because we already granted the lease request and + // it has already been returned. + } else { + if (removed_task.OnDispatch()) { + // We have not yet granted the worker lease. Cancel it now. + removed_task.OnCancellation()(); + if (removed_task_state == TaskState::WAITING) { + dependency_manager_.RemoveTaskDependencies(task_id); + } + } else { + // We already granted the worker lease and sent the reply. Re-queue the + // task and wait for the requester to return the leased worker. 
+ local_queues_.QueueTasks({removed_task}, removed_task_state); + } + } + return canceled; +} + +void NodeManager::QueueAndScheduleTask(const Task &task, + rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) { + // Override the task dispatch to call back to the client instead of executing the + // task directly on the worker. + TaskID task_id = task.GetTaskSpecification().TaskId(); + rpc::Address owner_address = task.GetTaskSpecification().CallerAddress(); + Task &mutable_task = const_cast(task); + mutable_task.OnDispatchInstead( + [this, owner_address, reply, send_reply_callback]( + const std::shared_ptr granted, const std::string &address, int port, + const WorkerID &worker_id, const ResourceIdSet &resource_ids) { + auto worker = std::static_pointer_cast(granted); + uint32_t worker_pid = static_cast(worker->GetProcess().GetId()); + + reply->mutable_worker_address()->set_ip_address(address); + reply->mutable_worker_address()->set_port(port); + reply->mutable_worker_address()->set_worker_id(worker_id.Binary()); + reply->mutable_worker_address()->set_raylet_id(self_node_id_.Binary()); + reply->set_worker_pid(worker_pid); + for (const auto &mapping : resource_ids.AvailableResources()) { + auto resource = reply->add_resource_mapping(); + resource->set_name(mapping.first); + for (const auto &id : mapping.second.WholeIds()) { + auto rid = resource->add_resource_ids(); + rid->set_index(id); + rid->set_quantity(1.0); + } + for (const auto &id : mapping.second.FractionalIds()) { + auto rid = resource->add_resource_ids(); + rid->set_index(id.first); + rid->set_quantity(id.second.ToDouble()); + } + } + + auto reply_failure_handler = [this, worker_id]() { + RAY_LOG(WARNING) + << "Failed to reply to GCS server, because it might have restarted. GCS " + "cannot obtain the information of the leased worker, so we need to " + "release the leased worker to avoid leakage."; + leased_workers_.erase(worker_id); + metrics_num_task_executed_ -= 1; + }; + metrics_num_task_executed_ += 1; + send_reply_callback(Status::OK(), nullptr, reply_failure_handler); + RAY_CHECK(leased_workers_.find(worker_id) == leased_workers_.end()) + << "Worker is already leased out " << worker_id; + + leased_workers_[worker_id] = worker; + }); + mutable_task.OnSpillbackInstead( + [this, reply, task_id, send_reply_callback](const NodeID &spillback_to, + const std::string &address, int port) { + RAY_LOG(DEBUG) << "Worker lease request SPILLBACK " << task_id; + reply->mutable_retry_at_raylet_address()->set_ip_address(address); + reply->mutable_retry_at_raylet_address()->set_port(port); + reply->mutable_retry_at_raylet_address()->set_raylet_id(spillback_to.Binary()); + metrics_num_task_spilled_back_ += 1; + send_reply_callback(Status::OK(), nullptr, nullptr); + }); + mutable_task.OnCancellationInstead([reply, task_id, send_reply_callback]() { + RAY_LOG(DEBUG) << "Task lease request canceled " << task_id; + reply->set_canceled(true); + send_reply_callback(Status::OK(), nullptr, nullptr); + }); + SubmitTask(task); +} + +void NodeManager::ScheduleInfeasibleTasks() { TryLocalInfeasibleTaskScheduling(); } + +/// Return if any tasks are pending resource acquisition. +/// +/// \param[in] exemplar An example task that is deadlocking. +/// \param[in] num_pending_actor_creation Number of pending actor creation tasks. +/// \param[in] num_pending_tasks Number of pending tasks. +/// \param[in] any_pending True if there's any pending exemplar. +/// \return True if any progress is any tasks are pending. 
+bool NodeManager::AnyPendingTasks(Task *exemplar, bool *any_pending, + int *num_pending_actor_creation, + int *num_pending_tasks) const { + // See if any tasks are blocked trying to acquire resources. + for (const auto &task : local_queues_.GetTasks(TaskState::READY)) { + const TaskSpecification &spec = task.GetTaskSpecification(); + if (spec.IsActorCreationTask()) { + *num_pending_actor_creation += 1; + } else { + *num_pending_tasks += 1; + } + if (!any_pending) { + *exemplar = task; + *any_pending = true; + } + } + return *any_pending; +} + +void NodeManager::OnNodeResourceUsageUpdated(const NodeID &node_id, + const rpc::ResourcesData &resource_data) { + auto it = cluster_resource_map_.find(node_id); + if (it == cluster_resource_map_.end()) { + return; + } + + // Extract decision for this raylet. + auto decision = + scheduling_policy_.SpillOver(it->second, cluster_resource_map_[self_node_id_]); + std::unordered_set local_task_ids; + for (const auto &task_id : decision) { + // (See design_docs/task_states.rst for the state transition diagram.) + Task task; + TaskState state; + if (!local_queues_.RemoveTask(task_id, &task, &state)) { + return; + } + // Since we are spilling back from the ready and waiting queues, we need + // to unsubscribe the dependencies. + if (state != TaskState::INFEASIBLE) { + // Don't unsubscribe for infeasible tasks because we never subscribed in + // the first place. + dependency_manager_.RemoveTaskDependencies(task_id); + } + // Attempt to forward the task. If this fails to forward the task, + // the task will be resubmit locally. + ForwardTaskOrResubmit(task, node_id); + } +} + +void NodeManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const { + // TODO(Shanly): Implement. +} + +void NodeManager::OnObjectMissing(const ObjectID &object_id, + const std::vector &waiting_task_ids) { + RAY_UNUSED(object_id); + // Transition any tasks that were in the runnable state and are dependent on + // this object to the waiting state. + if (!waiting_task_ids.empty()) { + std::unordered_set waiting_task_id_set(waiting_task_ids.begin(), + waiting_task_ids.end()); + + // NOTE(zhijunfu): For direct actors, the worker is initially assigned actor + // creation task ID, which will not be reset after the task finishes. And later + // tasks of this actor will reuse this task ID to require objects from plasma with + // FetchOrReconstruct, since direct actor task IDs are not known to raylet. + // To support actor reconstruction for direct actor, raylet marks actor creation + // task as completed and removes it from `local_queues_` when it receives `TaskDone` + // message from worker. This is necessary because the actor creation task will be + // re-submitted during reconstruction, if the task is not removed previously, the + // new submitted task will be marked as duplicate and thus ignored. So here we check + // for direct actor creation task explicitly to allow this case. + auto iter = waiting_task_id_set.begin(); + while (iter != waiting_task_id_set.end()) { + if (IsActorCreationTask(*iter)) { + RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter + << " when handling object missing for " << object_id; + iter = waiting_task_id_set.erase(iter); + } else { + ++iter; + } + } + + // First filter out any tasks that can't be transitioned to READY. These + // are running workers or drivers, now blocked in a get. 
+ local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING); + local_queues_.FilterState(waiting_task_id_set, TaskState::DRIVER); + // Transition the tasks back to the waiting state. They will be made + // runnable once the deleted object becomes available again. + local_queues_.MoveTasks(waiting_task_id_set, TaskState::READY, TaskState::WAITING); + RAY_CHECK(waiting_task_id_set.empty()); + // Moving ready tasks to waiting may have changed the load, making space for placing + // new tasks locally. + ScheduleTasks(cluster_resource_map_); + } +} + +std::string NodeManager::DebugStr() const { + // Because NodeManager itself implements ClusterTaskManagerInterface and + // `cluster_task_manager_->DebugString()` is invoked inside + // `NodeManager::DebugString()`, keeping the same name would lead to an infinite loop + // and a stack overflow, so we rename `ClusterTaskManager::DebugString` to + // `ClusterTaskManager::DebugStr` to avoid this. + return ""; +} + +} // namespace raylet + +} // namespace ray diff --git a/src/ray/raylet/placement_group_resource_manager.cc b/src/ray/raylet/placement_group_resource_manager.cc index ff5b327197af..fedcc607de38 100644 --- a/src/ray/raylet/placement_group_resource_manager.cc +++ b/src/ray/raylet/placement_group_resource_manager.cc @@ -229,7 +229,7 @@ void NewPlacementGroupResourceManager::ReturnBundle( // Return original resources to resource allocator `ClusterResourceScheduler`. auto original_resources = it->second->resources_; - cluster_resource_scheduler_->FreeLocalTaskResources(original_resources); + cluster_resource_scheduler_->ReleaseWorkerResources(original_resources); // Substract placement group resources from resource allocator // `ClusterResourceScheduler`. diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc index 90cf552cfd50..0532d470141e 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.cc @@ -14,6 +14,9 @@ #include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/common/grpc_util.h" +#include "ray/common/ray_config.h" + namespace ray { ClusterResourceScheduler::ClusterResourceScheduler( @@ -55,6 +58,44 @@ void ClusterResourceScheduler::AddOrUpdateNode(int64_t node_id, } } +bool ClusterResourceScheduler::UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) { + auto node_id = string_to_int_map_.Insert(node_id_string); + if (!nodes_.contains(node_id)) { + return false; + } + + auto resources_total = MapFromProtobuf(resource_data.resources_total()); + auto resources_available = MapFromProtobuf(resource_data.resources_available()); + NodeResources node_resources = ResourceMapToNodeResources( + string_to_int_map_, resources_total, resources_available); + NodeResources local_view; + RAY_CHECK(GetNodeResources(node_id, &local_view)); + + if (resource_data.resources_total_size() > 0) { + for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { + local_view.predefined_resources[i].total = + node_resources.predefined_resources[i].total; + } + for (auto &entry : node_resources.custom_resources) { + local_view.custom_resources[entry.first].total = entry.second.total; + } + } + + if (resource_data.resources_available_changed()) { + for (size_t i = 0; i < node_resources.predefined_resources.size(); ++i) { + local_view.predefined_resources[i].available = + node_resources.predefined_resources[i].available; + } + for (auto &entry : 
node_resources.custom_resources) { + local_view.custom_resources[entry.first].available = entry.second.available; + } + } + + AddOrUpdateNode(node_id, local_view); + return true; +} + bool ClusterResourceScheduler::RemoveNode(int64_t node_id) { auto it = nodes_.find(node_id); if (it == nodes_.end()) { @@ -324,6 +365,12 @@ bool ClusterResourceScheduler::GetNodeResources(int64_t node_id, } } +const NodeResources &ClusterResourceScheduler::GetLocalNodeResources() const { + const auto &node_it = nodes_.find(local_node_id_); + RAY_CHECK(node_it != nodes_.end()); + return node_it->second.GetLocalView(); +} + int64_t ClusterResourceScheduler::NumNodes() { return nodes_.size(); } void ClusterResourceScheduler::AddLocalResource(const std::string &resource_name, @@ -883,7 +930,7 @@ bool ClusterResourceScheduler::AllocateRemoteTaskResources( return SubtractRemoteNodeAvailableResources(node_id, task_request); } -void ClusterResourceScheduler::FreeLocalTaskResources( +void ClusterResourceScheduler::ReleaseWorkerResources( std::shared_ptr task_allocation) { if (task_allocation == nullptr || task_allocation->IsEmpty()) { return; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler.h b/src/ray/raylet/scheduling/cluster_resource_scheduler.h index bce8b760ccf8..747fe6f6fba2 100644 --- a/src/ray/raylet/scheduling/cluster_resource_scheduler.h +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler.h @@ -22,10 +22,10 @@ #include "absl/container/flat_hash_set.h" #include "ray/common/task/scheduling_resources.h" #include "ray/raylet/scheduling/cluster_resource_data.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" #include "ray/raylet/scheduling/fixed_point.h" #include "ray/raylet/scheduling/scheduling_ids.h" #include "ray/util/logging.h" - #include "src/ray/protobuf/gcs.pb.h" namespace ray { @@ -38,7 +38,7 @@ static std::unordered_set UnitInstanceResources{CPU, GPU, TPU}; /// Class encapsulating the cluster resources and the logic to assign /// tasks to nodes based on the task's constraints and the available /// resources at those nodes. -class ClusterResourceScheduler { +class ClusterResourceScheduler : public ClusterResourceSchedulerInterface { public: ClusterResourceScheduler(void){}; @@ -66,12 +66,19 @@ class ClusterResourceScheduler { const std::unordered_map &resource_map_total, const std::unordered_map &resource_map_available); + /// Update node resources. This hanppens when a node resource usage udpated. + /// + /// \param node_id_string ID of the node which resoruces need to be udpated. + /// \param resource_data The node resource data. + bool UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) override; + /// Remove node from the cluster data structure. This happens /// when a node fails or it is removed from the cluster. /// /// \param ID of the node to be removed. bool RemoveNode(int64_t node_id); - bool RemoveNode(const std::string &node_id_string); + bool RemoveNode(const std::string &node_id_string) override; /// Check whether a task request is feasible on a given node. A node is /// feasible if it has the total resources needed to eventually execute the @@ -155,6 +162,9 @@ class ClusterResourceScheduler { /// If node_id not found, return false; otherwise return true. bool GetNodeResources(int64_t node_id, NodeResources *ret_resources) const; + /// Get local node resources. + const NodeResources &GetLocalNodeResources() const; + /// Get number of nodes in the cluster. 
int64_t NumNodes(); @@ -175,7 +185,8 @@ class ClusterResourceScheduler { /// \param resource_name: Resource which we want to update. /// \param resource_total: New capacity of the resource. void UpdateResourceCapacity(const std::string &node_name, - const std::string &resource_name, double resource_total); + const std::string &resource_name, + double resource_total) override; /// Delete a given resource from the local node. /// @@ -186,13 +197,14 @@ class ClusterResourceScheduler { /// /// \param node_name: Node whose resource we want to delete. /// \param resource_name: Resource we want to delete - void DeleteResource(const std::string &node_name, const std::string &resource_name); + void DeleteResource(const std::string &node_name, + const std::string &resource_name) override; /// Return local resources. NodeResourceInstances GetLocalResources() { return local_resources_; }; /// Return local resources in human-readable string form. - std::string GetLocalResourceViewString() const; + std::string GetLocalResourceViewString() const override; /// Create instances for each resource associated with the local node, given /// the node's resources. @@ -349,7 +361,7 @@ class ClusterResourceScheduler { const std::string &node_id, const std::unordered_map &task_resources); - void FreeLocalTaskResources(std::shared_ptr task_allocation); + void ReleaseWorkerResources(std::shared_ptr task_allocation); /// Update the available resources of the local node given /// the available instances of each resource of the local node. @@ -368,13 +380,14 @@ class ClusterResourceScheduler { /// /// \param Output parameter. `resources_available` and `resources_total` are the only /// fields used. - void FillResourceUsage(std::shared_ptr resources_data); + void FillResourceUsage(std::shared_ptr resources_data) override; /// Update last report resources local cache from gcs cache, /// this is needed when gcs fo. /// /// \param gcs_resources: The remote cache from gcs. - void UpdateLastResourceUsage(std::shared_ptr gcs_resources); + void UpdateLastResourceUsage( + std::shared_ptr gcs_resources) override; /// Return human-readable string for this scheduler state. std::string DebugString() const; diff --git a/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h b/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h new file mode 100644 index 000000000000..ca2ba5237d71 --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_resource_scheduler_interface.h @@ -0,0 +1,70 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/common/task/scheduling_resources.h" +#include "src/ray/protobuf/gcs.pb.h" + +namespace ray { +class ClusterResourceSchedulerInterface { + public: + virtual ~ClusterResourceSchedulerInterface() = default; + + /// Remove node from the cluster data structure. This happens + /// when a node fails or it is removed from the cluster. + /// + /// \param node_id_string ID of the node to be removed. 
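+ /// \return Whether the node was found in the cluster view and removed.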
+ virtual bool RemoveNode(const std::string &node_id_string) = 0; + + /// Update node resources. This happens when a node's resource usage is updated. + /// + /// \param node_id_string ID of the node whose resources need to be updated. + /// \param resource_data The node resource data. + virtual bool UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) = 0; + + /// \param node_name: Node whose resource we want to update. + /// \param resource_name: Resource which we want to update. + /// \param resource_total: New capacity of the resource. + virtual void UpdateResourceCapacity(const std::string &node_id_string, + const std::string &resource_name, + double resource_total) = 0; + + /// Delete a given resource from a given node. + /// + /// \param node_name: Node whose resource we want to delete. + /// \param resource_name: Resource we want to delete. + virtual void DeleteResource(const std::string &node_id_string, + const std::string &resource_name) = 0; + + /// Update the local cache of last reported resources from the GCS cache; + /// this is needed when the GCS fails over. + /// + /// \param gcs_resources: The remote cache from gcs. + virtual void UpdateLastResourceUsage( + std::shared_ptr gcs_resources) {} + + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resources_available and resources_total. + /// + /// \param Output parameter. `resources_available` and `resources_total` are the only + /// fields used. + virtual void FillResourceUsage(std::shared_ptr data) = 0; + + /// Return local resources in human-readable string form. + virtual std::string GetLocalResourceViewString() const = 0; +}; +} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 5bda6456031e..a395e51b5077 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -18,7 +18,9 @@ ClusterTaskManager::ClusterTaskManager( TaskDependencyManagerInterface &task_dependency_manager, std::function is_owner_alive, NodeInfoGetter get_node_info, - std::function announce_infeasible_task) + std::function announce_infeasible_task, + WorkerPoolInterface &worker_pool, + std::unordered_map> &leased_workers) : self_node_id_(self_node_id), cluster_resource_scheduler_(cluster_resource_scheduler), task_dependency_manager_(task_dependency_manager), @@ -27,7 +29,9 @@ ClusterTaskManager::ClusterTaskManager( announce_infeasible_task_(announce_infeasible_task), max_resource_shapes_per_load_report_( RayConfig::instance().max_resource_shapes_per_load_report()), - report_worker_backlog_(RayConfig::instance().report_worker_backlog()) {} + report_worker_backlog_(RayConfig::instance().report_worker_backlog()), + worker_pool_(worker_pool), + leased_workers_(leased_workers) {} bool ClusterTaskManager::SchedulePendingTasks() { // Always try to schedule infeasible tasks in case they are now feasible. @@ -150,7 +154,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( continue; } - std::shared_ptr worker = worker_pool.PopWorker(spec); + std::shared_ptr worker = worker_pool_.PopWorker(spec); if (!worker) { // No worker available, we won't be able to schedule any kind of task.
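// Note: the remaining work stays queued in tasks_to_dispatch_; dispatching resumes on the next ScheduleAndDispatchTasks() call, e.g. once an idle worker is pushed back to the pool.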
return; @@ -163,7 +167,7 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( if (!spec.IsDetachedActor() && !is_owner_alive_(owner_worker_id, owner_node_id)) { RAY_LOG(WARNING) << "Task: " << task.GetTaskSpecification().TaskId() << "'s caller is no longer running. Cancelling task."; - worker_pool.PushWorker(worker); + worker_pool_.PushWorker(worker); if (!spec.GetDependencies().empty()) { task_dependency_manager_.RemoveTaskDependencies( task.GetTaskSpecification().TaskId()); @@ -175,9 +179,9 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( if (worker_leased) { auto reply = std::get<1>(*work_it); auto callback = std::get<2>(*work_it); - Dispatch(worker, leased_workers, task, reply, callback); + Dispatch(worker, leased_workers_, task, reply, callback); } else { - worker_pool.PushWorker(worker); + worker_pool_.PushWorker(worker); } if (remove) { if (!spec.GetDependencies().empty()) { @@ -242,10 +246,14 @@ bool ClusterTaskManager::AttemptDispatchWork(const Work &work, return dispatched; } -void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, - std::function callback) { - RAY_LOG(DEBUG) << "Queuing task " << task.GetTaskSpecification().TaskId(); - Work work = std::make_tuple(task, reply, callback); +void ClusterTaskManager::QueueAndScheduleTask( + const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) { + RAY_LOG(DEBUG) << "Queuing and scheduling task " + << task.GetTaskSpecification().TaskId(); + Work work = std::make_tuple(task, reply, [send_reply_callback] { + send_reply_callback(Status::OK(), nullptr, nullptr); + }); const auto &scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); // If the scheduling class is infeasible, just add the work to the infeasible queue // directly. @@ -255,9 +263,19 @@ void ClusterTaskManager::QueueTask(const Task &task, rpc::RequestWorkerLeaseRepl tasks_to_schedule_[scheduling_class].push_back(work); } AddToBacklogTracker(task); + ScheduleAndDispatchTasks(); } -void ClusterTaskManager::TasksUnblocked(const std::vector ready_ids) { +void ClusterTaskManager::ScheduleInfeasibleTasks() { + // Do nothing. + // TODO(Shanly): This method will be removed once we remove the legacy scheduler. +} + +void ClusterTaskManager::TasksUnblocked(const std::vector &ready_ids) { + if (ready_ids.empty()) { + return; + } + for (const auto &task_id : ready_ids) { auto it = waiting_tasks_.find(task_id); if (it != waiting_tasks_.end()) { @@ -270,14 +288,22 @@ void ClusterTaskManager::TasksUnblocked(const std::vector ready_ids) { waiting_tasks_.erase(it); } } + ScheduleAndDispatchTasks(); } -void ClusterTaskManager::HandleTaskFinished(std::shared_ptr worker) { - cluster_resource_scheduler_->FreeLocalTaskResources(worker->GetAllocatedInstances()); - worker->ClearAllocatedInstances(); - cluster_resource_scheduler_->FreeLocalTaskResources( - worker->GetLifetimeAllocatedInstances()); - worker->ClearLifetimeAllocatedInstances(); +void ClusterTaskManager::TaskFinished(std::shared_ptr worker, + Task *task) { + RAY_CHECK(worker != nullptr && task != nullptr); + *task = worker->GetAssignedTask(); + if (worker->GetAllocatedInstances() != nullptr) { + ReleaseWorkerResources(worker); + } +} + +void ClusterTaskManager::ReturnWorkerResources(std::shared_ptr worker) { + // TODO(Shanly): This method will be removed and can be replaced by + // `ReleaseWorkerResources` directly once we remove the legacy scheduler. 
+ ReleaseWorkerResources(worker); } void ReplyCancelled(Work &work) { @@ -394,8 +420,7 @@ void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) con } } -void ClusterTaskManager::FillResourceUsage( - std::shared_ptr data) const { +void ClusterTaskManager::FillResourceUsage(std::shared_ptr data) { if (max_resource_shapes_per_load_report_ == 0) { return; } @@ -568,7 +593,7 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation, int *num_pending_tasks) const { // We are guaranteed that these tasks are blocked waiting for resources after a - // call to ScheduleAndDispatch(). They may be waiting for workers as well, but + // call to ScheduleAndDispatchTasks(). They may be waiting for workers as well, but // this should be a transient condition only. for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) { auto &work_queue = shapes_it.second; @@ -590,7 +615,9 @@ bool ClusterTaskManager::AnyPendingTasks(Task *exemplar, bool *any_pending, return *any_pending; } -std::string ClusterTaskManager::DebugString() const { +std::string ClusterTaskManager::DebugStr() const { + // TODO(Shanly): This method will be replaced with `DebugString` once we remove the + // legacy scheduler. std::stringstream buffer; buffer << "========== Node: " << self_node_id_ << " =================\n"; buffer << "Schedule queue length: " << tasks_to_schedule_.size() << "\n"; @@ -747,5 +774,74 @@ void ClusterTaskManager::RemoveFromBacklogTracker(const Task &task) { } } +void ClusterTaskManager::ReleaseWorkerResources(std::shared_ptr worker) { + RAY_CHECK(worker != nullptr); + cluster_resource_scheduler_->ReleaseWorkerResources(worker->GetAllocatedInstances()); + worker->ClearAllocatedInstances(); + cluster_resource_scheduler_->ReleaseWorkerResources( + worker->GetLifetimeAllocatedInstances()); + worker->ClearLifetimeAllocatedInstances(); +} + +bool ClusterTaskManager::ReleaseCpuResourcesFromUnblockedWorker( + std::shared_ptr worker) { + if (!worker || worker->IsBlocked()) { + return false; + } + + if (worker->GetAllocatedInstances() != nullptr) { + auto cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); + if (cpu_instances.size() > 0) { + std::vector overflow_cpu_instances = + cluster_resource_scheduler_->AddCPUResourceInstances(cpu_instances); + for (unsigned int i = 0; i < overflow_cpu_instances.size(); i++) { + RAY_CHECK(overflow_cpu_instances[i] == 0) << "Should not be overflow"; + } + return true; + } + } + + return false; +} + +bool ClusterTaskManager::ReturnCpuResourcesToBlockedWorker( + std::shared_ptr worker) { + if (!worker || !worker->IsBlocked()) { + return false; + } + if (worker->GetAllocatedInstances() != nullptr) { + auto cpu_instances = worker->GetAllocatedInstances()->GetCPUInstancesDouble(); + if (cpu_instances.size() > 0) { + // Important: we allow going negative here, since otherwise you can use infinite + // CPU resources by repeatedly blocking / unblocking a task. By allowing it to go + // negative, at most one task can "borrow" this worker's resources. 
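+ // Illustrative example: a worker holding 1 CPU blocks in `ray.get`, so its CPU is
+ // handed back to the pool and another task may claim it. When this worker unblocks,
+ // we subtract 1 CPU again even if that leaves the available count at -1; it becomes
+ // non-negative again once the borrowing task finishes and returns the CPU.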
+ cluster_resource_scheduler_->SubtractCPUResourceInstances( + cpu_instances, /*allow_going_negative=*/true); + return true; + } + } + return false; +} + +void ClusterTaskManager::ScheduleAndDispatchTasks() { + SchedulePendingTasks(); + DispatchScheduledTasksToWorkers(worker_pool_, leased_workers_); +} + +void ClusterTaskManager::OnNodeResourceUsageUpdated( + const NodeID &node_id, const rpc::ResourcesData &resource_data) { + // TODO(Shanly): This method will be removed and can be replaced by + // `ScheduleAndDispatchTasks` directly once we remove the legacy scheduler. + ScheduleAndDispatchTasks(); +} + +void ClusterTaskManager::OnObjectMissing(const ObjectID &object_id, + const std::vector &waiting_task_ids) { + // We don't need to do anything if the new scheduler is enabled because tasks + // will get moved back to waiting once they reach the front of the dispatch + // queue. + // TODO(Shanly): This method will be removed once we remove the legacy scheduler. +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index ebbf4770c5d6..f632357e10f4 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -6,6 +6,7 @@ #include "ray/common/task/task_common.h" #include "ray/raylet/dependency_manager.h" #include "ray/raylet/scheduling/cluster_resource_scheduler.h" +#include "ray/raylet/scheduling/cluster_task_manager_interface.h" #include "ray/raylet/worker.h" #include "ray/raylet/worker_pool.h" #include "ray/rpc/grpc_client.h" @@ -37,7 +38,7 @@ typedef std::function(const NodeID &node_id)> /// there is a new worker which can dispatch the tasks. /// 5. When a worker finishes executing its task(s), the requester will return /// it and we should release the resources in our view of the node's state. -class ClusterTaskManager { +class ClusterTaskManager : public ClusterTaskManagerInterface { public: /// fullfills_dependencies_func Should return if all dependencies are /// fulfilled and unsubscribe from dependencies only if they're fulfilled. If @@ -51,51 +52,48 @@ class ClusterTaskManager { /// \param is_owner_alive: A callback which returns if the owner process is alive /// (according to our ownership model). /// \param gcs_client: A gcs client. - ClusterTaskManager(const NodeID &self_node_id, - std::shared_ptr cluster_resource_scheduler, - TaskDependencyManagerInterface &task_dependency_manager, - std::function is_owner_alive, - NodeInfoGetter get_node_info, - std::function announce_infeasible_task); - - /// (Step 2) For each task in tasks_to_schedule_, pick a node in the system - /// (local or remote) that has enough resources available to run the task, if - /// any such node exist. Skip tasks which are not schedulable. - /// - /// \return True if any tasks are ready for dispatch. - bool SchedulePendingTasks(); - - /// (Step 3) Attempts to dispatch all tasks which are ready to run. A task - /// will be dispatched if it is on `tasks_to_dispatch_` and there are still - /// available resources on the node. - /// - /// If there are not enough resources locally, up to one task per resource - /// shape (the task at the head of the queue) will get spilled back to a - /// different node. - /// - /// \param worker_pool: The pool of workers which will be dispatched to. - /// `worker_pool` state will be modified (idle workers will be popped) during - /// dispatching. 
- void DispatchScheduledTasksToWorkers( + ClusterTaskManager( + const NodeID &self_node_id, + std::shared_ptr cluster_resource_scheduler, + TaskDependencyManagerInterface &task_dependency_manager, + std::function is_owner_alive, + NodeInfoGetter get_node_info, + std::function announce_infeasible_task, WorkerPoolInterface &worker_pool, std::unordered_map> &leased_workers); - /// (Step 1) Queue tasks for scheduling. - /// \param fn: The function used during dispatching. - /// \param task: The incoming task to schedule. - void QueueTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, - std::function); + /// (Step 1) Queue tasks and schedule. + /// Queue task and schedule. This hanppens when processing the worker lease request. + /// + /// \param task: The incoming task to be queued and scheduled. + /// \param reply: The reply of the lease request. + /// \param send_reply_callback: The function used during dispatching. + void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) override; + + /// Schedule infeasible tasks. + void ScheduleInfeasibleTasks() override; /// Move tasks from waiting to ready for dispatch. Called when a task's /// dependencies are resolved. /// /// \param readyIds: The tasks which are now ready to be dispatched. - void TasksUnblocked(const std::vector ready_ids); + void TasksUnblocked(const std::vector &ready_ids) override; - /// (Step 5) Call once a task finishes (i.e. a worker is returned). + /// Return the finished task and relase the worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. + /// + /// \param worker: The worker which was running the task. + /// \param task: Output parameter. + void TaskFinished(std::shared_ptr worker, Task *task) override; + + /// Return worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. /// /// \param worker: The worker which was running the task. - void HandleTaskFinished(std::shared_ptr worker); + void ReturnWorkerResources(std::shared_ptr worker) override; /// Attempt to cancel an already queued task. /// @@ -103,12 +101,12 @@ class ClusterTaskManager { /// /// \return True if task was successfully removed. This function will return /// false if the task is already running. - bool CancelTask(const TaskID &task_id); + bool CancelTask(const TaskID &task_id) override; /// Populate the list of pending or infeasible actor tasks for node stats. /// /// \param Output parameter. - void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const; + void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const override; /// Populate the relevant parts of the heartbeat table. This is intended for /// sending resource usage of raylet to gcs. In particular, this should fill in @@ -116,7 +114,7 @@ class ClusterTaskManager { /// /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only /// fields used. - void FillResourceUsage(std::shared_ptr data) const; + void FillResourceUsage(std::shared_ptr data) override; /// Return if any tasks are pending resource acquisition. /// @@ -126,11 +124,72 @@ class ClusterTaskManager { /// \param[in] any_pending True if there's any pending exemplar. /// \return True if any progress is any tasks are pending. 
bool AnyPendingTasks(Task *exemplar, bool *any_pending, int *num_pending_actor_creation, - int *num_pending_tasks) const; + int *num_pending_tasks) const override; - std::string DebugString() const; + /// (Step 5) Call once a task finishes (i.e. a worker is returned). + /// + /// \param worker: The worker which was running the task. + void ReleaseWorkerResources(std::shared_ptr worker) override; + + /// When a task is blocked in ray.get or ray.wait, the worker who is executing the task + /// should give up the CPU resources allocated for the running task for the time being + /// and the worker itself should also be marked as blocked. + /// + /// \param worker The worker who will give up the CPU resources. + /// \return true if the cpu resources of the specified worker are released successfully, + /// else false. + bool ReleaseCpuResourcesFromUnblockedWorker( + std::shared_ptr worker) override; + + /// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that + /// the worker gave up should be returned to it. + /// + /// \param worker The blocked worker. + /// \return true if the cpu resources are returned back to the specified worker, else + /// false. + bool ReturnCpuResourcesToBlockedWorker( + std::shared_ptr worker) override; + + // Schedule and dispatch tasks. + void ScheduleAndDispatchTasks() override; + + /// Handle the resource usage updated event of the specified node. + /// + /// \param node_id ID of the node which resources are updated. + /// \param resource_data The node resources. + void OnNodeResourceUsageUpdated(const NodeID &node_id, + const rpc::ResourcesData &resource_data) override; + + /// Handle the object missing event. + /// + /// \param object_id ID of the missing object. + /// \param waiting_task_ids IDs of tasks that are waitting for the specified missing + /// object. + void OnObjectMissing(const ObjectID &object_id, + const std::vector &waiting_task_ids) override; + + /// The helper to dump the debug state of the cluster task manater. + std::string DebugStr() const override; private: + /// (Step 2) For each task in tasks_to_schedule_, pick a node in the system + /// (local or remote) that has enough resources available to run the task, if + /// any such node exist. Skip tasks which are not schedulable. + /// + /// \return True if any tasks are ready for dispatch. + bool SchedulePendingTasks(); + + /// (Step 3) Attempts to dispatch all tasks which are ready to run. A task + /// will be dispatched if it is on `tasks_to_dispatch_` and there are still + /// available resources on the node. + /// + /// If there are not enough resources locally, up to one task per resource + /// shape (the task at the head of the queue) will get spilled back to a + /// different node. + void DispatchScheduledTasksToWorkers( + WorkerPoolInterface &worker_pool, + std::unordered_map> &leased_workers); + /// Helper method to try dispatching a single task from the queue to an /// available worker. Returns whether the task should be removed from the /// queue and whether the worker was successfully leased to execute the work. @@ -184,6 +243,11 @@ class ClusterTaskManager { /// Track the cumulative backlog of all workers requesting a lease to this raylet. std::unordered_map backlog_tracker_; + /// TODO(Shanly): Remove `worker_pool_` and `leased_workers_` and make them as + /// parameters of methods if necessary once we remove the legacy scheduler. 
+ WorkerPoolInterface &worker_pool_; + std::unordered_map> &leased_workers_; + /// Determine whether a task should be immediately dispatched, /// or placed on a wait queue. /// diff --git a/src/ray/raylet/scheduling/cluster_task_manager_interface.h b/src/ray/raylet/scheduling/cluster_task_manager_interface.h new file mode 100644 index 000000000000..950741e54be3 --- /dev/null +++ b/src/ray/raylet/scheduling/cluster_task_manager_interface.h @@ -0,0 +1,140 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "ray/raylet/worker.h" +#include "ray/rpc/server_call.h" +#include "src/ray/protobuf/node_manager.pb.h" + +namespace ray { +namespace raylet { +class ClusterTaskManagerInterface { + public: + virtual ~ClusterTaskManagerInterface() = default; + + /// Return the resources that were being used by this worker. + virtual void ReleaseWorkerResources(std::shared_ptr worker) = 0; + + /// When a task is blocked in ray.get or ray.wait, the worker who is executing the task + /// should give up the CPU resources allocated for the running task for the time being + /// and the worker itself should also be marked as blocked. + /// + /// \param worker The worker who will give up the CPU resources. + /// \return true if the cpu resources of the specified worker are released successfully, + /// else false. + virtual bool ReleaseCpuResourcesFromUnblockedWorker( + std::shared_ptr worker) = 0; + + /// When a task is no longer blocked in a ray.get or ray.wait, the CPU resources that + /// the worker gave up should be returned to it. + /// + /// \param worker The blocked worker. + /// \return true if the cpu resources are returned back to the specified worker, else + /// false. + virtual bool ReturnCpuResourcesToBlockedWorker( + std::shared_ptr worker) = 0; + + // Schedule and dispatch tasks. + virtual void ScheduleAndDispatchTasks() = 0; + + /// Move tasks from waiting to ready for dispatch. Called when a task's + /// dependencies are resolved. + /// + /// \param readyIds: The tasks which are now ready to be dispatched. + virtual void TasksUnblocked(const std::vector &ready_ids) = 0; + + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resource_load and resource_load_by_shape. + /// + /// \param Output parameter. `resource_load` and `resource_load_by_shape` are the only + /// fields used. + virtual void FillResourceUsage(std::shared_ptr data) = 0; + + /// Populate the list of pending or infeasible actor tasks for node stats. + /// + /// \param Output parameter. + virtual void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const = 0; + + /// Return the finished task and relase the worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. + /// + /// \param worker: The worker which was running the task. + /// \param task: Output parameter. 
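+ /// The worker's assigned task is copied into `*task` (see ClusterTaskManager::TaskFinished).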
+ virtual void TaskFinished(std::shared_ptr worker, Task *task) = 0; + + /// Return worker resources. + /// This method will be removed and can be replaced by `ReleaseWorkerResources` directly + /// once we remove the legacy scheduler. + /// + /// \param worker: The worker which was running the task. + virtual void ReturnWorkerResources(std::shared_ptr worker) = 0; + + /// Attempt to cancel an already queued task. + /// + /// \param task_id: The id of the task to remove. + /// + /// \return True if task was successfully removed. This function will return + /// false if the task is already running. + virtual bool CancelTask(const TaskID &task_id) = 0; + + /// Queue task and schedule. This happens when processing a worker lease request. + /// + /// \param task: The incoming task to be queued and scheduled. + /// \param reply: The reply of the lease request. + /// \param send_reply_callback: The function used during dispatching. + virtual void QueueAndScheduleTask(const Task &task, rpc::RequestWorkerLeaseReply *reply, + rpc::SendReplyCallback send_reply_callback) = 0; + + /// Schedule infeasible tasks. + virtual void ScheduleInfeasibleTasks() = 0; + + /// Return if any tasks are pending resource acquisition. + /// + /// \param[in] exemplar An example task that is deadlocking. + /// \param[in] num_pending_actor_creation Number of pending actor creation tasks. + /// \param[in] num_pending_tasks Number of pending tasks. + /// \param[in] any_pending True if there's any pending exemplar. + /// \return True if any tasks are pending. + virtual bool AnyPendingTasks(Task *exemplar, bool *any_pending, + int *num_pending_actor_creation, + int *num_pending_tasks) const = 0; + + /// Handle the resource usage updated event of the specified node. + /// + /// \param node_id ID of the node which resources are updated. + /// \param resource_data The node resources. + virtual void OnNodeResourceUsageUpdated(const NodeID &node_id, + const rpc::ResourcesData &resource_data) = 0; + + /// Handle the object missing event. + /// + /// \param object_id ID of the missing object. + /// \param waiting_task_ids IDs of tasks that are waiting for the specified missing + /// object. + virtual void OnObjectMissing(const ObjectID &object_id, + const std::vector &waiting_task_ids) = 0; + + /// The helper to dump the debug state of the cluster task manager. + /// + /// Because NodeManager itself implements ClusterTaskManagerInterface and + /// `cluster_task_manager_->DebugString()` is invoked inside + /// `NodeManager::DebugString()`, keeping the same name would lead to an infinite loop + /// and a stack overflow, so we rename `DebugString` to `DebugStr` to avoid this.
+ virtual std::string DebugStr() const = 0; +}; +} // namespace raylet +} // namespace ray diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 604ca1b1dd72..7c5f00820839 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -129,7 +129,8 @@ class ClusterTaskManagerTest : public ::testing::Test { node_info_calls_++; return node_info_[node_id]; }, - [this](const Task &task) { announce_infeasible_task_calls_++; }) {} + [this](const Task &task) { announce_infeasible_task_calls_++; }, + pool_, leased_workers_) {} void SetUp() {} @@ -180,11 +181,12 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { rpc::RequestWorkerLeaseReply reply; bool callback_occurred = false; bool *callback_occurred_ptr = &callback_occurred; - auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; + auto callback = [callback_occurred_ptr](Status, std::function, + std::function) { + *callback_occurred_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -194,7 +196,7 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::dynamic_pointer_cast(worker)); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); ASSERT_TRUE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); @@ -214,11 +216,12 @@ TEST_F(ClusterTaskManagerTest, NoFeasibleNodeTest) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_called); ASSERT_EQ(leased_workers_.size(), 0); @@ -243,7 +246,8 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { rpc::RequestWorkerLeaseReply reply; int num_callbacks = 0; int *num_callbacks_ptr = &num_callbacks; - auto callback = [num_callbacks_ptr]() { + auto callback = [num_callbacks_ptr](Status, std::function, + std::function) { (*num_callbacks_ptr) = *num_callbacks_ptr + 1; }; @@ -252,9 +256,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1); std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); @@ -263,9 +265,7 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { /* This task can run */ auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); - task_manager_.QueueTask(task2, &reply, callback); - task_manager_.SchedulePendingTasks(); - 
task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task2, &reply, callback); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 1); @@ -277,8 +277,6 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { auto id = task.GetTaskSpecification().TaskId(); std::vector unblocked = {id}; task_manager_.TasksUnblocked(unblocked); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 1); @@ -287,10 +285,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { /* Second task finishes, making space for the original task */ leased_workers_.clear(); - task_manager_.HandleTaskFinished(worker); + task_manager_.ReleaseWorkerResources(worker); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty()); // Task2 is now done so task can run. @@ -313,14 +310,14 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { AddNode(remote_node_id, 5); int num_callbacks = 0; - auto callback = [&]() { num_callbacks++; }; + auto callback = [&](Status, std::function, std::function) { + num_callbacks++; + }; /* Blocked on starting a worker. */ auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply local_reply; - task_manager_.QueueTask(task, &local_reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &local_reply, callback); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); @@ -328,9 +325,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { /* This task can run but not at the same time as the first */ auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); rpc::RequestWorkerLeaseReply spillback_reply; - task_manager_.QueueTask(task2, &spillback_reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task2, &spillback_reply, callback); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); @@ -339,7 +334,7 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { // longer available for the second. pool_.PushWorker(std::dynamic_pointer_cast(worker)); pool_.PushWorker(std::dynamic_pointer_cast(worker)); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); // Check that both tasks got removed from the queue. ASSERT_EQ(num_callbacks, 2); // The first task was dispatched. @@ -353,47 +348,33 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); - pool_.PushWorker(std::dynamic_pointer_cast(worker)); - Task task = CreateTask({{ray::kCPU_ResourceLabel, 1}}); rpc::RequestWorkerLeaseReply reply; bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; // Task not queued so we can't cancel it. 
ASSERT_FALSE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); - task_manager_.QueueTask(task, &reply, callback); + task_manager_.QueueAndScheduleTask(task, &reply, callback); - // Task is now queued so cancellation works. + // Task is now in dispatch queue. callback_called = false; reply.Clear(); ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); // Task will not execute. ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); - ASSERT_EQ(pool_.workers.size(), 1); - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - - // We can still cancel the task if it's on the dispatch queue. - callback_called = false; - reply.Clear(); - ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); - // Task will not execute. - ASSERT_TRUE(reply.canceled()); - ASSERT_TRUE(callback_called); - ASSERT_EQ(leased_workers_.size(), 0); - ASSERT_EQ(pool_.workers.size(), 1); - - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); + task_manager_.QueueAndScheduleTask(task, &reply, callback); // Task is now running so we can't cancel it. callback_called = false; @@ -418,14 +399,16 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); + task_manager_.QueueAndScheduleTask(task, &reply, callback); // Task is now queued so cancellation works. ASSERT_TRUE(task_manager_.CancelTask(task.GetTaskSpecification().TaskId())); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); // Task will not execute. ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); @@ -436,8 +419,7 @@ TEST_F(ClusterTaskManagerTest, TaskCancelInfeasibleTask) { // cancelled. 
auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 12); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); ASSERT_TRUE(callback_called); ASSERT_TRUE(reply.canceled()); ASSERT_EQ(leased_workers_.size(), 0); @@ -456,11 +438,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_TRUE(callback_called); // Now {CPU: 7, GPU: 4, MEM:128} } @@ -471,11 +454,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_called); // No worker available. // Now {CPU: 7, GPU: 4, MEM:128} with 1 queued task. } @@ -486,11 +470,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible task {CPU: 9}. } @@ -501,11 +486,12 @@ TEST_F(ClusterTaskManagerTest, HeartbeatTest) { bool callback_called = false; bool *callback_called_ptr = &callback_called; - auto callback = [callback_called_ptr]() { *callback_called_ptr = true; }; + auto callback = [callback_called_ptr](Status, std::function, + std::function) { + *callback_called_ptr = true; + }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_called); // Infeasible. // Now there is also an infeasible task {CPU: 10}. 
} @@ -563,7 +549,10 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { rpc::RequestWorkerLeaseReply reply; bool callback_occurred = false; bool *callback_occurred_ptr = &callback_occurred; - auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; + auto callback = [callback_occurred_ptr](Status, std::function, + std::function) { + *callback_occurred_ptr = true; + }; std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); @@ -574,11 +563,9 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { for (int i = 0; i < 10; i++) { Task task = CreateTask({{ray::kCPU_ResourceLabel, 100}}); task.SetBacklogSize(i); - task_manager_.QueueTask(task, &reply, callback); + task_manager_.QueueAndScheduleTask(task, &reply, callback); to_cancel.push_back(task.GetTaskSpecification().TaskId()); } - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -620,24 +607,24 @@ TEST_F(ClusterTaskManagerTest, OwnerDeadTest) { rpc::RequestWorkerLeaseReply reply; bool callback_occurred = false; bool *callback_occurred_ptr = &callback_occurred; - auto callback = [callback_occurred_ptr]() { *callback_occurred_ptr = true; }; + auto callback = [callback_occurred_ptr](Status, std::function, + std::function) { + *callback_occurred_ptr = true; + }; std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::dynamic_pointer_cast(worker)); - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - is_owner_alive_ = false; - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); ASSERT_EQ(pool_.workers.size(), 1); is_owner_alive_ = true; - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); ASSERT_FALSE(callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -653,20 +640,21 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); rpc::RequestWorkerLeaseReply reply; std::shared_ptr callback_occurred = std::make_shared(false); - auto callback = [callback_occurred]() { *callback_occurred = true; }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); + auto callback = [callback_occurred](Status, std::function, + std::function) { + *callback_occurred = true; + }; + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_EQ(announce_infeasible_task_calls_, 1); // Infeasible warning shouldn't be reprinted when the previous task is still infeasible // after adding a new node. AddNode(NodeID::FromRandom(), 8); - task_manager_.SchedulePendingTasks(); std::shared_ptr worker = std::make_shared(WorkerID::FromRandom(), 1234); pool_.PushWorker(std::dynamic_pointer_cast(worker)); + task_manager_.ScheduleAndDispatchTasks(); // Task shouldn't be scheduled yet. - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_FALSE(*callback_occurred); ASSERT_EQ(leased_workers_.size(), 0); @@ -676,8 +664,7 @@ TEST_F(ClusterTaskManagerTest, TestInfeasibleTaskWarning) { // task is spillbacked properly. 
auto remote_node_id = NodeID::FromRandom(); AddNode(remote_node_id, 12); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); // Make sure nothing happens locally. ASSERT_EQ(announce_infeasible_task_calls_, 1); ASSERT_TRUE(*callback_occurred); @@ -697,18 +684,22 @@ TEST_F(ClusterTaskManagerTest, TestMultipleInfeasibleTasksWarnOnce) { Task task = CreateTask({{ray::kCPU_ResourceLabel, 12}}); rpc::RequestWorkerLeaseReply reply; std::shared_ptr callback_occurred = std::make_shared(false); - auto callback = [callback_occurred]() { *callback_occurred = true; }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); + auto callback = [callback_occurred](Status, std::function, + std::function) { + *callback_occurred = true; + }; + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_EQ(announce_infeasible_task_calls_, 1); // Make sure the same shape infeasible task won't be announced. Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 12}}); rpc::RequestWorkerLeaseReply reply2; std::shared_ptr callback_occurred2 = std::make_shared(false); - auto callback2 = [callback_occurred2]() { *callback_occurred2 = true; }; - task_manager_.QueueTask(task2, &reply2, callback2); - task_manager_.SchedulePendingTasks(); + auto callback2 = [callback_occurred2](Status, std::function, + std::function) { + *callback_occurred2 = true; + }; + task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); ASSERT_EQ(announce_infeasible_task_calls_, 1); } @@ -716,18 +707,19 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) { /* Check if the manager can correctly identify pending tasks. */ + std::shared_ptr worker = + std::make_shared(WorkerID::FromRandom(), 1234); + pool_.PushWorker(std::dynamic_pointer_cast(worker)); // task1: running Task task = CreateTask({{ray::kCPU_ResourceLabel, 6}}); rpc::RequestWorkerLeaseReply reply; std::shared_ptr callback_occurred = std::make_shared(false); - auto callback = [callback_occurred]() { *callback_occurred = true; }; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - std::shared_ptr worker = - std::make_shared(WorkerID::FromRandom(), 1234); - pool_.PushWorker(std::dynamic_pointer_cast(worker)); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + auto callback = [callback_occurred](Status, std::function, + std::function) { + *callback_occurred = true; + }; + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_TRUE(*callback_occurred); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 0); @@ -744,9 +736,11 @@ TEST_F(ClusterTaskManagerTest, TestAnyPendingTasks) { Task task2 = CreateTask({{ray::kCPU_ResourceLabel, 6}}); rpc::RequestWorkerLeaseReply reply2; std::shared_ptr callback_occurred2 = std::make_shared(false); - auto callback2 = [callback_occurred2]() { *callback_occurred2 = true; }; - task_manager_.QueueTask(task2, &reply2, callback2); - task_manager_.SchedulePendingTasks(); + auto callback2 = [callback_occurred2](Status, std::function, + std::function) { + *callback_occurred2 = true; + }; + task_manager_.QueueAndScheduleTask(task2, &reply2, callback2); ASSERT_FALSE(*callback_occurred2); ASSERT_TRUE(task_manager_.AnyPendingTasks(&exemplar, &any_pending, &pending_actor_creations, &pending_tasks)); @@ -764,7 +758,8 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { rpc::RequestWorkerLeaseReply reply; int num_callbacks = 0; 
int *num_callbacks_ptr = &num_callbacks; - auto callback = [num_callbacks_ptr]() { + auto callback = [num_callbacks_ptr](Status, std::function, + std::function) { (*num_callbacks_ptr) = *num_callbacks_ptr + 1; }; @@ -773,9 +768,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2); std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; - task_manager_.QueueTask(task, &reply, callback); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.QueueAndScheduleTask(task, &reply, callback); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); @@ -785,8 +778,6 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { pool_.workers.clear(); auto id = task.GetTaskSpecification().TaskId(); task_manager_.TasksUnblocked({id}); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); @@ -794,22 +785,72 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { /* Task argument gets evicted */ dependency_manager_.task_ready_ = false; pool_.PushWorker(std::dynamic_pointer_cast(worker)); - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.ScheduleAndDispatchTasks(); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); ASSERT_EQ(num_callbacks, 0); ASSERT_EQ(leased_workers_.size(), 0); /* Worker available and arguments available */ - task_manager_.TasksUnblocked({id}); dependency_manager_.task_ready_ = true; - task_manager_.SchedulePendingTasks(); - task_manager_.DispatchScheduledTasksToWorkers(pool_, leased_workers_); + task_manager_.TasksUnblocked({id}); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); AssertNoLeaks(); } +TEST_F(ClusterTaskManagerTest, RleaseAndReturnWorkerCpuResources) { + const NodeResources &node_resources = scheduler_->GetLocalNodeResources(); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 4); + + auto worker = std::make_shared(WorkerID::FromRandom(), 1234); + + // Check failed as the worker has no allocated resource instances. + ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker)); + + auto node_resource_instances = scheduler_->GetLocalResources(); + auto available_resource_instances = + node_resource_instances.GetAvailableResourceInstances(); + + auto allocated_instances = std::make_shared(); + const std::unordered_map task_spec = {{"CPU", 1.}, {"GPU", 1.}}; + ASSERT_TRUE(scheduler_->AllocateLocalTaskResources(task_spec, allocated_instances)); + worker->SetAllocatedInstances(allocated_instances); + + // Check that the resoruces are allocated successfully. + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3); + + // Check that the cpu resources are released successfully. + ASSERT_TRUE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker)); + + // Check that only cpu resources are released. 
+ ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3); + + // Mark worker as blocked. + worker->MarkBlocked(); + // The release fails because the worker is blocked. + ASSERT_FALSE(task_manager_.ReleaseCpuResourcesFromUnblockedWorker(worker)); + // Check that nothing has changed. + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 8); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3); + + // Check that the CPU resources are returned to the blocked worker successfully. + ASSERT_TRUE(task_manager_.ReturnCpuResourcesToBlockedWorker(worker)); + + // Check that only CPU resources are returned to the worker. + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3); + + // Mark worker as unblocked. + worker->MarkUnblocked(); + ASSERT_FALSE(task_manager_.ReturnCpuResourcesToBlockedWorker(worker)); + // Check that nothing has changed. + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::CPU].available, 7); + ASSERT_EQ(node_resources.predefined_resources[PredefinedResources::GPU].available, 3); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/src/ray/raylet/scheduling/old_cluster_resource_scheduler.cc b/src/ray/raylet/scheduling/old_cluster_resource_scheduler.cc new file mode 100644 index 000000000000..9d5c5a9e95e9 --- /dev/null +++ b/src/ray/raylet/scheduling/old_cluster_resource_scheduler.cc @@ -0,0 +1,122 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +#include "ray/raylet/scheduling/old_cluster_resource_scheduler.h" + +#include "ray/common/grpc_util.h" +#include "ray/common/ray_config.h" + +namespace ray { +OldClusterResourceScheduler::OldClusterResourceScheduler( + const NodeID &self_node_id, ResourceIdSet &local_available_resources, + std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map, + std::shared_ptr<SchedulingResources> last_heartbeat_resources) + : self_node_id_string_(self_node_id.Binary()), + local_available_resources_(local_available_resources), + cluster_resource_map_(cluster_resource_map), + last_heartbeat_resources_(last_heartbeat_resources) {} + +bool OldClusterResourceScheduler::RemoveNode(const std::string &node_id_string) { + return cluster_resource_map_.erase(NodeID::FromBinary(node_id_string)) != 0; +} + +void OldClusterResourceScheduler::UpdateResourceCapacity( + const std::string &node_id_string, const std::string &resource_name, + double resource_total) { + if (node_id_string == self_node_id_string_) { + local_available_resources_.AddOrUpdateResource(resource_name, resource_total); + } + auto iter = cluster_resource_map_.find(NodeID::FromBinary(node_id_string)); + if (iter != cluster_resource_map_.end()) { + auto &scheduling_resources = iter->second; + scheduling_resources.UpdateResourceCapacity(resource_name, resource_total); + } +} + +void OldClusterResourceScheduler::DeleteResource(const std::string &node_id_string, + const std::string &resource_name) { + if (node_id_string == self_node_id_string_) { + local_available_resources_.DeleteResource(resource_name); + } + auto iter = cluster_resource_map_.find(NodeID::FromBinary(node_id_string)); + if (iter != cluster_resource_map_.end()) { + auto &scheduling_resources = iter->second; + scheduling_resources.DeleteResource(resource_name); + } +} + +void OldClusterResourceScheduler::FillResourceUsage( + std::shared_ptr<rpc::ResourcesData> resources_data) { + // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet + // directly. + // TODO(atumanov): implement a ResourceSet const_iterator. + // If the light resource usage report is enabled, we only set the fields that + // represent changed resources.
+ auto &local_resources = cluster_resource_map_[NodeID::FromBinary(self_node_id_string_)]; + if (!last_heartbeat_resources_->GetTotalResources().IsEqual( + local_resources.GetTotalResources())) { + for (const auto &resource_pair : + local_resources.GetTotalResources().GetResourceMap()) { + (*resources_data->mutable_resources_total())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_->SetTotalResources( + ResourceSet(local_resources.GetTotalResources())); + } + + if (!last_heartbeat_resources_->GetAvailableResources().IsEqual( + local_resources.GetAvailableResources())) { + resources_data->set_resources_available_changed(true); + for (const auto &resource_pair : + local_resources.GetAvailableResources().GetResourceMap()) { + (*resources_data->mutable_resources_available())[resource_pair.first] = + resource_pair.second; + } + last_heartbeat_resources_->SetAvailableResources( + ResourceSet(local_resources.GetAvailableResources())); + } +} + +bool OldClusterResourceScheduler::UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) { + NodeID node_id = NodeID::FromBinary(node_id_string); + auto iter = cluster_resource_map_.find(node_id); + if (iter == cluster_resource_map_.end()) { + return false; + } + + SchedulingResources &remote_resources = iter->second; + if (resource_data.resources_total_size() > 0) { + ResourceSet remote_total(MapFromProtobuf(resource_data.resources_total())); + remote_resources.SetTotalResources(std::move(remote_total)); + } + if (resource_data.resources_available_changed()) { + ResourceSet remote_available(MapFromProtobuf(resource_data.resources_available())); + remote_resources.SetAvailableResources(std::move(remote_available)); + } + if (resource_data.resource_load_changed()) { + ResourceSet remote_load(MapFromProtobuf(resource_data.resource_load())); + // Extract the load information and save it locally. + remote_resources.SetLoadResources(std::move(remote_load)); + } + return true; +} + +std::string OldClusterResourceScheduler::GetLocalResourceViewString() const { + SchedulingResources &local_resources = + cluster_resource_map_[NodeID::FromBinary(self_node_id_string_)]; + return local_resources.GetAvailableResources().ToString(); +} + +} // namespace ray diff --git a/src/ray/raylet/scheduling/old_cluster_resource_scheduler.h b/src/ray/raylet/scheduling/old_cluster_resource_scheduler.h new file mode 100644 index 000000000000..288a85c1c37a --- /dev/null +++ b/src/ray/raylet/scheduling/old_cluster_resource_scheduler.h @@ -0,0 +1,72 @@ +// Copyright 2017 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include "ray/common/task/scheduling_resources.h" +#include "ray/raylet/scheduling/cluster_resource_scheduler_interface.h" + +namespace ray { +class OldClusterResourceScheduler : public ClusterResourceSchedulerInterface { + public: + explicit OldClusterResourceScheduler( + const NodeID &self_node_id, ResourceIdSet &local_available_resources, + std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map, + std::shared_ptr<SchedulingResources> last_heartbeat_resources); + + /// Remove node from the cluster data structure. This happens + /// when a node fails or it is removed from the cluster. + /// + /// \param node_id_string ID of the node to be removed. + bool RemoveNode(const std::string &node_id_string) override; + + /// Update node resources. This happens when a node's resource usage is updated. + /// + /// \param node_id_string ID of the node whose resources need to be updated. + /// \param resource_data The node resource data. + bool UpdateNode(const std::string &node_id_string, + const rpc::ResourcesData &resource_data) override; + + /// Update the capacity of a resource on a given node. + /// + /// \param node_id_string: Node whose resource we want to update. + /// \param resource_name: Resource which we want to update. + /// \param resource_total: New capacity of the resource. + void UpdateResourceCapacity(const std::string &node_id_string, + const std::string &resource_name, + double resource_total) override; + + /// Delete a given resource from a given node. + /// + /// \param node_id_string: Node whose resource we want to delete. + /// \param resource_name: Resource we want to delete. + void DeleteResource(const std::string &node_id_string, + const std::string &resource_name) override; + + /// Populate the relevant parts of the heartbeat table. This is intended for + /// sending raylet <-> gcs heartbeats. In particular, this should fill in + /// resources_available and resources_total. + /// + /// \param data Output parameter. `resources_available` and `resources_total` are the + /// only fields used. + void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) override; + + /// Return local resources in human-readable string form. + std::string GetLocalResourceViewString() const override; + + private: + std::string self_node_id_string_; + ResourceIdSet &local_available_resources_; + std::unordered_map<NodeID, SchedulingResources> &cluster_resource_map_; + std::shared_ptr<SchedulingResources> last_heartbeat_resources_; +}; +} // namespace ray diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 61c94f1658a0..8527220e3df8 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -59,12 +59,9 @@ class MockWorker : public WorkerInterface { RAY_CHECK(false) << "Method unused"; return false; } - void MarkBlocked() { RAY_CHECK(false) << "Method unused"; } - void MarkUnblocked() { RAY_CHECK(false) << "Method unused"; } - bool IsBlocked() const { - RAY_CHECK(false) << "Method unused"; - return false; - } + void MarkBlocked() { blocked_ = true; } + void MarkUnblocked() { blocked_ = false; } + bool IsBlocked() const { return blocked_; } Process GetProcess() const { return Process::CreateNewDummy(); } void SetProcess(Process proc) { RAY_CHECK(false) << "Method unused"; } @@ -190,6 +187,7 @@ class MockWorker : public WorkerInterface { std::vector<double> borrowed_cpu_instances_; bool is_detached_actor_; BundleID bundle_id_; + bool blocked_ = false; }; } // namespace raylet
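
Note: OldClusterResourceScheduler above implements ClusterResourceSchedulerInterface (included from cluster_resource_scheduler_interface.h), which the diff does not show. The following is a minimal sketch of what that interface likely declares, inferred only from the methods marked `override` in this patch; it is an assumption, not the verbatim Ray header, which may contain additional members.

// Hypothetical sketch of the shared scheduler interface, inferred from the
// overrides in old_cluster_resource_scheduler.h above.
namespace ray {
class ClusterResourceSchedulerInterface {
 public:
  virtual ~ClusterResourceSchedulerInterface() = default;

  /// Remove a node from the cluster resource view.
  virtual bool RemoveNode(const std::string &node_id_string) = 0;

  /// Apply a resource usage update reported by a node.
  virtual bool UpdateNode(const std::string &node_id_string,
                          const rpc::ResourcesData &resource_data) = 0;

  /// Create or update the capacity of a resource on a node.
  virtual void UpdateResourceCapacity(const std::string &node_id_string,
                                      const std::string &resource_name,
                                      double resource_total) = 0;

  /// Delete a resource from a node.
  virtual void DeleteResource(const std::string &node_id_string,
                              const std::string &resource_name) = 0;

  /// Fill resources_total / resources_available for the heartbeat message.
  virtual void FillResourceUsage(std::shared_ptr<rpc::ResourcesData> data) = 0;

  /// Human-readable view of the local resources, for debugging.
  virtual std::string GetLocalResourceViewString() const = 0;
};
}  // namespace ray

Both OldClusterResourceScheduler and the new ClusterResourceScheduler can then be held behind the same pointer, which is what lets NodeManager drive either scheduler through one code path.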
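The test changes earlier in this patch all follow one pattern: the old three-step QueueTask / SchedulePendingTasks / DispatchScheduledTasksToWorkers sequence collapses into a single QueueAndScheduleTask call whose callback now matches the lease-reply signature (Status plus success/failure closures). A condensed sketch of that pattern, assuming the same test fixture members used above; the exact callback typedef name is not shown in this diff and is left implicit here.

// Hypothetical condensed example of the new call pattern used in the tests above.
Task task = CreateTask({{ray::kCPU_ResourceLabel, 4}});
rpc::RequestWorkerLeaseReply reply;
bool callback_occurred = false;

// The lease-reply callback now takes (Status, success, failure) instead of ().
auto callback = [&callback_occurred](Status, std::function<void()>,
                                     std::function<void()>) {
  callback_occurred = true;
};

// One call replaces QueueTask + SchedulePendingTasks +
// DispatchScheduledTasksToWorkers.
task_manager_.QueueAndScheduleTask(task, &reply, callback);

// Later events (new workers, unblocked dependencies) retrigger scheduling via:
task_manager_.ScheduleAndDispatchTasks();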