Skip to content

Commit

Permalink
[core] NodeManager: code refactor and cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: hongchaodeng <hongchaodeng1@gmail.com>
  • Loading branch information
hongchaodeng committed May 12, 2024
1 parent 3bcdfd8 commit bb9ea4a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 34 deletions.
40 changes: 21 additions & 19 deletions src/ray/raylet/node_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,30 @@

#include <cctype>
#include <csignal>
#include <filesystem>
#include <fstream>
#include <memory>
#include <utility>

#include "absl/functional/bind_front.h"
#include "absl/synchronization/notification.h"
#include "absl/time/clock.h"
#include "boost/system/error_code.hpp"
#include "ray/common/asio/asio_util.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/buffer.h"
#include "ray/common/common_protocol.h"
#include "ray/common/constants.h"
#include "ray/common/memory_monitor.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/common/status.h"
#include "ray/common/task/task_common.h"
#include "ray/gcs/pb_util.h"
#include "ray/raylet/format/node_manager_generated.h"
#include "ray/raylet/raylet_util.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/raylet/worker_killing_policy.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/stats/metric_defs.h"
#include "ray/stats/stats.h"
#include "ray/util/event.h"
#include "ray/util/event_label.h"
#include "ray/util/sample.h"
#include "ray/util/util.h"

namespace {
Expand Down Expand Up @@ -299,13 +299,12 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
// safely drained when this function reports zero.
int64_t bytes_used = local_object_manager_.GetPrimaryBytes();
// Report nonzero if we have objects spilled to the local filesystem.
if (bytes_used == 0) {
bytes_used = local_object_manager_.HasLocallySpilledObjects();
if (bytes_used == 0 && local_object_manager_.HasLocallySpilledObjects()) {
bytes_used = 1;
}
return bytes_used;
} else {
return object_manager_.GetUsedMemory();
}
return object_manager_.GetUsedMemory();
},
/*get_pull_manager_at_capacity*/
[this]() { return object_manager_.PullManagerHasPullsQueued(); },
Expand All @@ -321,8 +320,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
RAY_CHECK(RayConfig::instance().max_task_args_memory_fraction() > 0 &&
RayConfig::instance().max_task_args_memory_fraction() <= 1)
<< "max_task_args_memory_fraction must be a nonzero fraction.";
int64_t max_task_args_memory = object_manager_.GetMemoryCapacity() *
RayConfig::instance().max_task_args_memory_fraction();
auto max_task_args_memory =
static_cast<int64_t>(static_cast<float>(object_manager_.GetMemoryCapacity()) *
RayConfig::instance().max_task_args_memory_fraction());
if (max_task_args_memory <= 0) {
RAY_LOG(WARNING)
<< "Max task args should be a fraction of the object store capacity, but object "
Expand Down Expand Up @@ -360,9 +360,9 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
RayConfig::instance().worker_cap_initial_backoff_delay_ms(),
"NodeManager.ScheduleAndDispatchTasks");

RAY_CHECK_OK(store_client_->Connect(config.store_socket_name.c_str()));
RAY_CHECK_OK(store_client_->Connect(config.store_socket_name));
// Run the node manger rpc server.
node_manager_server_.RegisterService(node_manager_service_, false /* token_auth */);
node_manager_server_.RegisterService(node_manager_service_, false);
node_manager_server_.RegisterService(ray_syncer_service_);
node_manager_server_.Run();
// GCS will check the health of the service named with the node id.
Expand All @@ -379,7 +379,8 @@ NodeManager::NodeManager(instrumented_io_context &io_service,
config.node_manager_address,
config.runtime_env_agent_port, /*delay_executor=*/
[this](std::function<void()> task, uint32_t delay_ms) {
return execute_after(io_service_, task, std::chrono::milliseconds(delay_ms));
return execute_after(
io_service_, std::move(task), std::chrono::milliseconds(delay_ms));
});

worker_pool_.SetRuntimeEnvAgentClient(runtime_env_agent_client_);
Expand Down Expand Up @@ -468,7 +469,7 @@ ray::Status NodeManager::RegisterGcs() {
HandleUnexpectedWorkerFailure(worker_failure_data);
};
RAY_CHECK_OK(gcs_client_->Workers().AsyncSubscribeToWorkerFailures(
worker_failure_handler, /*done_callback=*/nullptr));
worker_failure_handler, nullptr));

// Subscribe to job updates.
const auto job_subscribe_handler = [this](const JobID &job_id,
Expand Down Expand Up @@ -2300,7 +2301,7 @@ void NodeManager::ProcessSubscribePlasmaReady(
<< "No worker exists for CoreWorker with client: " << client->DebugString();

auto message = flatbuffers::GetRoot<protocol::SubscribePlasmaReady>(message_data);
ObjectID id = from_flatbuf<ObjectID>(*message->object_id());
auto id = from_flatbuf<ObjectID>(*message->object_id());

if (dependency_manager_.CheckObjectLocal(id)) {
// Object is already local, so we directly fire the callback to tell the core worker
Expand Down Expand Up @@ -3009,9 +3010,10 @@ void NodeManager::SetTaskFailureReason(const TaskID &task_id,

void NodeManager::GCTaskFailureReason() {
for (const auto &entry : task_failure_reasons_) {
auto duration = (uint64_t)std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - entry.second.creation_time)
.count();
auto duration = static_cast<uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - entry.second.creation_time)
.count());
if (duration > RayConfig::instance().task_failure_entry_ttl_ms()) {
RAY_LOG(INFO) << "Removing task failure reason since it expired, task: "
<< entry.first;
Expand Down
13 changes: 4 additions & 9 deletions src/ray/raylet/node_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@
#pragma once

// clang-format off
#include "ray/rpc/grpc_client.h"
#include "ray/rpc/node_manager/node_manager_server.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/common/id.h"
#include "ray/common/memory_monitor.h"
#include "ray/common/task/task.h"
#include "ray/common/ray_object.h"
#include "ray/common/ray_syncer/ray_syncer.h"
#include "ray/common/client_connection.h"
#include "ray/common/task/task_common.h"
#include "ray/common/task/task_util.h"
#include "ray/common/scheduling/resource_set.h"
#include "ray/pubsub/subscriber.h"
Expand All @@ -33,16 +30,13 @@
#include "ray/raylet/runtime_env_agent_client.h"
#include "ray/raylet_client/raylet_client.h"
#include "ray/raylet/local_object_manager.h"
#include "ray/common/scheduling/scheduling_ids.h"
#include "ray/raylet/scheduling/cluster_resource_scheduler.h"
#include "ray/raylet/scheduling/cluster_task_manager.h"
#include "ray/raylet/scheduling/cluster_task_manager_interface.h"
#include "ray/raylet/dependency_manager.h"
#include "ray/raylet/local_task_manager.h"
#include "ray/raylet/wait_manager.h"
#include "ray/raylet/worker_pool.h"
#include "ray/rpc/worker/core_worker_client_pool.h"
#include "ray/util/ordered_set.h"
#include "ray/util/throttler.h"
#include "ray/common/asio/instrumented_io_context.h"
#include "ray/common/bundle_spec.h"
Expand Down Expand Up @@ -128,8 +122,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
public:
/// Create a node manager.
///
/// \param resource_config The initial set of node resources.
/// \param object_manager A reference to the local object manager.
/// \param config Configuration of node manager, e.g. initial resources, ports, etc.
/// \param object_manager_config Configuration of object manager, e.g. initial memory
/// allocation.
NodeManager(instrumented_io_context &io_service,
const NodeID &self_node_id,
const std::string &self_node_name,
Expand Down Expand Up @@ -196,7 +191,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler,
/// \param object_ids The object ids to store error messages into.
/// \param job_id The optional job to push errors to if the writes fail.
void MarkObjectsAsFailed(const ErrorType &error_type,
const std::vector<rpc::ObjectReference> object_ids,
std::vector<rpc::ObjectReference> object_ids,
const JobID &job_id);

/// Stop this node manager.
Expand Down
10 changes: 6 additions & 4 deletions src/ray/raylet/raylet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
#include <boost/asio.hpp>
#include <boost/bind/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <iostream>

#include "ray/common/client_connection.h"
#include "ray/common/scheduling/resource_set.h"
#include "ray/common/status.h"
#include "ray/util/util.h"

Expand Down Expand Up @@ -100,11 +100,13 @@ Raylet::Raylet(instrumented_io_context &main_service,

// Setting up autoscaler related fields from ENV
auto instance_id = std::getenv(kNodeCloudInstanceIdEnv);
self_node_info_.set_instance_id(instance_id ? instance_id : "");
self_node_info_.set_instance_id((instance_id != nullptr) ? instance_id : "");
auto cloud_node_type_name = std::getenv(kNodeTypeNameEnv);
self_node_info_.set_node_type_name(cloud_node_type_name ? cloud_node_type_name : "");
self_node_info_.set_node_type_name(
(cloud_node_type_name != nullptr) ? cloud_node_type_name : "");
auto instance_type_name = std::getenv(kNodeCloudInstanceTypeNameEnv);
self_node_info_.set_instance_type_name(instance_type_name ? instance_type_name : "");
self_node_info_.set_instance_type_name(
(instance_type_name != nullptr) ? instance_type_name : "");
}

Raylet::~Raylet() {}
Expand Down
2 changes: 0 additions & 2 deletions src/ray/raylet/raylet.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@

#include <boost/asio.hpp>
#include <boost/asio/error.hpp>
#include <list>

// clang-format off
#include "ray/raylet/node_manager.h"
#include "ray/object_manager/object_manager.h"
#include "ray/common/scheduling/resource_set.h"
#include "ray/common/asio/instrumented_io_context.h"
// clang-format on

Expand Down

0 comments on commit bb9ea4a

Please sign in to comment.