Skip to content

Commit 495e092

Browse files
committed
[core][metric/04] redefine STATS_tasks using Metric class
Signed-off-by: Cuong Nguyen <can@anyscale.com>
1 parent 031f9b9 commit 495e092

19 files changed

+176
-58
lines changed

src/ray/core_worker/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ ray_cc_library(
2424
":future_resolver",
2525
":generator_waiter",
2626
":memory_store",
27+
":metrics",
2728
":object_recovery_manager",
2829
":plasma_store_provider",
2930
":profile_event",
3031
":reference_count",
3132
":shutdown_coordinator",
3233
":task_event_buffer",
33-
":metrics",
3434
"//src/ray/common/cgroup:cgroup_context",
3535
"//src/ray/common/cgroup:cgroup_manager",
3636
"//src/ray/common/cgroup:constants",

src/ray/core_worker/core_worker.cc

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ JobID GetProcessJobID(const CoreWorkerOptions &options) {
167167
return options.job_id;
168168
}
169169

170-
TaskCounter::TaskCounter(ray::observability::MetricInterface &metric_tasks)
171-
: metric_tasks_(metric_tasks) {
170+
TaskCounter::TaskCounter(ray::observability::MetricInterface &task_by_state_counter)
171+
: task_by_state_counter_(task_by_state_counter) {
172172
counter_.SetOnChangeCallback(
173173
[this](const std::tuple<std::string, TaskStatusType, bool>
174174
&key) ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) mutable {
@@ -183,31 +183,31 @@ TaskCounter::TaskCounter(ray::observability::MetricInterface &metric_tasks)
183183
const auto is_retry_label = is_retry ? "1" : "0";
184184
// RUNNING_IN_RAY_GET/WAIT are sub-states of RUNNING, so we need to subtract
185185
// them out to avoid double-counting.
186-
metric_tasks_.Record(
186+
task_by_state_counter_.Record(
187187
running_total - num_in_get - num_in_wait,
188188
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING)},
189189
{"Name"sv, func_name},
190190
{"IsRetry"sv, is_retry_label},
191191
{"JobId"sv, job_id_},
192192
{"Source"sv, "executor"}});
193193
// Negate the metrics recorded from the submitter process for these tasks.
194-
metric_tasks_.Record(
194+
task_by_state_counter_.Record(
195195
-running_total,
196196
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::SUBMITTED_TO_WORKER)},
197197
{"Name"sv, func_name},
198198
{"IsRetry"sv, is_retry_label},
199199
{"JobId"sv, job_id_},
200200
{"Source"sv, "executor"}});
201201
// Record sub-state for get.
202-
metric_tasks_.Record(
202+
task_by_state_counter_.Record(
203203
num_in_get,
204204
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_GET)},
205205
{"Name"sv, func_name},
206206
{"IsRetry"sv, is_retry_label},
207207
{"JobId"sv, job_id_},
208208
{"Source"sv, "executor"}});
209209
// Record sub-state for wait.
210-
metric_tasks_.Record(
210+
task_by_state_counter_.Record(
211211
num_in_wait,
212212
{{"State"sv, rpc::TaskStatus_Name(rpc::TaskStatus::RUNNING_IN_RAY_WAIT)},
213213
{"Name"sv, func_name},
@@ -321,7 +321,7 @@ CoreWorker::CoreWorker(
321321
instrumented_io_context &task_execution_service,
322322
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
323323
uint32_t pid,
324-
ray::observability::MetricInterface &metric_tasks)
324+
ray::observability::MetricInterface &task_by_state_counter)
325325
: options_(std::move(options)),
326326
get_call_site_(RayConfig::instance().record_ref_creation_sites()
327327
? options_.get_lang_stack
@@ -357,15 +357,14 @@ CoreWorker::CoreWorker(
357357
actor_id_(ActorID::Nil()),
358358
task_queue_length_(0),
359359
num_executed_tasks_(0),
360-
task_counter_(metric_tasks),
361360
task_execution_service_(task_execution_service),
362361
exiting_detail_(std::nullopt),
363362
max_direct_call_object_size_(RayConfig::instance().max_direct_call_object_size()),
363+
task_counter_(task_by_state_counter),
364364
task_event_buffer_(std::move(task_event_buffer)),
365365
pid_(pid),
366366
actor_shutdown_callback_(std::move(options_.actor_shutdown_callback)),
367-
runtime_env_json_serialization_cache_(kDefaultSerializationCacheCap),
368-
metric_tasks_(metric_tasks) {
367+
runtime_env_json_serialization_cache_(kDefaultSerializationCacheCap) {
369368
// Initialize task receivers.
370369
if (options_.worker_type == WorkerType::WORKER || options_.is_local_mode) {
371370
RAY_CHECK(options_.task_execution_callback != nullptr);

src/ray/core_worker/core_worker.h

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class TaskCounter {
8080
enum class TaskStatusType { kPending, kRunning, kFinished };
8181

8282
public:
83-
TaskCounter(ray::observability::MetricInterface &metric_tasks);
83+
explicit TaskCounter(ray::observability::MetricInterface &task_by_state_counter);
8484

8585
void BecomeActor(const std::string &actor_name) {
8686
absl::MutexLock l(&mu_);
@@ -138,9 +138,7 @@ class TaskCounter {
138138
// Used for actor state tracking.
139139
std::string actor_name_ ABSL_GUARDED_BY(mu_);
140140
int64_t num_tasks_running_ ABSL_GUARDED_BY(mu_) = 0;
141-
142-
// Metrics
143-
ray::observability::MetricInterface &metric_tasks_;
141+
ray::observability::MetricInterface &task_by_state_counter_;
144142
};
145143

146144
struct TaskToRetry {
@@ -205,7 +203,7 @@ class CoreWorker {
205203
instrumented_io_context &task_execution_service,
206204
std::unique_ptr<worker::TaskEventBuffer> task_event_buffer,
207205
uint32_t pid,
208-
ray::observability::MetricInterface &metric_tasks);
206+
ray::observability::MetricInterface &task_by_state_counter);
209207

210208
CoreWorker(CoreWorker const &) = delete;
211209

@@ -1954,8 +1952,5 @@ class CoreWorker {
19541952
std::mutex gcs_client_node_cache_populated_mutex_;
19551953
std::condition_variable gcs_client_node_cache_populated_cv_;
19561954
bool gcs_client_node_cache_populated_ = false;
1957-
1958-
// Metrics
1959-
ray::observability::MetricInterface &metric_tasks_;
19601955
};
19611956
} // namespace ray::core

src/ray/core_worker/core_worker_process.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -455,7 +455,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
455455
return core_worker->core_worker_client_pool_->GetOrConnect(addr.value());
456456
},
457457
gcs_client,
458-
metric_tasks_);
458+
task_by_state_counter_);
459459

460460
auto on_excess_queueing = [this](const ActorID &actor_id, uint64_t num_queued) {
461461
auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(
@@ -660,7 +660,7 @@ std::shared_ptr<CoreWorker> CoreWorkerProcessImpl::CreateCoreWorker(
660660
task_execution_service_,
661661
std::move(task_event_buffer),
662662
pid,
663-
metric_tasks_);
663+
task_by_state_counter_);
664664
return core_worker;
665665
}
666666

src/ray/core_worker/core_worker_process.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -183,8 +183,7 @@ class CoreWorkerProcessImpl {
183183
/// The client to export metrics to the metrics agent.
184184
std::unique_ptr<ray::rpc::MetricsAgentClient> metrics_agent_client_;
185185

186-
/// Metrics
187-
ray::stats::Gauge metric_tasks_{GetTaskMetric()};
186+
ray::stats::Gauge task_by_state_counter_{GetTaskMetric()};
188187
};
189188
} // namespace core
190189
} // namespace ray

src/ray/core_worker/metrics.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2025 The Ray Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
#pragma once
16+
17+
#include "ray/stats/metric.h"
18+
19+
namespace ray {
20+
namespace core {
21+
22+
inline ray::stats::Gauge GetTaskMetric() {
23+
/// Tracks tasks by state, including pending, running, and finished tasks.
24+
/// This metric may be recorded from multiple components processing the task in Ray,
25+
/// including the submitting core worker, executor core worker, and pull manager.
26+
///
27+
/// To avoid metric collection conflicts between components reporting on the same task,
28+
/// we use the "Source" required label.
29+
return ray::stats::Gauge{
30+
/*name=*/"tasks",
31+
/*description=*/"Current number of tasks currently in a particular state.",
32+
/*unit=*/"",
33+
// State: the task state, as described by rpc::TaskState proto in common.proto.
34+
// Name: the name of the function called (Keep in sync with the
35+
// TASK_OR_ACTOR_NAME_TAG_KEY in
36+
// python/ray/_private/telemetry/metric_cardinality.py) Source: component reporting,
37+
// e.g., "core_worker", "executor", or "pull_manager". IsRetry: whether this task is
38+
// a retry.
39+
/*tag_keys=*/{"State", "Name", "Source", "IsRetry", "JobId"},
40+
};
41+
}
42+
43+
} // namespace core
44+
} // namespace ray

src/ray/core_worker/task_manager.h

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
#include "ray/core_worker/task_event_buffer.h"
3232
#include "ray/core_worker/task_manager_interface.h"
3333
#include "ray/gcs/gcs_client/gcs_client.h"
34+
#include "ray/observability/metric_interface.h"
3435
#include "ray/stats/metric_defs.h"
3536
#include "ray/util/counter_map.h"
3637
#include "src/ray/protobuf/common.pb.h"
3738
#include "src/ray/protobuf/core_worker.pb.h"
3839
#include "src/ray/protobuf/gcs.pb.h"
39-
#include "ray/observability/metric_interface.h"
4040

4141
namespace ray {
4242
namespace core {
@@ -186,7 +186,7 @@ class TaskManager : public TaskManagerInterface {
186186
std::function<std::shared_ptr<ray::rpc::CoreWorkerClientInterface>(const ActorID &)>
187187
client_factory,
188188
std::shared_ptr<gcs::GcsClient> gcs_client,
189-
ray::observability::MetricInterface &metric_tasks)
189+
ray::observability::MetricInterface &task_by_state_counter)
190190
: in_memory_store_(in_memory_store),
191191
reference_counter_(reference_counter),
192192
put_in_local_plasma_callback_(std::move(put_in_local_plasma_callback)),
@@ -197,16 +197,16 @@ class TaskManager : public TaskManagerInterface {
197197
task_event_buffer_(task_event_buffer),
198198
get_actor_rpc_client_callback_(std::move(client_factory)),
199199
gcs_client_(std::move(gcs_client)),
200-
metric_tasks_(metric_tasks) {
200+
task_by_state_counter_(task_by_state_counter) {
201201
task_counter_.SetOnChangeCallback(
202202
[this](const std::tuple<std::string, rpc::TaskStatus, bool> &key)
203203
ABSL_EXCLUSIVE_LOCKS_REQUIRED(&mu_) {
204-
metric_tasks_.Record(
204+
task_by_state_counter_.Record(
205205
task_counter_.Get(key),
206206
{{"State"sv, rpc::TaskStatus_Name(std::get<1>(key))},
207207
{"Name"sv, std::get<0>(key)},
208208
{"IsRetry"sv, std::get<2>(key) ? "1" : "0"},
209-
{"Source"sv, "owner"sv}});
209+
{"Source"sv, "owner"}});
210210
});
211211
reference_counter_.SetReleaseLineageCallback(
212212
[this](const ObjectID &object_id, std::vector<ObjectID> *ids_to_release) {
@@ -803,7 +803,7 @@ class TaskManager : public TaskManagerInterface {
803803
std::shared_ptr<gcs::GcsClient> gcs_client_;
804804

805805
// Metrics
806-
ray::observability::MetricInterface &metric_tasks_;
806+
ray::observability::MetricInterface &task_by_state_counter_;
807807

808808
friend class TaskManagerTest;
809809
};

src/ray/core_worker/tests/BUILD.bazel

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ ray_cc_test(
9797
"//src/ray/core_worker:task_event_buffer",
9898
"//src/ray/core_worker:task_manager",
9999
"//src/ray/gcs/gcs_client:gcs_client_lib",
100+
"//src/ray/observability:fake_metric",
100101
"@com_google_googletest//:gtest",
101102
"@com_google_googletest//:gtest_main",
102103
],
@@ -251,6 +252,7 @@ ray_cc_test(
251252
"//src/ray/core_worker:memory_store",
252253
"//src/ray/core_worker:reference_count",
253254
"//src/ray/ipc:fake_raylet_ipc_client",
255+
"//src/ray/observability:fake_metric",
254256
"@com_google_googletest//:gtest",
255257
"@com_google_googletest//:gtest_main",
256258
],

src/ray/core_worker/tests/core_worker_test.cc

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#include "ray/core_worker/task_submission/actor_task_submitter.h"
4141
#include "ray/core_worker/task_submission/normal_task_submitter.h"
4242
#include "ray/ipc/fake_raylet_ipc_client.h"
43+
#include "ray/observability/fake_metric.h"
4344
#include "ray/rpc/worker/core_worker_client_pool.h"
4445

4546
namespace ray {
@@ -159,7 +160,8 @@ class CoreWorkerHandleGetObjectStatusTest : public ::testing::Test {
159160
[](const ActorID &actor_id) {
160161
return std::make_shared<rpc::CoreWorkerClientInterface>();
161162
},
162-
mock_gcs_client);
163+
mock_gcs_client,
164+
fake_task_by_state_counter_);
163165

164166
auto object_recovery_manager = std::make_unique<ObjectRecoveryManager>(
165167
rpc_address,
@@ -243,7 +245,8 @@ class CoreWorkerHandleGetObjectStatusTest : public ::testing::Test {
243245
std::move(actor_manager),
244246
task_execution_service_,
245247
std::move(task_event_buffer),
246-
getpid());
248+
getpid(),
249+
fake_task_by_state_counter_);
247250
}
248251

249252
protected:
@@ -260,6 +263,7 @@ class CoreWorkerHandleGetObjectStatusTest : public ::testing::Test {
260263
std::shared_ptr<ReferenceCounter> reference_counter_;
261264
std::shared_ptr<CoreWorkerMemoryStore> memory_store_;
262265
std::shared_ptr<CoreWorker> core_worker_;
266+
ray::observability::FakeMetric fake_task_by_state_counter_;
263267
};
264268

265269
std::shared_ptr<RayObject> MakeRayObject(const std::string &data_str,

src/ray/core_worker/tests/task_manager_test.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "ray/core_worker/reference_count.h"
3232
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
3333
#include "ray/core_worker/task_event_buffer.h"
34+
#include "ray/observability/fake_metric.h"
3435

3536
namespace ray {
3637
namespace core {
@@ -181,7 +182,8 @@ class TaskManagerTest : public ::testing::Test {
181182
-> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> {
182183
return nullptr;
183184
},
184-
mock_gcs_client_) {}
185+
mock_gcs_client_,
186+
fake_task_by_state_counter_) {}
185187

186188
virtual void TearDown() { AssertNoLeaks(); }
187189

@@ -228,6 +230,7 @@ class TaskManagerTest : public ::testing::Test {
228230
uint32_t last_delay_ms_ = 0;
229231
bool last_object_recovery_ = false;
230232
std::unordered_set<ObjectID> stored_in_plasma;
233+
ray::observability::FakeMetric fake_task_by_state_counter_;
231234
};
232235

233236
class TaskManagerLineageTest : public TaskManagerTest {
@@ -1396,7 +1399,8 @@ TEST_F(TaskManagerTest, PlasmaPut_ObjectStoreFull_FailsTaskAndWritesError) {
13961399
[](const ActorID &) -> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> {
13971400
return nullptr;
13981401
},
1399-
mock_gcs_client_);
1402+
mock_gcs_client_,
1403+
fake_task_by_state_counter_);
14001404

14011405
rpc::Address caller_address;
14021406
auto spec = CreateTaskHelper(1, {});
@@ -1459,7 +1463,8 @@ TEST_F(TaskManagerTest, PlasmaPut_TransientFull_RetriesThenSucceeds) {
14591463
[](const ActorID &) -> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> {
14601464
return nullptr;
14611465
},
1462-
mock_gcs_client_);
1466+
mock_gcs_client_,
1467+
fake_task_by_state_counter_);
14631468

14641469
rpc::Address caller_address;
14651470
auto spec = CreateTaskHelper(1, {});
@@ -1520,7 +1525,8 @@ TEST_F(TaskManagerTest, DynamicReturn_PlasmaPutFailure_FailsTaskImmediately) {
15201525
[](const ActorID &) -> std::shared_ptr<ray::rpc::CoreWorkerClientInterface> {
15211526
return nullptr;
15221527
},
1523-
mock_gcs_client_);
1528+
mock_gcs_client_,
1529+
fake_task_by_state_counter_);
15241530

15251531
auto spec = CreateTaskHelper(1, {}, /*dynamic_returns=*/true);
15261532
dyn_mgr.AddPendingTask(addr_, spec, "", /*num_retries=*/0);

0 commit comments

Comments
 (0)