Collect library usage (#24312)
Collect which libraries are used, for usage stats purposes.
jjyao authored Apr 30, 2022
1 parent 87eaf55 commit cfc192e
Showing 12 changed files with 185 additions and 2 deletions.
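
At a high level, the collection path added here is: a library calls `record_library_usage()` when it is imported or initialized, the name is written to the GCS internal KV under the `library_usage_` prefix in the `usage_stats` namespace, and the periodic usage-stats reporter lists those keys and attaches them to `UsageStatsToReport` as `library_usages`. A minimal sketch of that flow, with a plain dict standing in for the GCS internal KV (the real code goes through `_internal_kv_put` and `internal_kv_keys`):

```python
# Sketch only: a dict stands in for the GCS internal KV used by the real code.
LIBRARY_USAGE_PREFIX = "library_usage_"
USAGE_STATS_NAMESPACE = "usage_stats"

_kv = {}  # (namespace, key) -> value


def record_library_usage(library_usage: str) -> None:
    # Called by a library (e.g. tune, rllib) at import/init time on the driver.
    _kv[(USAGE_STATS_NAMESPACE, f"{LIBRARY_USAGE_PREFIX}{library_usage}")] = ""


def get_library_usages_to_report() -> list:
    # Called by the usage-stats reporter: list matching keys and strip the prefix.
    return [
        key[len(LIBRARY_USAGE_PREFIX):]
        for (namespace, key) in _kv
        if namespace == USAGE_STATS_NAMESPACE and key.startswith(LIBRARY_USAGE_PREFIX)
    ]


record_library_usage("tune")
record_library_usage("rllib")
print(get_library_usages_to_report())  # ['tune', 'rllib'] (order not guaranteed)
```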
4 changes: 4 additions & 0 deletions python/ray/_private/usage/usage_constants.py
@@ -30,3 +30,7 @@
"Enable usage stats collection? "
"This prompt will auto-proceed in 10 seconds to avoid blocking cluster startup."
)

LIBRARY_USAGE_PREFIX = "library_usage_"

USAGE_STATS_NAMESPACE = "usage_stats"
70 changes: 70 additions & 0 deletions python/ray/_private/usage/usage_lib.py
@@ -59,6 +59,10 @@

import ray.ray_constants as ray_constants
import ray._private.usage.usage_constants as usage_constant
from ray.experimental.internal_kv import (
_internal_kv_put,
_internal_kv_initialized,
)

logger = logging.getLogger(__name__)

@@ -106,6 +110,7 @@ class UsageStatsToReport:
total_num_gpus: Optional[int]
total_memory_gb: Optional[float]
total_object_store_memory_gb: Optional[float]
library_usages: Optional[List[str]]
# The total number of successful reports for the lifetime of the cluster.
total_success: int
# The total number of failed reports for the lifetime of the cluster.
@@ -135,6 +140,48 @@ class UsageStatsEnabledness(Enum):
ENABLED_BY_DEFAULT = auto()


_recorded_library_usages = set()


def _put_library_usage(library_usage: str):
assert _internal_kv_initialized()
try:
_internal_kv_put(
f"{usage_constant.LIBRARY_USAGE_PREFIX}{library_usage}",
"",
namespace=usage_constant.USAGE_STATS_NAMESPACE,
)
except Exception as e:
logger.debug(f"Failed to put library usage, {e}")


def record_library_usage(library_usage: str):
"""Record library usage (e.g. which library is used)"""
if library_usage in _recorded_library_usages:
return
_recorded_library_usages.add(library_usage)

if not _internal_kv_initialized():
# This happens if the library is imported before ray.init
return

# Only report library usage from driver to reduce
# the load to kv store.
if ray.worker.global_worker.mode == ray.SCRIPT_MODE:
_put_library_usage(library_usage)


def _put_pre_init_library_usages():
assert _internal_kv_initialized()
if ray.worker.global_worker.mode != ray.SCRIPT_MODE:
return
for library_usage in _recorded_library_usages:
_put_library_usage(library_usage)


ray.worker._post_init_hooks.append(_put_pre_init_library_usages)


def _usage_stats_report_url():
# The usage collection server URL.
# The environment variable is testing-purpose only.
@@ -318,6 +365,24 @@ def put_cluster_metadata(gcs_client, num_retries) -> None:
return metadata


def get_library_usages_to_report(gcs_client, num_retries) -> List[str]:
try:
result = []
library_usages = ray._private.utils.internal_kv_list_with_retry(
gcs_client,
usage_constant.LIBRARY_USAGE_PREFIX,
namespace=usage_constant.USAGE_STATS_NAMESPACE,
num_retries=num_retries,
)
for library_usage in library_usages:
library_usage = library_usage.decode("utf-8")
result.append(library_usage[len(usage_constant.LIBRARY_USAGE_PREFIX) :])
return result
except Exception as e:
logger.info(f"Failed to get library usages to report {e}")
return []


def get_cluster_status_to_report(gcs_client, num_retries) -> ClusterStatusToReport:
"""Get the current status of this cluster.
@@ -498,6 +563,10 @@ def generate_report_data(
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
)
library_usages = get_library_usages_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client(),
num_retries=20,
)
data = UsageStatsToReport(
ray_version=cluster_metadata["ray_version"],
python_version=cluster_metadata["python_version"],
@@ -517,6 +586,7 @@
total_num_gpus=cluster_status_to_report.total_num_gpus,
total_memory_gb=cluster_status_to_report.total_memory_gb,
total_object_store_memory_gb=cluster_status_to_report.total_object_store_memory_gb, # noqa: E501
library_usages=library_usages,
total_success=total_success,
total_failed=total_failed,
seq_number=seq_number,
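
For a library author, the expected pattern is a single `record_library_usage()` call at import or init time. Calls made before `ray.init()` are buffered in `_recorded_library_usages` and flushed by the `_put_pre_init_library_usages` post-init hook, duplicate calls are ignored, and only the driver (`SCRIPT_MODE`) writes to the KV store. A hedged sketch of how a hypothetical library named `my_lib` would hook in:

```python
# my_lib/__init__.py -- hypothetical integration; "my_lib" is an illustrative name.
from ray._private.usage import usage_lib

# Safe to call before ray.init(): the name is buffered in-process and written
# to the GCS internal KV by the post-init hook once the driver connects.
usage_lib.record_library_usage("my_lib")
```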
40 changes: 40 additions & 0 deletions python/ray/_private/utils.py
@@ -1191,6 +1191,42 @@ def check_dashboard_dependencies_installed() -> bool:
return False


def internal_kv_list_with_retry(gcs_client, prefix, namespace, num_retries=20):
result = None
if isinstance(prefix, str):
prefix = prefix.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
for _ in range(num_retries):
try:
result = gcs_client.internal_kv_keys(prefix, namespace)
except Exception as e:
if isinstance(e, grpc.RpcError) and e.code() in (
grpc.StatusCode.UNAVAILABLE,
grpc.StatusCode.UNKNOWN,
):
logger.warning(
f"Unable to connect to GCS at {gcs_client.address}. "
"Check that (1) Ray GCS with matching version started "
"successfully at the specified address, and (2) there is "
"no firewall setting preventing access."
)
else:
logger.exception("Internal KV List failed")
result = None

if result is not None:
break
else:
logger.debug(f"Fetched {prefix}=None from KV. Retrying.")
time.sleep(2)
if result is None:
raise RuntimeError(
f"Could not list '{prefix}' from GCS. Did GCS start successfully?"
)
return result


def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20):
result = None
if isinstance(key, str):
@@ -1228,6 +1264,10 @@ def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20):
def internal_kv_put_with_retry(gcs_client, key, value, namespace, num_retries=20):
if isinstance(key, str):
key = key.encode()
if isinstance(value, str):
value = value.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
error = None
for _ in range(num_retries):
try:
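
A hedged example of consuming the new list helper directly (this mirrors `get_library_usages_to_report` above and assumes a driver that has already called `ray.init()`). Keys come back as `bytes`, so callers decode and strip the prefix themselves:

```python
# Sketch, assuming a connected driver; not the exact reporter code.
import ray
import ray.experimental.internal_kv  # ensure the submodule is loaded
import ray._private.utils as private_utils
import ray._private.usage.usage_constants as usage_constant

ray.init()
gcs_client = ray.experimental.internal_kv.internal_kv_get_gcs_client()

keys = private_utils.internal_kv_list_with_retry(
    gcs_client,
    usage_constant.LIBRARY_USAGE_PREFIX,  # str prefix/namespace are encoded to bytes internally
    namespace=usage_constant.USAGE_STATS_NAMESPACE,
    num_retries=5,
)
# Keys are bytes, e.g. b"library_usage_tune".
libraries = [
    k.decode("utf-8")[len(usage_constant.LIBRARY_USAGE_PREFIX):] for k in keys
]
print(libraries)
```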
3 changes: 3 additions & 0 deletions python/ray/data/dataset.py
@@ -84,6 +84,7 @@
from ray.data.impl.block_list import BlockList
from ray.data.impl.lazy_block_list import LazyBlockList
from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder
from ray._private.usage import usage_lib

logger = logging.getLogger(__name__)

@@ -128,6 +129,8 @@ def __init__(
read methods to construct a dataset.
"""
assert isinstance(plan, ExecutionPlan)
usage_lib.record_library_usage("dataset")

self._plan = plan
self._uuid = uuid4().hex
self._epoch = epoch
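
Unlike Train, Tune, and RLlib below, which record usage at import time from their package `__init__.py`, Datasets records usage only when a `Dataset` object is actually constructed, so importing `ray.data` alone is not counted. A small sketch of the distinction (assuming a local `ray.init()`):

```python
# Sketch: import alone does not record "dataset"; constructing a Dataset does,
# because record_library_usage("dataset") sits in Dataset.__init__.
import ray
import ray.data  # nothing recorded yet

ray.init()
ds = ray.data.range(10)  # Dataset.__init__ runs -> "dataset" is recorded
```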
5 changes: 4 additions & 1 deletion python/ray/node.py
@@ -27,7 +27,6 @@
from ray._private.gcs_utils import GcsClient
from ray._private.resource_spec import ResourceSpec
from ray._private.utils import try_to_create_directory, try_to_symlink, open_log
import ray._private.usage.usage_lib as ray_usage_lib

# Logger for this module. It should be configured at the entry point
# into the program using Ray. Ray configures it by default automatically
@@ -347,6 +346,8 @@ def check_version_info(self):
Raises:
Exception: An exception is raised if there is a version mismatch.
"""
import ray._private.usage.usage_lib as ray_usage_lib

cluster_metadata = ray_usage_lib.get_cluster_metadata(
self.get_gcs_client(), num_retries=NUM_REDIS_GET_RETRIES
)
@@ -1054,6 +1055,8 @@ def _write_cluster_info_to_kv(self):
Check `usage_stats_head.py` for more details.
"""
# Make sure the cluster metadata wasn't reported before.
import ray._private.usage.usage_lib as ray_usage_lib

ray_usage_lib.put_cluster_metadata(self.get_gcs_client(), NUM_REDIS_GET_RETRIES)

def start_head_processes(self):
3 changes: 3 additions & 0 deletions python/ray/serve/api.py
@@ -57,6 +57,7 @@
get_internal_replica_context,
ReplicaContext,
)
from ray._private.usage import usage_lib

logger = logging.getLogger(__file__)

@@ -108,6 +109,8 @@ def start(
dedicated_cpu (bool): Whether to reserve a CPU core for the internal
Serve controller actor. Defaults to False.
"""
usage_lib.record_library_usage("serve")

http_deprecated_args = ["http_host", "http_port", "http_middlewares"]
for key in http_deprecated_args:
if key in kwargs:
45 changes: 45 additions & 0 deletions python/ray/tests/test_usage_stats.py
@@ -43,6 +43,10 @@
"total_num_gpus": {"type": ["null", "integer"]},
"total_memory_gb": {"type": ["null", "number"]},
"total_object_store_memory_gb": {"type": ["null", "number"]},
"library_usages": {
"type": ["null", "array"],
"items": {"type": "string"},
},
"total_success": {"type": "integer"},
"total_failed": {"type": "integer"},
"seq_number": {"type": "integer"},
@@ -281,6 +285,35 @@ def test_usage_lib_cluster_metadata_generation(monkeypatch, ray_start_cluster):
)


def test_library_usages():
if os.environ.get("RAY_MINIMAL") == "1":
# Doesn't work with minimal installation
# since we import serve.
return

ray_usage_lib._recorded_library_usages.clear()
ray_usage_lib.record_library_usage("pre_init")
ray.init()
ray_usage_lib.record_library_usage("post_init")
ray.workflow.init()
ray.data.range(10)
from ray import serve

serve.start()
library_usages = ray_usage_lib.get_library_usages_to_report(
ray.experimental.internal_kv.internal_kv_get_gcs_client(), num_retries=20
)
assert set(library_usages) == {
"pre_init",
"post_init",
"dataset",
"workflow",
"serve",
}
serve.shutdown()
ray.shutdown()


def test_usage_lib_cluster_metadata_generation_usage_disabled(
monkeypatch, shutdown_only
):
@@ -531,6 +564,12 @@ def test_usage_report_e2e(monkeypatch, ray_start_cluster, tmp_path):
m.setenv("RAY_USAGE_STATS_REPORT_INTERVAL_S", "1")
cluster = ray_start_cluster
cluster.add_node(num_cpus=3)
ray_usage_lib._recorded_library_usages.clear()
if os.environ.get("RAY_MINIMAL") != "1":
from ray import tune # noqa: F401
from ray.rllib.agents.ppo import PPOTrainer # noqa: F401
from ray import train # noqa: F401

ray.init(address=cluster.address)

@ray.remote(num_cpus=0)
@@ -556,6 +595,8 @@ def get_payload(self):
@ray.remote(num_cpus=0, runtime_env={"pip": ["ray[serve]"]})
class ServeInitator:
def __init__(self):
# This is used in the worker process
# so it won't be tracked as library usage.
from ray import serve

serve.start()
@@ -605,6 +646,10 @@ def ready(self):
assert payload["total_num_gpus"] is None
assert payload["total_memory_gb"] > 0
assert payload["total_object_store_memory_gb"] > 0
if os.environ.get("RAY_MINIMAL") == "1":
assert set(payload["library_usages"]) == set()
else:
assert set(payload["library_usages"]) == {"rllib", "train", "tune"}
validate(instance=payload, schema=schema)
"""
Verify the usage_stats.json is updated.
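
For reference, a standalone check of just the new schema fragment with `jsonschema` (the same `validate` call the test uses): `null` and a list of strings both pass, while non-string items are rejected.

```python
# Minimal sketch of the new field's schema, assuming jsonschema is installed.
from jsonschema import ValidationError, validate

fragment = {
    "type": "object",
    "properties": {
        "library_usages": {
            "type": ["null", "array"],
            "items": {"type": "string"},
        },
    },
}

validate(instance={"library_usages": ["tune", "rllib"]}, schema=fragment)  # passes
validate(instance={"library_usages": None}, schema=fragment)  # passes
try:
    validate(instance={"library_usages": [1, 2]}, schema=fragment)  # wrong item type
except ValidationError as e:
    print("rejected:", e.message)
```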
4 changes: 4 additions & 0 deletions python/ray/train/__init__.py
@@ -12,6 +12,10 @@
)
from ray.train.trainer import Trainer, TrainingIterator

from ray._private.usage import usage_lib

usage_lib.record_library_usage("train")

__all__ = [
"BackendConfig",
"CheckpointStrategy",
4 changes: 4 additions & 0 deletions python/ray/tune/__init__.py
@@ -43,6 +43,10 @@
from ray.tune.utils.placement_groups import PlacementGroupFactory
from ray.tune.utils.trainable import with_parameters

from ray._private.usage import usage_lib

usage_lib.record_library_usage("tune")

__all__ = [
"Trainable",
"DurableTrainable",
3 changes: 2 additions & 1 deletion python/ray/worker.py
@@ -37,7 +37,6 @@
GcsLogSubscriber,
GcsFunctionKeySubscriber,
)
from ray._private.usage import usage_lib
from ray._private.runtime_env.py_modules import upload_py_modules_if_needed
from ray._private.runtime_env.working_dir import upload_working_dir_if_needed
from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR
@@ -990,6 +989,8 @@ def init(
# In this case, we need to start a new cluster.

# Don't collect usage stats in ray.init().
from ray._private.usage import usage_lib

usage_lib.set_usage_stats_enabled_via_env_var(False)

# Use a random port by not specifying Redis port / GCS server port.
3 changes: 3 additions & 0 deletions python/ray/workflow/api.py
@@ -29,6 +29,7 @@
from ray.workflow import workflow_access
from ray.workflow.workflow_storage import get_workflow_storage
from ray.util.annotations import PublicAPI
from ray._private.usage import usage_lib

if TYPE_CHECKING:
from ray.workflow.virtual_actor_class import VirtualActorClass, VirtualActor
@@ -46,6 +47,8 @@ def init() -> None:
If Ray is not initialized, we will initialize Ray and
use ``/tmp/ray/workflow_data`` as the default storage.
"""
usage_lib.record_library_usage("workflow")

if not ray.is_initialized():
# We should use get_temp_dir_path, but for ray client, we don't
# have this one. We need a flag to tell whether it's a client
3 changes: 3 additions & 0 deletions rllib/__init__.py
@@ -12,6 +12,7 @@
from ray.rllib.policy.tf_policy import TFPolicy
from ray.rllib.policy.torch_policy import TorchPolicy
from ray.tune.registry import register_trainable
from ray._private.usage import usage_lib


def _setup_logger():
@@ -56,6 +57,8 @@ def setup(self, config):

_setup_logger()

usage_lib.record_library_usage("rllib")

__all__ = [
"Policy",
"TFPolicy",
