Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ ray_cc_library(
],
),
deps = [
"//src/ray/observability:fake_metric",
"//src/ray/observability:fake_ray_event_recorder",
],
)
Expand Down
7 changes: 6 additions & 1 deletion src/mock/ray/gcs/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <gmock/gmock.h>

#include "ray/gcs/gcs_actor_manager.h"
#include "ray/observability/fake_metric.h"
#include "ray/observability/fake_ray_event_recorder.h"

namespace ray {
Expand All @@ -38,7 +39,9 @@ class MockGcsActorManager : public GcsActorManager {
[](const ActorID &) {},
worker_client_pool,
/*ray_event_recorder=*/fake_ray_event_recorder_,
/*session_name=*/"") {}
/*session_name=*/"",
/*actor_by_state_gauge=*/fake_actor_by_state_gauge_,
/*gcs_actor_by_state_gauge=*/fake_gcs_actor_by_state_gauge_) {}

MOCK_METHOD(void,
HandleRegisterActor,
Expand Down Expand Up @@ -85,6 +88,8 @@ class MockGcsActorManager : public GcsActorManager {

instrumented_io_context mock_io_context_do_not_use_;
observability::FakeRayEventRecorder fake_ray_event_recorder_;
observability::FakeGauge fake_actor_by_state_gauge_;
observability::FakeGauge fake_gcs_actor_by_state_gauge_;
};

} // namespace gcs
Expand Down
17 changes: 15 additions & 2 deletions src/mock/ray/gcs/gcs_placement_group_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,27 @@
#include <gmock/gmock.h>

#include "ray/gcs/gcs_placement_group_manager.h"
#include "ray/observability/fake_metric.h"

namespace ray {
namespace gcs {

class MockGcsPlacementGroupManager : public GcsPlacementGroupManager {
public:
explicit MockGcsPlacementGroupManager(GcsResourceManager &gcs_resource_manager)
: GcsPlacementGroupManager(context_, gcs_resource_manager) {}
explicit MockGcsPlacementGroupManager(
GcsResourceManager &gcs_resource_manager,
ray::observability::MetricInterface &placement_group_gauge,
ray::observability::MetricInterface
&placement_group_creation_latency_in_ms_histogram,
ray::observability::MetricInterface
&placement_group_scheduling_latency_in_ms_histogram,
ray::observability::MetricInterface &placement_group_count_gauge)
: GcsPlacementGroupManager(context_,
gcs_resource_manager,
placement_group_gauge,
placement_group_creation_latency_in_ms_histogram,
placement_group_scheduling_latency_in_ms_histogram,
placement_group_count_gauge) {}
MOCK_METHOD(void,
HandleCreatePlacementGroup,
(rpc::CreatePlacementGroupRequest request,
Expand Down
8 changes: 8 additions & 0 deletions src/ray/common/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -420,3 +420,11 @@ ray_cc_library(
"//src/ray/common:status",
],
)

ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
deps = [
"//src/ray/stats:stats_lib",
],
)
43 changes: 43 additions & 0 deletions src/ray/common/metrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "ray/stats/metric.h"

namespace ray {

inline ray::stats::Gauge GetActorByStateGaugeMetric() {
/// Tracks actors by state, including pending, running, and idle actors.
///
/// To avoid metric collection conflicts between components reporting on the same actor,
Copy link
Contributor

@ZacAttack ZacAttack Sep 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused by this. Right now we have component labels for things like raylet and gcs and we have a Name label for a metric name. Now we're adding a source label. Why would two components necessarily conflict in the current set up? Are they within the same component? I'm unclear why we need an additional label now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point about Source vs. Component. I’m also not sure of the original purpose of each, these tags predate both my time and this PR (which is purely a refactoring and doesn’t add the Source tag). I’m open to revisiting or merging the two tags in a follow-up PR.

/// we use the "Source" required label.
return ray::stats::Gauge{
/*name=*/"actors",
/*description=*/
"An actor can be in one of DEPENDENCIES_UNREADY, PENDING_CREATION, ALIVE, "
"ALIVE_IDLE, ALIVE_RUNNING_TASKS, RESTARTING, or DEAD states. "
"An actor is considered ALIVE_IDLE if it is not executing any tasks.",
/*unit=*/"",
// State: the actor state, which is from rpc::ActorTableData::ActorState,
// For ALIVE actor the sub-state can be IDLE, RUNNING_TASK,
// RUNNING_IN_RAY_GET, and RUNNING_IN_RAY_WAIT.
// Name: the name of actor class (Keep in sync with the TASK_OR_ACTOR_NAME_TAG_KEY
// in python/ray/_private/telemetry/metric_cardinality.py) Source: component
// reporting, e.g., "gcs" or "executor".
/*tag_keys=*/{"State", "Name", "Source", "JobId"},
};
}

} // namespace ray
1 change: 1 addition & 0 deletions src/ray/core_worker/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ ray_cc_library(
":reference_counter",
":shutdown_coordinator",
":task_event_buffer",
"//src/ray/common:metrics",
"//src/ray/common:protobuf_utils",
"//src/ray/core_worker/task_execution:task_receiver",
"//src/ray/core_worker/task_submission:normal_task_submitter",
Expand Down
39 changes: 21 additions & 18 deletions src/ray/core_worker/core_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,10 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
return options.job_id;
}

TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter)
: task_by_state_counter_(task_by_state_counter) {
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_gauge,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should change the name of the type since it's now a gauge?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The internal Prometheus metric has always been a gauge, while the wrapper has always been TaskCounter. I fixed the naming in this PR from task_by_state_counter to task_by_state_gauge to correct a regression I introduced earlier, but underneath, it has always been a gauge. I can see the merit in naming the wrapper Counter, since the concept of a gauge might feel unfamiliar or like an implementation detail, at least to me. But I’m open to using either name for the wrapper, though elsewhere in the codebase Gauge and Counter are used interchangeably for wrapper names (like in [1]).

ray::observability::MetricInterface &actor_by_state_gauge)
: task_by_state_gauge_(task_by_state_gauge),
actor_by_state_gauge_(actor_by_state_gauge) {
counter_.SetOnChangeCallback(
[this](const std::tuple<std::string, TaskStatusType, bool>
&key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) mutable {
Expand All @@ -181,31 +183,31 @@ TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_coun
const auto is_retry_label = is_retry ? "1" : "0";
// RUNNING_IN_RAY_GET/WAIT are sub-states of RUNNING, so we need to subtract
// them out to avoid double-counting.
task_by_state_counter_.Record(
task_by_state_gauge_.Record(
running_total - num_in_get - num_in_wait,
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)},
{"Name"sv, func_name},
{"IsRetry"sv, is_retry_label},
{"JobId"sv, job_id_},
{"Source"sv, "executor"}});
// Negate the metrics recorded from the submitter process for these tasks.
task_by_state_counter_.Record(
task_by_state_gauge_.Record(
-running_total,
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::SUBMITTED_TO_WORKER)},
{"Name"sv, func_name},
{"IsRetry"sv, is_retry_label},
{"JobId"sv, job_id_},
{"Source"sv, "executor"}});
// Record sub-state for get.
task_by_state_counter_.Record(
task_by_state_gauge_.Record(
num_in_get,
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_GET)},
{"Name"sv, func_name},
{"IsRetry"sv, is_retry_label},
{"JobId"sv, job_id_},
{"Source"sv, "executor"}});
// Record sub-state for wait.
task_by_state_counter_.Record(
task_by_state_gauge_.Record(
num_in_wait,
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_WAIT)},
{"Name"sv, func_name},
Expand All @@ -226,16 +228,16 @@ void TaskCounter::RecordMetrics() {
} else {
running_tasks = 1.0;
}
ray::stats::STATS_actors.Record(idle,
{{"State", "ALIVE_IDLE"},
{"Name", actor_name_},
{"Source", "executor"},
{"JobId", job_id_}});
ray::stats::STATS_actors.Record(running_tasks,
{{"State", "ALIVE_RUNNING_TASKS"},
{"Name", actor_name_},
{"Source", "executor"},
{"JobId", job_id_}});
actor_by_state_gauge_.Record(idle,
{{"State"sv, "ALIVE_IDLE"},
{"Name"sv, actor_name_},
{"Source"sv, "executor"},
{"JobId"sv, job_id_}});
actor_by_state_gauge_.Record(running_tasks,
{{"State"sv, "ALIVE_RUNNING_TASKS"},
{"Name"sv, actor_name_},
{"Source"sv, "executor"},
{"JobId"sv, job_id_}});
}
}

Expand Down Expand Up @@ -303,7 +305,8 @@ CoreWorker::CoreWorker(
instrumented_io_context &task_execution_service,
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
uint32_t pid,
ray::observability::MetricInterface &task_by_state_counter)
ray::observability::MetricInterface &task_by_state_gauge,
ray::observability::MetricInterface &actor_by_state_gauge)
: options_(std::move(options)),
get_call_site_(RayConfig::instance().record_ref_creation_sites()
? options_.get_lang_stack
Expand Down Expand Up @@ -342,7 +345,7 @@ CoreWorker::CoreWorker(
task_execution_service_(task_execution_service),
exiting_detail_(std::nullopt),
max_direct_call_object_size_(RayConfig::instance().max_direct_call_object_size()),
task_counter_(task_by_state_counter),
task_counter_(task_by_state_gauge, actor_by_state_gauge),
task_event_buffer_(std::move(task_event_buffer)),
pid_(pid),
actor_shutdown_callback_(options_.actor_shutdown_callback),
Expand Down
9 changes: 6 additions & 3 deletions src/ray/core_worker/core_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ class TaskCounter {
enum class TaskStatusType { kPending, kRunning, kFinished };

public:
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter);
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_gauge,
ray::observability::MetricInterface &actor_by_state_gauge);

void BecomeActor(const std::string &actor_name) {
absl::MutexLock l(&mu_);
Expand Down Expand Up @@ -134,7 +135,8 @@ class TaskCounter {
// - Name: the name of the function called
// - IsRetry: whether the task is a retry
// - Source: component reporting, e.g., "core_worker", "executor", or "pull_manager"
ray::observability::MetricInterface &task_by_state_counter_;
ray::observability::MetricInterface &task_by_state_gauge_;
ray::observability::MetricInterface &actor_by_state_gauge_;
};

struct TaskToRetry {
Expand Down Expand Up @@ -199,7 +201,8 @@ class CoreWorker {
instrumented_io_context &task_execution_service,
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
uint32_t pid,
ray::observability::MetricInterface &task_by_state_counter);
ray::observability::MetricInterface &task_by_state_counter,
ray::observability::MetricInterface &actor_by_state_counter);

CoreWorker(CoreWorker const &) = delete;

Expand Down
5 changes: 3 additions & 2 deletions src/ray/core_worker/core_worker_process.cc
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
return core_worker->core_worker_client_pool_->GetOrConnect(*addr);
},
gcs_client,
task_by_state_counter_,
task_by_state_gauge_,
/*free_actor_object_callback=*/
[this](const ObjectID &object_id) {
auto core_worker = GetCoreWorker();
Expand Down Expand Up @@ -680,7 +680,8 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
task_execution_service_,
std::move(task_event_buffer),
pid,
task_by_state_counter_);
task_by_state_gauge_,
actor_by_state_gauge_);
return core_worker;
}

Expand Down
4 changes: 3 additions & 1 deletion src/ray/core_worker/core_worker_process.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <string>

#include "ray/common/metrics.h"
#include "ray/core_worker/core_worker_options.h"
#include "ray/core_worker/grpc_service.h"
#include "ray/core_worker/metrics.h"
Expand Down Expand Up @@ -184,7 +185,8 @@ class CoreWorkerProcessImpl {
/// The client to export metrics to the metrics agent.
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;

ray::stats::Gauge task_by_state_counter_{GetTaskMetric()};
ray::stats::Gauge task_by_state_gauge_{GetTaskByStateGaugeMetric()};
ray::stats::Gauge actor_by_state_gauge_{GetActorByStateGaugeMetric()};
};
} // namespace core
} // namespace ray
2 changes: 1 addition & 1 deletion src/ray/core_worker/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace ray {
namespace core {

inline ray::stats::Gauge GetTaskMetric() {
inline ray::stats::Gauge GetTaskByStateGaugeMetric() {
/// Tracks tasks by state, including pending, running, and finished tasks.
/// This metric may be recorded from multiple components processing the task in Ray,
/// including the submitting core worker, executor core worker, and pull manager.
Expand Down
10 changes: 6 additions & 4 deletions src/ray/core_worker/tests/core_worker_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ class CoreWorkerTest : public ::testing::Test {
return std::make_shared<rpc::FakeCoreWorkerClient>();
},
mock_gcs_client,
fake_task_by_state_counter_,
fake_task_by_state_gauge_,
/*free_actor_object_callback=*/[](const ObjectID &object_id) {});

auto object_recovery_manager = std::make_unique<ObjectRecoveryManager>(
Expand Down Expand Up @@ -272,7 +272,8 @@ class CoreWorkerTest : public ::testing::Test {
task_execution_service_,
std::move(task_event_buffer),
getpid(),
fake_task_by_state_counter_);
fake_task_by_state_gauge_,
fake_actor_by_state_gauge_);
}

protected:
Expand All @@ -291,7 +292,8 @@ class CoreWorkerTest : public ::testing::Test {
pubsub::Publisher *object_info_publisher_;
std::shared_ptr<TaskManager> task_manager_;
std::shared_ptr<CoreWorker> core_worker_;
ray::observability::FakeGauge fake_task_by_state_counter_;
ray::observability::FakeGauge fake_task_by_state_gauge_;
ray::observability::FakeGauge fake_actor_by_state_gauge_;
std::unique_ptr<FakePeriodicalRunner> fake_periodical_runner_;

// Controllable time for testing publisher timeouts
Expand All @@ -317,7 +319,7 @@ TEST_F(CoreWorkerTest, RecordMetrics) {
ASSERT_TRUE(status.ok());
// disconnect to trigger metric recording
core_worker_->Disconnect(rpc::WorkerExitType::SYSTEM_ERROR, "test", nullptr);
auto tag_to_value = fake_task_by_state_counter_.GetTagToValue();
auto tag_to_value = fake_task_by_state_gauge_.GetTagToValue();
// 4 states: RUNNING, SUBMITTED_TO_WORKER, RUNNING_IN_RAY_GET and RUNNING_IN_RAY_WAIT
ASSERT_EQ(tag_to_value.size(), 4);
for (auto &[key, value] : tag_to_value) {
Expand Down
12 changes: 12 additions & 0 deletions src/ray/gcs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ ray_cc_library(
"//src/ray/common:protobuf_utils",
"//src/ray/common:runtime_env",
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
"//src/ray/observability:metric_interface",
"//src/ray/observability:ray_driver_job_definition_event",
"//src/ray/observability:ray_driver_job_lifecycle_event",
"//src/ray/observability:ray_event_recorder_interface",
Expand Down Expand Up @@ -510,6 +511,7 @@ ray_cc_library(
":gcs_worker_manager",
":grpc_service_interfaces",
":grpc_services",
":metrics",
"//src/ray/core_worker_rpc_client:core_worker_client",
"//src/ray/core_worker_rpc_client:core_worker_client_pool",
"//src/ray/gcs/store_client",
Expand Down Expand Up @@ -546,6 +548,7 @@ ray_cc_binary(
visibility = ["//visibility:public"],
deps = [
":gcs_server_lib",
"//src/ray/common:metrics",
"//src/ray/observability:metrics",
"//src/ray/stats:stats_lib",
"//src/ray/util:event",
Expand All @@ -555,3 +558,12 @@ ray_cc_binary(
"@com_github_gflags_gflags//:gflags",
],
)

ray_cc_library(
name = "metrics",
hdrs = ["metrics.h"],
deps = [
"//src/ray/observability:metrics",
"//src/ray/stats:stats_lib",
],
)
Loading