Skip to content

Commit 534b0e4

Browse files
authored
[core][metric] Redefine gcs STATS using metric interface (#56201)
This PR is in the series of unifying all metric definition infra. This PR migrates all GCS metrics to use the metric interface. It does that by creating the metric object inside gcs_server and pass them down as interfaces to sub-components. Purely refactoring code and repetitive patterns, easier to review than the number of file changed tells you. Test: - CI <!-- CURSOR_SUMMARY --> --- > [!NOTE] > Refactors GCS and core worker to use injected MetricInterface objects for all metrics, adding new metric helpers and rewiring constructors, server startup, storage client, and tests accordingly. > > - **Metrics Infrastructure**: > - Introduce metric helpers in `src/ray/common/metrics.h` and `src/ray/gcs/metrics.h` (gauges/histograms/counters for actors, jobs, placement groups, task events, and GCS storage). > - Replace direct `stats` usage with `MetricInterface` across GCS and core worker; rename helpers (e.g., `GetTaskMetric` -> `GetTaskByStateGaugeMetric`, `GetRayEventRecorderDroppedEventsMetric` -> `GetRayEventRecorderDroppedEventsCounterMetric`). > - **GCS Server Refactor**: > - `GcsServer` now constructs/accepts metric instances and passes them to subcomponents via `Start`/`DoStart` and init methods. > - `GcsActorManager`, `GcsJobManager`, `GcsPlacementGroupManager`, and `GcsTaskManager` constructors updated to receive and record via `MetricInterface`. > - `ObservableStoreClient` wraps delegate and records storage metrics via injected interfaces. > - **Core Worker**: > - `TaskCounter` and `CoreWorker` updated to use task/actor state gauges via injected `MetricInterface`. > - **Tests/Mocks/Build**: > - Update mocks and tests to use `FakeGauge`/`FakeCounter`/`FakeHistogram`; validate metric tags/values. > - Add Bazel targets/deps for new metric headers and fakes; minor BUILD wiring adjustments. > > <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit bd5ff5a. This will update automatically on new commits. Configure [here](https://cursor.com/dashboard?tab=bugbot).</sup> <!-- /CURSOR_SUMMARY --> --------- Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent bc9723a commit 534b0e4

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1065
-362
lines changed

BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ ray_cc_library(
119119
],
120120
),
121121
deps = [
122+
"//src/ray/observability:fake_metric",
122123
"//src/ray/observability:fake_ray_event_recorder",
123124
],
124125
)

src/mock/ray/gcs/gcs_actor_manager.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include <gmock/gmock.h>
1818

1919
#include "ray/gcs/gcs_actor_manager.h"
20+
#include "ray/observability/fake_metric.h"
2021
#include "ray/observability/fake_ray_event_recorder.h"
2122

2223
namespace ray {
@@ -38,7 +39,9 @@ class MockGcsActorManager : public GcsActorManager {
3839
[](const ActorID &) {},
3940
worker_client_pool,
4041
/*ray_event_recorder=*/fake_ray_event_recorder_,
41-
/*session_name=*/"") {}
42+
/*session_name=*/"",
43+
/*actor_by_state_gauge=*/fake_actor_by_state_gauge_,
44+
/*gcs_actor_by_state_gauge=*/fake_gcs_actor_by_state_gauge_) {}
4245

4346
MOCK_METHOD(void,
4447
HandleRegisterActor,
@@ -85,6 +88,8 @@ class MockGcsActorManager : public GcsActorManager {
8588

8689
instrumented_io_context mock_io_context_do_not_use_;
8790
observability::FakeRayEventRecorder fake_ray_event_recorder_;
91+
observability::FakeGauge fake_actor_by_state_gauge_;
92+
observability::FakeGauge fake_gcs_actor_by_state_gauge_;
8893
};
8994

9095
} // namespace gcs

src/mock/ray/gcs/gcs_placement_group_manager.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,27 @@
1717
#include <gmock/gmock.h>
1818

1919
#include "ray/gcs/gcs_placement_group_manager.h"
20+
#include "ray/observability/fake_metric.h"
2021

2122
namespace ray {
2223
namespace gcs {
2324

2425
class MockGcsPlacementGroupManager : public GcsPlacementGroupManager {
2526
public:
26-
explicit MockGcsPlacementGroupManager(GcsResourceManager &gcs_resource_manager)
27-
: GcsPlacementGroupManager(context_, gcs_resource_manager) {}
27+
explicit MockGcsPlacementGroupManager(
28+
GcsResourceManager &gcs_resource_manager,
29+
ray::observability::MetricInterface &placement_group_gauge,
30+
ray::observability::MetricInterface
31+
&placement_group_creation_latency_in_ms_histogram,
32+
ray::observability::MetricInterface
33+
&placement_group_scheduling_latency_in_ms_histogram,
34+
ray::observability::MetricInterface &placement_group_count_gauge)
35+
: GcsPlacementGroupManager(context_,
36+
gcs_resource_manager,
37+
placement_group_gauge,
38+
placement_group_creation_latency_in_ms_histogram,
39+
placement_group_scheduling_latency_in_ms_histogram,
40+
placement_group_count_gauge) {}
2841
MOCK_METHOD(void,
2942
HandleCreatePlacementGroup,
3043
(rpc::CreatePlacementGroupRequest request,

src/ray/common/BUILD.bazel

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,3 +420,11 @@ ray_cc_library(
420420
"//src/ray/common:status",
421421
],
422422
)
423+
424+
ray_cc_library(
425+
name = "metrics",
426+
hdrs = ["metrics.h"],
427+
deps = [
428+
"//src/ray/stats:stats_lib",
429+
],
430+
)

src/ray/common/metrics.h

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include "ray/stats/metric.h"
18+
19+
namespace ray {
20+
21+
inline ray::stats::Gauge GetActorByStateGaugeMetric() {
22+
/// Tracks actors by state, including pending, running, and idle actors.
23+
///
24+
/// To avoid metric collection conflicts between components reporting on the same actor,
25+
/// we use the "Source" required label.
26+
return ray::stats::Gauge{
27+
/*name=*/"actors",
28+
/*description=*/
29+
"An actor can be in one of DEPENDENCIES_UNREADY, PENDING_CREATION, ALIVE, "
30+
"ALIVE_IDLE, ALIVE_RUNNING_TASKS, RESTARTING, or DEAD states. "
31+
"An actor is considered ALIVE_IDLE if it is not executing any tasks.",
32+
/*unit=*/"",
33+
// State: the actor state, which is from rpc::ActorTableData::ActorState,
34+
// For ALIVE actor the sub-state can be IDLE, RUNNING_TASK,
35+
// RUNNING_IN_RAY_GET, and RUNNING_IN_RAY_WAIT.
36+
// Name: the name of actor class (Keep in sync with the TASK_OR_ACTOR_NAME_TAG_KEY
37+
// in python/ray/_private/telemetry/metric_cardinality.py) Source: component
38+
// reporting, e.g., "gcs" or "executor".
39+
/*tag_keys=*/{"State", "Name", "Source", "JobId"},
40+
};
41+
}
42+
43+
} // namespace ray

src/ray/core_worker/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ ray_cc_library(
3232
":reference_counter",
3333
":shutdown_coordinator",
3434
":task_event_buffer",
35+
"//src/ray/common:metrics",
3536
"//src/ray/common:protobuf_utils",
3637
"//src/ray/core_worker/task_execution:task_receiver",
3738
"//src/ray/core_worker/task_submission:normal_task_submitter",

src/ray/core_worker/core_worker.cc

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,10 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
165165
return options.job_id;
166166
}
167167

168-
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter)
169-
: task_by_state_counter_(task_by_state_counter) {
168+
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_gauge,
169+
ray::observability::MetricInterface &actor_by_state_gauge)
170+
: task_by_state_gauge_(task_by_state_gauge),
171+
actor_by_state_gauge_(actor_by_state_gauge) {
170172
counter_.SetOnChangeCallback(
171173
[this](const std::tuple<std::string, TaskStatusType, bool>
172174
&key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) mutable {
@@ -181,31 +183,31 @@ TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_coun
181183
const auto is_retry_label = is_retry ? "1" : "0";
182184
// RUNNING_IN_RAY_GET/WAIT are sub-states of RUNNING, so we need to subtract
183185
// them out to avoid double-counting.
184-
task_by_state_counter_.Record(
186+
task_by_state_gauge_.Record(
185187
running_total - num_in_get - num_in_wait,
186188
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)},
187189
{"Name"sv, func_name},
188190
{"IsRetry"sv, is_retry_label},
189191
{"JobId"sv, job_id_},
190192
{"Source"sv, "executor"}});
191193
// Negate the metrics recorded from the submitter process for these tasks.
192-
task_by_state_counter_.Record(
194+
task_by_state_gauge_.Record(
193195
-running_total,
194196
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::SUBMITTED_TO_WORKER)},
195197
{"Name"sv, func_name},
196198
{"IsRetry"sv, is_retry_label},
197199
{"JobId"sv, job_id_},
198200
{"Source"sv, "executor"}});
199201
// Record sub-state for get.
200-
task_by_state_counter_.Record(
202+
task_by_state_gauge_.Record(
201203
num_in_get,
202204
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_GET)},
203205
{"Name"sv, func_name},
204206
{"IsRetry"sv, is_retry_label},
205207
{"JobId"sv, job_id_},
206208
{"Source"sv, "executor"}});
207209
// Record sub-state for wait.
208-
task_by_state_counter_.Record(
210+
task_by_state_gauge_.Record(
209211
num_in_wait,
210212
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_WAIT)},
211213
{"Name"sv, func_name},
@@ -226,16 +228,16 @@ void TaskCounter::RecordMetrics() {
226228
} else {
227229
running_tasks = 1.0;
228230
}
229-
ray::stats::STATS_actors.Record(idle,
230-
{{"State", "ALIVE_IDLE"},
231-
{"Name", actor_name_},
232-
{"Source", "executor"},
233-
{"JobId", job_id_}});
234-
ray::stats::STATS_actors.Record(running_tasks,
235-
{{"State", "ALIVE_RUNNING_TASKS"},
236-
{"Name", actor_name_},
237-
{"Source", "executor"},
238-
{"JobId", job_id_}});
231+
actor_by_state_gauge_.Record(idle,
232+
{{"State"sv, "ALIVE_IDLE"},
233+
{"Name"sv, actor_name_},
234+
{"Source"sv, "executor"},
235+
{"JobId"sv, job_id_}});
236+
actor_by_state_gauge_.Record(running_tasks,
237+
{{"State"sv, "ALIVE_RUNNING_TASKS"},
238+
{"Name"sv, actor_name_},
239+
{"Source"sv, "executor"},
240+
{"JobId"sv, job_id_}});
239241
}
240242
}
241243

@@ -303,7 +305,8 @@ CoreWorker::CoreWorker(
303305
instrumented_io_context &task_execution_service,
304306
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
305307
uint32_t pid,
306-
ray::observability::MetricInterface &task_by_state_counter)
308+
ray::observability::MetricInterface &task_by_state_gauge,
309+
ray::observability::MetricInterface &actor_by_state_gauge)
307310
: options_(std::move(options)),
308311
get_call_site_(RayConfig::instance().record_ref_creation_sites()
309312
? options_.get_lang_stack
@@ -342,7 +345,7 @@ CoreWorker::CoreWorker(
342345
task_execution_service_(task_execution_service),
343346
exiting_detail_(std::nullopt),
344347
max_direct_call_object_size_(RayConfig::instance().max_direct_call_object_size()),
345-
task_counter_(task_by_state_counter),
348+
task_counter_(task_by_state_gauge, actor_by_state_gauge),
346349
task_event_buffer_(std::move(task_event_buffer)),
347350
pid_(pid),
348351
actor_shutdown_callback_(options_.actor_shutdown_callback),

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ class TaskCounter {
6969
enum class TaskStatusType { kPending, kRunning, kFinished };
7070

7171
public:
72-
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter);
72+
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_gauge,
73+
ray::observability::MetricInterface &actor_by_state_gauge);
7374

7475
void BecomeActor(const std::string &actor_name) {
7576
absl::MutexLock l(&mu_);
@@ -134,7 +135,8 @@ class TaskCounter {
134135
// - Name: the name of the function called
135136
// - IsRetry: whether the task is a retry
136137
// - Source: component reporting, e.g., "core_worker", "executor", or "pull_manager"
137-
ray::observability::MetricInterface &task_by_state_counter_;
138+
ray::observability::MetricInterface &task_by_state_gauge_;
139+
ray::observability::MetricInterface &actor_by_state_gauge_;
138140
};
139141

140142
struct TaskToRetry {
@@ -199,7 +201,8 @@ class CoreWorker {
199201
instrumented_io_context &task_execution_service,
200202
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
201203
uint32_t pid,
202-
ray::observability::MetricInterface &task_by_state_counter);
204+
ray::observability::MetricInterface &task_by_state_counter,
205+
ray::observability::MetricInterface &actor_by_state_counter);
203206

204207
CoreWorker(CoreWorker const &) = delete;
205208

src/ray/core_worker/core_worker_process.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
470470
return core_worker->core_worker_client_pool_->GetOrConnect(*addr);
471471
},
472472
gcs_client,
473-
task_by_state_counter_,
473+
task_by_state_gauge_,
474474
/*free_actor_object_callback=*/
475475
[this](const ObjectID &object_id) {
476476
auto core_worker = GetCoreWorker();
@@ -681,7 +681,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
681681
task_execution_service_,
682682
std::move(task_event_buffer),
683683
pid,
684-
task_by_state_counter_);
684+
task_by_state_gauge_,
685+
actor_by_state_gauge_);
685686
return core_worker;
686687
}
687688

src/ray/core_worker/core_worker_process.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <memory>
1919
#include <string>
2020

21+
#include "ray/common/metrics.h"
2122
#include "ray/core_worker/core_worker_options.h"
2223
#include "ray/core_worker/grpc_service.h"
2324
#include "ray/core_worker/metrics.h"
@@ -184,7 +185,8 @@ class CoreWorkerProcessImpl {
184185
/// The client to export metrics to the metrics agent.
185186
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;
186187

187-
ray::stats::Gauge task_by_state_counter_{GetTaskMetric()};
188+
ray::stats::Gauge task_by_state_gauge_{GetTaskByStateGaugeMetric()};
189+
ray::stats::Gauge actor_by_state_gauge_{GetActorByStateGaugeMetric()};
188190
};
189191
} // namespace core
190192
} // namespace ray

0 commit comments

Comments
 (0)