Skip to content

Commit

Permalink
[Core][Telemetry] Add basic core telemetries (#30520)
Browse files Browse the repository at this point in the history
We added some basic Ray core metrics: the number of actors created and the number of placement groups (PGs) created. To achieve this, this PR introduces three changes (which could potentially be split into separate PRs):

Moved the usage_lib.py TagKey into usage.proto, and added @pcmoritz / @thomasdesr as code owners. This makes future changes to telemetry easier to audit, and also allows us to use telemetry across different languages.
Introduced a GCS/C++ version of usage_lib (GcsUsageReporter). Its implementation is similar to [core] telemetry for oom occurences #30472. This makes it easier to add telemetry from GCS.
Added PlacementGroup and Actor telemetry code which collects the number of actors/pgs created during the lifetime of the cluster.
  • Loading branch information
scv119 authored Dec 15, 2022
1 parent 963174c commit 8208e4e
Show file tree
Hide file tree
Showing 16 changed files with 242 additions and 78 deletions.
3 changes: 3 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
/src/ray/stats/metric_defs.h @ericl @scv119 @rkooo567
/src/ray/stats/metric_defs.cc @ericl @scv119 @rkooo567

# Telemetry
/src/ray/protobuf/usage.proto @pcmoritz @thomasdesr

# All C++ code.
# /src/ray @ray-project/ray-core-cpp

Expand Down
2 changes: 2 additions & 0 deletions BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2162,6 +2162,7 @@ cc_library(
":pubsub_lib",
":ray_common",
":redis_store_client",
"//src/ray/protobuf:usage_cc_proto",
],
)

Expand Down Expand Up @@ -2828,6 +2829,7 @@ filegroup(
"//src/ray/protobuf:reporter_py_proto",
"//src/ray/protobuf:runtime_env_agent_py_proto",
"//src/ray/protobuf:runtime_env_common_py_proto",
"//src/ray/protobuf:usage_py_proto",
],
)

Expand Down
63 changes: 4 additions & 59 deletions python/ray/_private/usage/usage_lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@
import ray._private.usage.usage_constants as usage_constant
from ray._private import gcs_utils
from ray.experimental.internal_kv import _internal_kv_initialized, _internal_kv_put
from ray.core.generated import usage_pb2

logger = logging.getLogger(__name__)
TagKey = usage_pb2.TagKey

#################
# Internal APIs #
Expand Down Expand Up @@ -241,63 +243,6 @@ def _put_library_usage(library_usage: str):
logger.debug(f"Failed to write a library usage to the home folder, {e}")


# Registry of the keys that may be passed to record_extra_usage_tag().
# The lowercased member name (key.name.lower()) is what gets recorded as the
# tag key, and it is also what get_extra_usage_tags_to_report() uses to build
# its list of valid tag keys.
# Member values are assigned by auto(), so they follow declaration order.
class TagKey(Enum):
_TEST1 = auto()
_TEST2 = auto()

# RLlib
# The deep learning framework ("tf", "torch", etc.).
RLLIB_FRAMEWORK = auto()
# The algorithm name (only built-in algorithms).
RLLIB_ALGORITHM = auto()
# The number of workers as a string.
RLLIB_NUM_WORKERS = auto()

# Serve
# The public Python API version ("v1", "v2").
SERVE_API_VERSION = auto()
# The total number of running serve deployments as a string.
SERVE_NUM_DEPLOYMENTS = auto()

# The GCS storage type, which could be memory or redis.
GCS_STORAGE = auto()

# Ray Core State API
# NOTE(rickyxx): Currently only setting "1" for tracking existence of
# invocations only.
CORE_STATE_API_LIST_ACTORS = auto()
CORE_STATE_API_LIST_TASKS = auto()
CORE_STATE_API_LIST_JOBS = auto()
CORE_STATE_API_LIST_NODES = auto()
CORE_STATE_API_LIST_PLACEMENT_GROUPS = auto()
CORE_STATE_API_LIST_WORKERS = auto()
CORE_STATE_API_LIST_OBJECTS = auto()
CORE_STATE_API_LIST_RUNTIME_ENVS = auto()
CORE_STATE_API_LIST_CLUSTER_EVENTS = auto()
CORE_STATE_API_LIST_LOGS = auto()
CORE_STATE_API_GET_LOG = auto()
CORE_STATE_API_SUMMARIZE_TASKS = auto()
CORE_STATE_API_SUMMARIZE_ACTORS = auto()
CORE_STATE_API_SUMMARIZE_OBJECTS = auto()

# Dashboard
# {True, False}
# True if the dashboard page has been ever opened.
DASHBOARD_USED = auto()
# Whether a user is running ray with some third party metrics
# services (Ex: "True", "False")
DASHBOARD_METRICS_PROMETHEUS_ENABLED = auto()
DASHBOARD_METRICS_GRAFANA_ENABLED = auto()

# The count(int) of worker crash with exit type 'system error' since
# the cluster started, emitted from GCS
WORKER_CRASH_SYSTEM_ERROR = auto()

# The count(int) of worker crash with exit type 'out-of-memory' since
# the cluster started, emitted from GCS
WORKER_CRASH_OOM = auto()


def record_extra_usage_tag(key: TagKey, value: str):
"""Record extra kv usage tag.
Expand All @@ -307,7 +252,7 @@ def record_extra_usage_tag(key: TagKey, value: str):
then call this function.
It will make a synchronous call to the internal kv store if the tag is updated.
"""
key = key.name.lower()
key = TagKey.Name(key).lower()
with _recorded_extra_usage_tags_lock:
if _recorded_extra_usage_tags.get(key) == value:
return
Expand Down Expand Up @@ -659,7 +604,7 @@ def get_extra_usage_tags_to_report(gcs_client) -> Dict[str, str]:
except Exception as e:
logger.info(f"Failed to parse extra usage tags env var. Error: {e}")

valid_tag_keys = [tag_key.name.lower() for tag_key in TagKey]
valid_tag_keys = [tag_key.lower() for tag_key in TagKey.keys()]
try:
keys = gcs_client.internal_kv_keys(
usage_constant.EXTRA_USAGE_TAG_PREFIX.encode(),
Expand Down
4 changes: 3 additions & 1 deletion python/ray/tests/test_state_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2938,7 +2938,9 @@ def test_core_state_api_usage_tags(shutdown_only):
TagKey.CORE_STATE_API_SUMMARIZE_OBJECTS,
TagKey.CORE_STATE_API_SUMMARIZE_TASKS,
]
assert set(result.keys()).issuperset({tag.name.lower() for tag in expected_tags})
assert set(result.keys()).issuperset(
{TagKey.Name(tag).lower() for tag in expected_tags}
)


if __name__ == "__main__":
Expand Down
71 changes: 71 additions & 0 deletions python/ray/tests/test_usage_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
)
from ray._private.usage.usage_lib import ClusterConfigToReport, UsageStatsEnabledness
from ray.autoscaler._private.cli_logger import cli_logger
from ray.util.placement_group import (
placement_group,
)

schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down Expand Up @@ -183,6 +186,8 @@ def test_get_extra_usage_tags_to_report(
"key": "val",
"_test1": "val1",
"_test2": "val2",
"actor_num_created": "0",
"pg_num_created": "0",
"gcs_storage": gcs_storage_type,
"dashboard_used": "False",
}
Expand All @@ -197,6 +202,8 @@ def test_get_extra_usage_tags_to_report(
"_test2": "val3",
"gcs_storage": gcs_storage_type,
"dashboard_used": "False",
"actor_num_created": "0",
"pg_num_created": "0",
}


Expand Down Expand Up @@ -238,6 +245,62 @@ def oomer():
assert result["worker_crash_oom"] == "1"


def test_actor_stats():
    """Check that the `actor_num_created` usage tag follows the number of actors created."""

    @ray.remote
    class Actor:
        pass

    def reported_actor_count(client):
        # Value currently reported under the tag, or None if it is not present yet.
        tags = ray_usage_lib.get_extra_usage_tags_to_report(client)
        return tags.get("actor_num_created")

    with ray.init(
        _system_config={"metrics_report_interval_ms": 1000},
    ) as ctx:
        gcs_client = gcs_utils.GcsClient(address=ctx.address_info["gcs_address"])

        # First actor: the reported count should become "1".
        actor = Actor.remote()
        wait_for_condition(
            lambda: reported_actor_count(gcs_client) == "1",
            timeout=5,
        )

        # Second actor: the reported count should become "2".
        actor = Actor.remote()
        wait_for_condition(
            lambda: reported_actor_count(gcs_client) == "2",
            timeout=5,
        )
        del actor


def test_pg_stats():
    """Check that the `pg_num_created` usage tag follows the number of placement groups created."""

    def reported_pg_count(client):
        # Value currently reported under the tag, or None if it is not present yet.
        tags = ray_usage_lib.get_extra_usage_tags_to_report(client)
        return tags.get("pg_num_created")

    with ray.init(
        num_cpus=3,
        _system_config={"metrics_report_interval_ms": 1000},
    ) as ctx:
        gcs_client = gcs_utils.GcsClient(address=ctx.address_info["gcs_address"])

        # First placement group: wait until it is ready, then expect "1".
        pg = placement_group([{"CPU": 1}], strategy="STRICT_PACK")
        ray.get(pg.ready())
        wait_for_condition(
            lambda: reported_pg_count(gcs_client) == "1",
            timeout=5,
        )

        # Second placement group: wait until it is ready, then expect "2".
        pg1 = placement_group([{"CPU": 1}], strategy="STRICT_PACK")
        ray.get(pg1.ready())
        wait_for_condition(
            lambda: reported_pg_count(gcs_client) == "2",
            timeout=5,
        )


def test_usage_stats_enabledness(monkeypatch, tmp_path, reset_usage_stats):
with monkeypatch.context() as m:
m.setenv("RAY_USAGE_STATS_ENABLED", "1")
Expand Down Expand Up @@ -1064,6 +1127,10 @@ def ready(self):
assert payload["total_num_gpus"] is None
assert payload["total_memory_gb"] > 0
assert payload["total_object_store_memory_gb"] > 0
assert int(payload["extra_usage_tags"]["actor_num_created"]) >= 0
assert int(payload["extra_usage_tags"]["pg_num_created"]) >= 0
payload["extra_usage_tags"]["actor_num_created"] = "0"
payload["extra_usage_tags"]["pg_num_created"] = "0"
assert payload["extra_usage_tags"] == {
"extra_k1": "extra_v1",
"_test1": "extra_v2",
Expand All @@ -1072,6 +1139,8 @@ def ready(self):
"dashboard_metrics_prometheus_enabled": "False",
"serve_num_deployments": "1",
"serve_api_version": "v1",
"actor_num_created": "0",
"pg_num_created": "0",
"gcs_storage": gcs_storage_type,
"dashboard_used": "False",
}
Expand Down Expand Up @@ -1413,6 +1482,8 @@ def verify():
"dashboard_metrics_prometheus_enabled": "False",
"gcs_storage": gcs_storage_type,
"dashboard_used": "False",
"actor_num_created": "0",
"pg_num_created": "0",
}
assert num_nodes == 2
return True
Expand Down
8 changes: 4 additions & 4 deletions src/ray/gcs/gcs_client/test/usage_stats_client_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,22 +83,22 @@ class UsageStatsClientTest : public ::testing::Test {
TEST_F(UsageStatsClientTest, TestRecordExtraUsageTag) {
gcs::UsageStatsClient usage_stats_client(
"127.0.0.1:" + std::to_string(gcs_server_->GetPort()), *client_io_service_);
usage_stats_client.RecordExtraUsageTag("key1", "value1");
usage_stats_client.RecordExtraUsageTag(usage::TagKey::_TEST1, "value1");
ASSERT_TRUE(WaitForCondition(
[this]() {
std::string value;
RAY_CHECK_OK(this->gcs_client_->InternalKV().Get(
"usage_stats", "extra_usage_tag_key1", value));
"usage_stats", "extra_usage_tag__test1", value));
return value == "value1";
},
10000));
// Make sure the value is overridden for the same key.
usage_stats_client.RecordExtraUsageTag("key1", "value2");
usage_stats_client.RecordExtraUsageTag(usage::TagKey::_TEST2, "value2");
ASSERT_TRUE(WaitForCondition(
[this]() {
std::string value;
RAY_CHECK_OK(this->gcs_client_->InternalKV().Get(
"usage_stats", "extra_usage_tag_key1", value));
"usage_stats", "extra_usage_tag__test2", value));
return value == "value2";
},
10000));
Expand Down
9 changes: 6 additions & 3 deletions src/ray/gcs/gcs_client/usage_stats_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ UsageStatsClient::UsageStatsClient(const std::string &gcs_address,
RAY_CHECK_OK(gcs_client_->Connect(io_service));
}

void UsageStatsClient::RecordExtraUsageTag(const std::string &key,
const std::string &value) {
void UsageStatsClient::RecordExtraUsageTag(usage::TagKey key, const std::string &value) {
RAY_CHECK_OK(gcs_client_->InternalKV().AsyncInternalKVPut(
kUsageStatsNamespace,
kExtraUsageTagPrefix + key,
kExtraUsageTagPrefix + absl::AsciiStrToLower(usage::TagKey_Name(key)),
value,
/*overwrite=*/true,
[](Status status, boost::optional<int> added_num) {
Expand All @@ -36,5 +35,9 @@ void UsageStatsClient::RecordExtraUsageTag(const std::string &key,
}
}));
}

// Records `counter` under `key` by converting it to its decimal string form
// and forwarding to RecordExtraUsageTag().
void UsageStatsClient::RecordExtraUsageCounter(usage::TagKey key, int64_t counter) {
  const std::string counter_as_string = std::to_string(counter);
  RecordExtraUsageTag(key, counter_as_string);
}
} // namespace gcs
} // namespace ray
6 changes: 5 additions & 1 deletion src/ray/gcs/gcs_client/usage_stats_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <memory>

#include "ray/gcs/gcs_client/gcs_client.h"
#include "src/ray/protobuf/usage.pb.h"

namespace ray {
namespace gcs {
Expand All @@ -29,7 +30,10 @@ class UsageStatsClient {
///
/// \param key The tag key, which MUST be a TagKey registered in usage.proto.
/// \param value The tag value.
void RecordExtraUsageTag(const std::string &key, const std::string &value);
void RecordExtraUsageTag(usage::TagKey key, const std::string &value);

// Report a monotonically increasing counter.
void RecordExtraUsageCounter(usage::TagKey key, int64_t counter);

private:
/// Keep in sync with the same constants defined in usage_constants.py
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1185,6 +1185,7 @@ void GcsActorManager::OnActorSchedulingFailed(
void GcsActorManager::OnActorCreationSuccess(const std::shared_ptr<GcsActor> &actor,
const rpc::PushTaskReply &reply) {
auto actor_id = actor->GetActorID();
liftime_num_created_actors_++;
RAY_LOG(INFO) << "Actor created successfully, actor id = " << actor_id
<< ", job id = " << actor_id.JobId();
// NOTE: If an actor is deleted immediately after the user creates the actor, reference
Expand Down Expand Up @@ -1592,6 +1593,10 @@ void GcsActorManager::RecordMetrics() const {
ray::stats::STATS_gcs_actors_count.Record(destroyed_actors_.size(), "Destroyed");
ray::stats::STATS_gcs_actors_count.Record(unresolved_actors_.size(), "Unresolved");
ray::stats::STATS_gcs_actors_count.Record(GetPendingActorsCount(), "Pending");
if (usage_stats_client_) {
usage_stats_client_->RecordExtraUsageCounter(usage::TagKey::ACTOR_NUM_CREATED,
liftime_num_created_actors_);
}
actor_state_counter_->FlushOnChangeCallbacks();
}

Expand Down
10 changes: 10 additions & 0 deletions src/ray/gcs/gcs_server/gcs_actor_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "ray/common/id.h"
#include "ray/common/runtime_env_manager.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/gcs_client/usage_stats_client.h"
#include "ray/gcs/gcs_server/gcs_actor_scheduler.h"
#include "ray/gcs/gcs_server/gcs_function_manager.h"
#include "ray/gcs/gcs_server/gcs_init_data.h"
Expand Down Expand Up @@ -448,6 +449,10 @@ class GcsActorManager : public rpc::ActorInfoHandler {
return actor_state_counter_->Get(std::make_pair(state, name));
}

/// Set the client used to report usage stats.
///
/// The pointer is stored as-is.
/// NOTE(review): presumably non-owning, so the client must outlive this
/// manager; confirm against the caller.
///
/// \param usage_stats_client The usage stats client to report through.
void SetUsageStatsClient(UsageStatsClient *usage_stats_client) {
  this->usage_stats_client_ = usage_stats_client;
}

private:
/// A data structure representing an actor's owner.
struct Owner {
Expand Down Expand Up @@ -635,6 +640,8 @@ class GcsActorManager : public rpc::ActorInfoHandler {
RuntimeEnvManager &runtime_env_manager_;
/// Function manager for GC purpose
GcsFunctionManager &function_manager_;

/// Client used to report usage stats; assigned via SetUsageStatsClient().
/// Defaults to nullptr so that the null check done before reporting is
/// well-defined even when no client was ever set.
UsageStatsClient *usage_stats_client_ = nullptr;
/// Run a function on a delay. This is useful for guaranteeing data will be
/// accessible for a minimum amount of time.
std::function<void(std::function<void(void)>, boost::posix_time::milliseconds)>
Expand All @@ -644,6 +651,9 @@ class GcsActorManager : public rpc::ActorInfoHandler {
std::shared_ptr<CounterMap<std::pair<rpc::ActorTableData::ActorState, std::string>>>
actor_state_counter_;

/// Total number of successfully created actors in the cluster lifetime.
int64_t liftime_num_created_actors_ = 0;

// Debug info.
enum CountType {
REGISTER_ACTOR_REQUEST = 0,
Expand Down
5 changes: 5 additions & 0 deletions src/ray/gcs/gcs_server/gcs_placement_group_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ void GcsPlacementGroupManager::OnPlacementGroupCreationSuccess(
placement_group_to_create_callbacks_.erase(pg_to_create_iter);
}
}));
lifetime_num_placement_groups_created_++;
io_context_.post([this] { SchedulePendingPlacementGroups(); },
"GcsPlacementGroupManager.SchedulePendingPlacementGroups");
MarkSchedulingDone();
Expand Down Expand Up @@ -930,6 +931,10 @@ void GcsPlacementGroupManager::RecordMetrics() const {
"Registered");
ray::stats::STATS_gcs_placement_group_count.Record(infeasible_placement_groups_.size(),
"Infeasible");
if (usage_stats_client_) {
usage_stats_client_->RecordExtraUsageCounter(usage::TagKey::PG_NUM_CREATED,
lifetime_num_placement_groups_created_);
}
placement_group_state_counter_->FlushOnChangeCallbacks();
}

Expand Down
Loading

0 comments on commit 8208e4e

Please sign in to comment.