Skip to content

Commit 1c6e1ea

Browse files
committed
[core][metric] Redefine more STATS using metric interface
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent 495e092 commit 1c6e1ea

18 files changed

+191
-100
lines changed

src/ray/core_worker/core_worker.cc

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,10 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
167167
return options.job_id;
168168
}
169169

170-
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter)
171-
: task_by_state_counter_(task_by_state_counter) {
170+
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter,
171+
ray::observability::MetricInterface &actor_by_state_counter)
172+
: task_by_state_counter_(task_by_state_counter),
173+
actor_by_state_counter_(actor_by_state_counter) {
172174
counter_.SetOnChangeCallback(
173175
[this](const std::tuple<std::string, TaskStatusType, bool>
174176
&key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) mutable {
@@ -234,26 +236,26 @@ void TaskCounter::RecordMetrics() {
234236
} else {
235237
idle = 1.0;
236238
}
237-
ray::stats::STATS_actors.Record(idle,
238-
{{"State", "IDLE"},
239-
{"Name", actor_name_},
240-
{"Source", "executor"},
241-
{"JobId", job_id_}});
242-
ray::stats::STATS_actors.Record(running,
243-
{{"State", "RUNNING_TASK"},
244-
{"Name", actor_name_},
245-
{"Source", "executor"},
246-
{"JobId", job_id_}});
247-
ray::stats::STATS_actors.Record(in_get,
248-
{{"State", "RUNNING_IN_RAY_GET"},
249-
{"Name", actor_name_},
250-
{"Source", "executor"},
251-
{"JobId", job_id_}});
252-
ray::stats::STATS_actors.Record(in_wait,
253-
{{"State", "RUNNING_IN_RAY_WAIT"},
254-
{"Name", actor_name_},
255-
{"Source", "executor"},
256-
{"JobId", job_id_}});
239+
actor_by_state_counter_.Record(idle,
240+
{{"State"sv, "IDLE"},
241+
{"Name"sv, actor_name_},
242+
{"Source"sv, "executor"},
243+
{"JobId"sv, job_id_}});
244+
actor_by_state_counter_.Record(running,
245+
{{"State"sv, "RUNNING_TASK"},
246+
{"Name"sv, actor_name_},
247+
{"Source"sv, "executor"},
248+
{"JobId"sv, job_id_}});
249+
actor_by_state_counter_.Record(in_get,
250+
{{"State"sv, "RUNNING_IN_RAY_GET"},
251+
{"Name"sv, actor_name_},
252+
{"Source"sv, "executor"},
253+
{"JobId"sv, job_id_}});
254+
actor_by_state_counter_.Record(in_wait,
255+
{{"State"sv, "RUNNING_IN_RAY_WAIT"},
256+
{"Name"sv, actor_name_},
257+
{"Source"sv, "executor"},
258+
{"JobId"sv, job_id_}});
257259
}
258260
}
259261

@@ -321,7 +323,8 @@ CoreWorker::CoreWorker(
321323
instrumented_io_context &task_execution_service,
322324
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
323325
uint32_t pid,
324-
ray::observability::MetricInterface &task_by_state_counter)
326+
ray::observability::MetricInterface &task_by_state_counter,
327+
ray::observability::MetricInterface &actor_by_state_counter)
325328
: options_(std::move(options)),
326329
get_call_site_(RayConfig::instance().record_ref_creation_sites()
327330
? options_.get_lang_stack
@@ -360,7 +363,7 @@ CoreWorker::CoreWorker(
360363
task_execution_service_(task_execution_service),
361364
exiting_detail_(std::nullopt),
362365
max_direct_call_object_size_(RayConfig::instance().max_direct_call_object_size()),
363-
task_counter_(task_by_state_counter),
366+
task_counter_(task_by_state_counter, actor_by_state_counter),
364367
task_event_buffer_(std::move(task_event_buffer)),
365368
pid_(pid),
366369
actor_shutdown_callback_(std::move(options_.actor_shutdown_callback)),

src/ray/core_worker/core_worker.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ class TaskCounter {
8080
enum class TaskStatusType { kPending, kRunning, kFinished };
8181

8282
public:
83-
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter);
83+
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter,
84+
ray::observability::MetricInterface &actor_by_state_counter);
8485

8586
void BecomeActor(const std::string &actor_name) {
8687
absl::MutexLock l(&mu_);
@@ -139,6 +140,7 @@ class TaskCounter {
139140
std::string actor_name_ ABSL_GUARDED_BY(mu_);
140141
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
141142
ray::observability::MetricInterface &task_by_state_counter_;
143+
ray::observability::MetricInterface &actor_by_state_counter_;
142144
};
143145

144146
struct TaskToRetry {
@@ -203,7 +205,8 @@ class CoreWorker {
203205
instrumented_io_context &task_execution_service,
204206
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
205207
uint32_t pid,
206-
ray::observability::MetricInterface &task_by_state_counter);
208+
ray::observability::MetricInterface &task_by_state_counter,
209+
ray::observability::MetricInterface &actor_by_state_counter);
207210

208211
CoreWorker(CoreWorker const &) = delete;
209212

src/ray/core_worker/core_worker_process.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -660,7 +660,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
660660
task_execution_service_,
661661
std::move(task_event_buffer),
662662
pid,
663-
task_by_state_counter_);
663+
task_by_state_counter_,
664+
actor_by_state_counter_);
664665
return core_worker;
665666
}
666667

src/ray/core_worker/core_worker_process.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ class CoreWorkerProcessImpl {
184184
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;
185185

186186
ray::stats::Gauge task_by_state_counter_{GetTaskMetric()};
187+
ray::stats::Gauge actor_by_state_counter_{GetActorMetric()};
187188
};
188189
} // namespace core
189190
} // namespace ray

src/ray/core_worker/metrics.h

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,5 +40,24 @@ inline ray::stats::Gauge GetTaskMetric() {
4040
};
4141
}
4242

43+
inline ray::stats::Gauge GetActorMetric() {
44+
/// Tracks actors by state, including pending, running, and idle actors.
45+
///
46+
/// To avoid metric collection conflicts between components reporting on the same actor,
47+
/// we use the "Source" required label.
48+
return ray::stats::Gauge{
49+
/*name=*/"actors",
50+
/*description=*/"Current number of actors currently in a particular state.",
51+
/*unit=*/"",
52+
// State: the actor state, which is from rpc::ActorTableData::ActorState,
53+
// For ALIVE actor the sub-state can be IDLE, RUNNING_TASK,
54+
// RUNNING_IN_RAY_GET, and RUNNING_IN_RAY_WAIT.
55+
// Name: the name of actor class (Keep in sync with the TASK_OR_ACTOR_NAME_TAG_KEY
56+
// in python/ray/_private/telemetry/metric_cardinality.py) Source: component
57+
// reporting, e.g., "gcs" or "executor".
58+
/*tag_keys=*/{"State", "Name", "Source", "JobId"},
59+
};
60+
}
61+
4362
} // namespace core
4463
} // namespace ray

src/ray/gcs/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,11 @@ ray_cc_library(
1414
"//src/ray/util:time",
1515
],
1616
)
17+
18+
ray_cc_library(
19+
name = "metrics",
20+
hdrs = ["metrics.h"],
21+
deps = [
22+
"//src/ray/stats:stats_lib",
23+
],
24+
)

src/ray/gcs/gcs_server/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,7 @@ ray_cc_library(
252252
"//src/ray/gcs:gcs_pb_util",
253253
"//src/ray/gcs/pubsub:gcs_pub_sub_lib",
254254
"//src/ray/rpc:core_worker_client",
255+
"//src/ray/observability:metric_interface",
255256
"//src/ray/stats:stats_metric",
256257
"//src/ray/util:event",
257258
"//src/ray/util:thread_checker",
@@ -509,6 +510,8 @@ ray_cc_library(
509510
":gcs_worker_manager",
510511
":grpc_service_interfaces",
511512
":grpc_services",
513+
"//src/ray/core_worker:metrics",
514+
"//src/ray/gcs:metrics",
512515
"//src/ray/gcs/pubsub:gcs_pub_sub_lib",
513516
"//src/ray/gcs/store_client",
514517
"//src/ray/gcs/store_client:in_memory_store_client",

src/ray/gcs/gcs_server/gcs_actor_manager.cc

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,8 @@ bool OnInitializeActorShouldLoad(const ray::gcs::GcsInitData &gcs_init_data,
176176
namespace ray {
177177
namespace gcs {
178178

179+
using std::literals::operator""sv;
180+
179181
bool is_uuid(const std::string &str) {
180182
static const boost::regex e(
181183
"[a-f0-9]{8}-[a-f0-9]{4}-4[a-f0-9]{3}-[89aAbB][a-f0-9]{3}-[a-f0-9]{12}");
@@ -226,7 +228,8 @@ GcsActorManager::GcsActorManager(
226228
RuntimeEnvManager &runtime_env_manager,
227229
GCSFunctionManager &function_manager,
228230
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
229-
rpc::CoreWorkerClientPool &worker_client_pool)
231+
rpc::CoreWorkerClientPool &worker_client_pool,
232+
ray::observability::MetricInterface &actor_by_state_counter)
230233
: gcs_actor_scheduler_(std::move(scheduler)),
231234
gcs_table_storage_(gcs_table_storage),
232235
io_context_(io_context),
@@ -236,19 +239,20 @@ GcsActorManager::GcsActorManager(
236239
std::move(destroy_owned_placement_group_if_needed)),
237240
runtime_env_manager_(runtime_env_manager),
238241
function_manager_(function_manager),
239-
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()) {
242+
actor_gc_delay_(RayConfig::instance().gcs_actor_table_min_duration_ms()),
243+
actor_by_state_counter_(actor_by_state_counter) {
240244
RAY_CHECK(destroy_owned_placement_group_if_needed_);
241245
actor_state_counter_ = std::make_shared<
242246
CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>>();
243247
actor_state_counter_->SetOnChangeCallback(
244248
[this](const std::pair<rpc::ActorTableData::ActorState, std::string> key) mutable {
245249
int64_t num_actors = actor_state_counter_->Get(key);
246-
ray::stats::STATS_actors.Record(
250+
actor_by_state_counter_.Record(
247251
num_actors,
248-
{{"State", rpc::ActorTableData::ActorState_Name(key.first)},
249-
{"Name", key.second},
250-
{"Source", "gcs"},
251-
{"JobId", ""}});
252+
{{"State"sv, rpc::ActorTableData::ActorState_Name(key.first)},
253+
{"Name"sv, key.second},
254+
{"Source"sv, "gcs"},
255+
{"JobId"sv, ""}});
252256
});
253257
}
254258

src/ray/gcs/gcs_server/gcs_actor_manager.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
#include "ray/gcs/gcs_server/grpc_service_interfaces.h"
3636
#include "ray/gcs/gcs_server/usage_stats_client.h"
3737
#include "ray/gcs/pubsub/gcs_pub_sub.h"
38+
#include "ray/observability/metric_interface.h"
3839
#include "ray/rpc/worker/core_worker_client.h"
3940
#include "ray/rpc/worker/core_worker_client_pool.h"
4041
#include "ray/util/counter_map.h"
@@ -104,7 +105,8 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
104105
RuntimeEnvManager &runtime_env_manager,
105106
GCSFunctionManager &function_manager,
106107
std::function<void(const ActorID &)> destroy_owned_placement_group_if_needed,
107-
rpc::CoreWorkerClientPool &worker_client_pool);
108+
rpc::CoreWorkerClientPool &worker_client_pool,
109+
ray::observability::MetricInterface &actor_by_state_counter);
108110

109111
~GcsActorManager() override = default;
110112

@@ -495,6 +497,7 @@ class GcsActorManager : public rpc::ActorInfoGcsServiceHandler {
495497
/// Counter of actors broken down by (State, ClassName).
496498
std::shared_ptr<CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>>
497499
actor_state_counter_;
500+
ray::observability::MetricInterface &actor_by_state_counter_;
498501

499502
/// Total number of successfully created actors in the cluster lifetime.
500503
int64_t liftime_num_created_actors_ = 0;

src/ray/gcs/gcs_server/gcs_job_manager.cc

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
namespace ray {
3131
namespace gcs {
3232

33+
using std::literals::operator""sv;
34+
3335
void GcsJobManager::Initialize(const GcsInitData &gcs_init_data) {
3436
for (const auto &[job_id, job_table_data] : gcs_init_data.Jobs()) {
3537
cached_job_configs_[job_id] =
@@ -164,9 +166,9 @@ void GcsJobManager::MarkJobAsFinished(rpc::JobTableData job_table_data,
164166
auto iter = running_job_start_times_.find(job_id);
165167
if (iter != running_job_start_times_.end()) {
166168
running_job_start_times_.erase(iter);
167-
ray::stats::STATS_job_duration_s.Record(
169+
job_duration_in_seconds_counter_.Record(
168170
(job_table_data.end_time() - job_table_data.start_time()) / 1000.0,
169-
{{"JobId", job_id.Hex()}});
171+
{{"JobId"sv, job_id.Hex()}});
170172
++finished_jobs_count_;
171173
}
172174

@@ -494,12 +496,12 @@ void GcsJobManager::OnNodeDead(const NodeID &node_id) {
494496
}
495497

496498
void GcsJobManager::RecordMetrics() {
497-
ray::stats::STATS_running_jobs.Record(running_job_start_times_.size());
498-
ray::stats::STATS_finished_jobs.Record(finished_jobs_count_);
499+
running_job_counter_.Record(running_job_start_times_.size());
500+
finished_job_counter_.Record(finished_jobs_count_);
499501

500502
for (const auto &[job_id, start_time] : running_job_start_times_) {
501-
ray::stats::STATS_job_duration_s.Record((current_sys_time_ms() - start_time) / 1000.0,
502-
{{"JobId", job_id.Hex()}});
503+
job_duration_in_seconds_counter_.Record((current_sys_time_ms() - start_time) / 1000.0,
504+
{{"JobId"sv, job_id.Hex()}});
503505
}
504506
}
505507

0 commit comments

Comments
 (0)