Skip to content

Commit aa8a215

Browse files
committed
[core][metric/04] redefine STATS_tasks using Metric class
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent a72c161 commit aa8a215

20 files changed

+258
-58
lines changed

src/ray/core_worker/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,13 @@ ray_cc_library(
2828
":generator_waiter",
2929
":grpc_service",
3030
":memory_store",
31+
":metrics",
3132
":object_recovery_manager",
3233
":plasma_store_provider",
3334
":profile_event",
3435
":reference_count",
3536
":shutdown_coordinator",
3637
":task_event_buffer",
37-
":metrics",
3838
"//src/ray/common:protobuf_utils",
3939
"//src/ray/common/cgroup:cgroup_context",
4040
"//src/ray/common/cgroup:cgroup_manager",

src/ray/core_worker/core_worker.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -168,8 +168,8 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
168168
return options.job_id;
169169
}
170170

171-
TaskCounter::TaskCounter(ray::observability::MetricInterface &metric_tasks)
172-
: metric_tasks_(metric_tasks) {
171+
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter)
172+
: task_by_state_counter_(task_by_state_counter) {
173173
counter_.SetOnChangeCallback(
174174
[this](const std::tuple<std::string, TaskStatusType, bool>
175175
&key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) mutable {
@@ -184,31 +184,31 @@ TaskCounter::TaskCounter(ray::observability::MetricInterface &metric_tasks)
184184
const auto is_retry_label = is_retry ? "1" : "0";
185185
// RUNNING_IN_RAY_GET/WAIT are sub-states of RUNNING, so we need to subtract
186186
// them out to avoid double-counting.
187-
metric_tasks_.Record(
187+
task_by_state_counter_.Record(
188188
running_total - num_in_get - num_in_wait,
189189
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)},
190190
{"Name"sv, func_name},
191191
{"IsRetry"sv, is_retry_label},
192192
{"JobId"sv, job_id_},
193193
{"Source"sv, "executor"}});
194194
// Negate the metrics recorded from the submitter process for these tasks.
195-
metric_tasks_.Record(
195+
task_by_state_counter_.Record(
196196
-running_total,
197197
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::SUBMITTED_TO_WORKER)},
198198
{"Name"sv, func_name},
199199
{"IsRetry"sv, is_retry_label},
200200
{"JobId"sv, job_id_},
201201
{"Source"sv, "executor"}});
202202
// Record sub-state for get.
203-
metric_tasks_.Record(
203+
task_by_state_counter_.Record(
204204
num_in_get,
205205
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_GET)},
206206
{"Name"sv, func_name},
207207
{"IsRetry"sv, is_retry_label},
208208
{"JobId"sv, job_id_},
209209
{"Source"sv, "executor"}});
210210
// Record sub-state for wait.
211-
metric_tasks_.Record(
211+
task_by_state_counter_.Record(
212212
num_in_wait,
213213
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_WAIT)},
214214
{"Name"sv, func_name},
@@ -322,7 +322,7 @@ CoreWorker::CoreWorker(
322322
instrumented_io_context &task_execution_service,
323323
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
324324
uint32_t pid,
325-
ray::observability::MetricInterface &metric_tasks)
325+
ray::observability::MetricInterface &task_by_state_counter)
326326
: options_(std::move(options)),
327327
get_call_site_(RayConfig::instance().record_ref_creation_sites()
328328
? options_.get_lang_stack
@@ -358,15 +358,14 @@ CoreWorker::CoreWorker(
358358
actor_id_(ActorID::Nil()),
359359
task_queue_length_(0),
360360
num_executed_tasks_(0),
361-
task_counter_(metric_tasks),
362361
task_execution_service_(task_execution_service),
363362
exiting_detail_(std::nullopt),
364363
max_direct_call_object_size_(RayConfig::instance().max_direct_call_object_size()),
364+
task_counter_(task_by_state_counter),
365365
task_event_buffer_(std::move(task_event_buffer)),
366366
pid_(pid),
367367
actor_shutdown_callback_(std::move(options_.actor_shutdown_callback)),
368-
runtime_env_json_serialization_cache_(kDefaultSerializationCacheCap),
369-
metric_tasks_(metric_tasks) {
368+
runtime_env_json_serialization_cache_(kDefaultSerializationCacheCap) {
370369
// Initialize task receivers.
371370
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
372371
RAY_CHECK(options_.task_execution_callback != nullptr);

src/ray/core_worker/core_worker.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class TaskCounter {
6969
enum class TaskStatusType { kPending, kRunning, kFinished };
7070

7171
public:
72-
TaskCounter(ray::observability::MetricInterface &metric_tasks);
72+
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter);
7373

7474
void BecomeActor(const std::string &actor_name) {
7575
absl::MutexLock l(&mu_);
@@ -128,8 +128,8 @@ class TaskCounter {
128128
std::string actor_name_ ABSL_GUARDED_BY(mu_);
129129
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
130130

131-
// Metrics
132-
ray::observability::MetricInterface &metric_tasks_;
131+
// Metric to track the number of tasks by state. Tags: State, Name, IsRetry, Source.
132+
ray::observability::MetricInterface &task_by_state_counter_;
133133
};
134134

135135
struct TaskToRetry {
@@ -194,7 +194,7 @@ class CoreWorker {
194194
instrumented_io_context &task_execution_service,
195195
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
196196
uint32_t pid,
197-
ray::observability::MetricInterface &metric_tasks);
197+
ray::observability::MetricInterface &task_by_state_counter);
198198

199199
CoreWorker(CoreWorker const &) = delete;
200200

@@ -1892,6 +1892,8 @@ class CoreWorker {
18921892

18931893
int64_t max_direct_call_object_size_;
18941894

1895+
friend class CoreWorkerTest;
1896+
18951897
TaskCounter task_counter_;
18961898

18971899
/// Used to guarantee that submitting actor task is thread safe.
@@ -1941,8 +1943,5 @@ class CoreWorker {
19411943
std::mutex gcs_client_node_cache_populated_mutex_;
19421944
std::condition_variable gcs_client_node_cache_populated_cv_;
19431945
bool gcs_client_node_cache_populated_ = false;
1944-
1945-
// Metrics
1946-
ray::observability::MetricInterface &metric_tasks_;
19471946
};
19481947
} // namespace ray::core

src/ray/core_worker/core_worker_process.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
457457
return core_worker->core_worker_client_pool_->GetOrConnect(addr.value());
458458
},
459459
gcs_client,
460-
metric_tasks_);
460+
task_by_state_counter_);
461461

462462
auto on_excess_queueing = [this](const ActorID &actor_id, uint64_t num_queued) {
463463
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
@@ -662,7 +662,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
662662
task_execution_service_,
663663
std::move(task_event_buffer),
664664
pid,
665-
metric_tasks_);
665+
task_by_state_counter_);
666666
return core_worker;
667667
}
668668

src/ray/core_worker/core_worker_process.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,7 @@ class CoreWorkerProcessImpl {
184184
/// The client to export metrics to the metrics agent.
185185
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;
186186

187-
/// Metrics
188-
ray::stats::Gauge metric_tasks_{GetTaskMetric()};
187+
ray::stats::Gauge task_by_state_counter_{GetTaskMetric()};
189188
};
190189
} // namespace core
191190
} // namespace ray

src/ray/core_worker/metrics.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include "ray/stats/metric.h"
18+
19+
namespace ray {
20+
namespace core {
21+
22+
inline ray::stats::Gauge GetTaskMetric() {
23+
/// Tracks tasks by state, including pending, running, and finished tasks.
24+
/// This metric may be recorded from multiple components processing the task in Ray,
25+
/// including the submitting core worker, executor core worker, and pull manager.
26+
///
27+
/// To avoid metric collection conflicts between components reporting on the same task,
28+
/// we use the "Source" required label.
29+
return ray::stats::Gauge{
30+
/*name=*/"tasks",
31+
/*description=*/"Current number of tasks currently in a particular state.",
32+
/*unit=*/"",
33+
// State: the task state, as described by rpc::TaskState proto in common.proto.
34+
// Name: the name of the function called (Keep in sync with the
35+
// TASK_OR_ACTOR_NAME_TAG_KEY in
36+
// python/ray/_private/telemetry/metric_cardinality.py) Source: component reporting,
37+
// e.g., "core_worker", "executor", or "pull_manager". IsRetry: whether this task is
38+
// a retry.
39+
/*tag_keys=*/{"State", "Name", "Source", "IsRetry", "JobId"},
40+
};
41+
}
42+
43+
} // namespace core
44+
} // namespace ray

src/ray/core_worker/task_manager.h

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
#include "ray/core_worker/task_event_buffer.h"
3232
#include "ray/core_worker/task_manager_interface.h"
3333
#include "ray/gcs/gcs_client/gcs_client.h"
34+
#include "ray/observability/metric_interface.h"
3435
#include "ray/stats/metric_defs.h"
3536
#include "ray/util/counter_map.h"
3637
#include "src/ray/protobuf/common.pb.h"
3738
#include "src/ray/protobuf/core_worker.pb.h"
3839
#include "src/ray/protobuf/gcs.pb.h"
39-
#include "ray/observability/metric_interface.h"
4040

4141
namespace ray {
4242
namespace core {
@@ -186,7 +186,7 @@ class TaskManager : public TaskManagerInterface {
186186
std::function<std::shared_ptr<ray::rpc::CoreWorkerClientInterface>(const ActorID &)>
187187
client_factory,
188188
std::shared_ptr<gcs::GcsClient> gcs_client,
189-
ray::observability::MetricInterface &metric_tasks)
189+
ray::observability::MetricInterface &task_by_state_counter)
190190
: in_memory_store_(in_memory_store),
191191
reference_counter_(reference_counter),
192192
put_in_local_plasma_callback_(std::move(put_in_local_plasma_callback)),
@@ -197,16 +197,16 @@ class TaskManager : public TaskManagerInterface {
197197
task_event_buffer_(task_event_buffer),
198198
get_actor_rpc_client_callback_(std::move(client_factory)),
199199
gcs_client_(std::move(gcs_client)),
200-
metric_tasks_(metric_tasks) {
200+
task_by_state_counter_(task_by_state_counter) {
201201
task_counter_.SetOnChangeCallback(
202202
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
203203
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
204-
metric_tasks_.Record(
204+
task_by_state_counter_.Record(
205205
task_counter_.Get(key),
206206
{{"State"sv, rpc::TaskStatus_Name(std::get<1>(key))},
207207
{"Name"sv, std::get<0>(key)},
208208
{"IsRetry"sv, std::get<2>(key) ? "1" : "0"},
209-
{"Source"sv, "owner"sv}});
209+
{"Source"sv, "owner"}});
210210
});
211211
reference_counter_.SetReleaseLineageCallback(
212212
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
@@ -802,8 +802,8 @@ class TaskManager : public TaskManagerInterface {
802802

803803
std::shared_ptr<gcs::GcsClient> gcs_client_;
804804

805-
// Metrics
806-
ray::observability::MetricInterface &metric_tasks_;
805+
// Metric to track the number of tasks by state. Tags: State, Name, IsRetry, Source.
806+
observability::MetricInterface &task_by_state_counter_;
807807

808808
friend class TaskManagerTest;
809809
};

src/ray/core_worker/task_submission/actor_task_submitter.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -440,6 +440,8 @@ class ActorTaskSubmitter : public ActorTaskSubmitterInterface {
440440
instrumented_io_context &io_service_;
441441

442442
std::shared_ptr<ReferenceCounterInterface> reference_counter_;
443+
444+
friend class CoreWorkerTest;
443445
};
444446

445447
} // namespace core

src/ray/core_worker/tests/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ ray_cc_test(
9898
"//src/ray/core_worker:task_event_buffer",
9999
"//src/ray/core_worker:task_manager",
100100
"//src/ray/gcs/gcs_client:gcs_client_lib",
101+
"//src/ray/observability:fake_metric",
101102
"@com_google_googletest//:gtest",
102103
"@com_google_googletest//:gtest_main",
103104
],
@@ -253,6 +254,7 @@ ray_cc_test(
253254
"//src/ray/core_worker:memory_store",
254255
"//src/ray/core_worker:reference_count",
255256
"//src/ray/ipc:fake_raylet_ipc_client",
257+
"//src/ray/observability:fake_metric",
256258
"@com_google_googletest//:gtest",
257259
"@com_google_googletest//:gtest_main",
258260
],

src/ray/core_worker/tests/core_worker_test.cc

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
#include "ray/core_worker/task_submission/actor_task_submitter.h"
4242
#include "ray/core_worker/task_submission/normal_task_submitter.h"
4343
#include "ray/ipc/fake_raylet_ipc_client.h"
44+
#include "ray/observability/fake_metric.h"
4445
#include "ray/rpc/worker/core_worker_client_pool.h"
4546

4647
namespace ray {
@@ -160,7 +161,8 @@ class CoreWorkerTest : public ::testing::Test {
160161
[](const ActorID &actor_id) {
161162
return std::make_shared<rpc::CoreWorkerClientInterface>();
162163
},
163-
mock_gcs_client);
164+
mock_gcs_client,
165+
fake_task_by_state_counter_);
164166

165167
auto object_recovery_manager = std::make_unique<ObjectRecoveryManager>(
166168
rpc_address_,
@@ -245,7 +247,8 @@ class CoreWorkerTest : public ::testing::Test {
245247
std::move(actor_manager),
246248
task_execution_service_,
247249
std::move(task_event_buffer),
248-
getpid());
250+
getpid(),
251+
fake_task_by_state_counter_);
249252
}
250253

251254
protected:
@@ -263,6 +266,7 @@ class CoreWorkerTest : public ::testing::Test {
263266
ActorTaskSubmitter *actor_task_submitter_;
264267
std::shared_ptr<TaskManager> task_manager_;
265268
std::shared_ptr<CoreWorker> core_worker_;
269+
ray::observability::FakeMetric fake_task_by_state_counter_;
266270
};
267271

268272
std::shared_ptr<RayObject> MakeRayObject(const std::string &data_str,
@@ -278,6 +282,22 @@ std::shared_ptr<RayObject> MakeRayObject(const std::string &data_str,
278282
return std::make_shared<RayObject>(data, metadata, std::vector<rpc::ObjectReference>());
279283
}
280284

285+
TEST_F(CoreWorkerTest, RecordMetrics) {
286+
std::vector<std::shared_ptr<RayObject>> results;
287+
auto status = core_worker_->Get({}, -1, results);
288+
ASSERT_TRUE(status.ok());
289+
// disconnect to trigger metric recording
290+
core_worker_->Disconnect(rpc::WorkerExitType::SYSTEM_ERROR, "test", nullptr);
291+
auto tag_to_value = fake_task_by_state_counter_.GetTagToValue();
292+
// 4 states: RUNNING, SUBMITTED_TO_WORKER, RUNNING_IN_RAY_GET and RUNNING_IN_RAY_WAIT
293+
ASSERT_EQ(tag_to_value.size(), 4);
294+
for (auto &[key, value] : tag_to_value) {
295+
ASSERT_EQ(key.at("Name"), "Unknown task");
296+
ASSERT_EQ(key.at("Source"), "executor");
297+
ASSERT_EQ(key.at("IsRetry"), "0");
298+
}
299+
}
300+
281301
TEST_F(CoreWorkerTest, HandleGetObjectStatusIdempotency) {
282302
auto object_id = ObjectID::FromRandom();
283303
auto ray_object = MakeRayObject("test_data", "meta");

0 commit comments

Comments
 (0)