diff --git a/python/ray/_private/usage/usage_constants.py b/python/ray/_private/usage/usage_constants.py index 9c3b8320a0e5d..9147d29f694da 100644 --- a/python/ray/_private/usage/usage_constants.py +++ b/python/ray/_private/usage/usage_constants.py @@ -30,3 +30,7 @@ "Enable usage stats collection? " "This prompt will auto-proceed in 10 seconds to avoid blocking cluster startup." ) + +LIBRARY_USAGE_PREFIX = "library_usage_" + +USAGE_STATS_NAMESPACE = "usage_stats" diff --git a/python/ray/_private/usage/usage_lib.py b/python/ray/_private/usage/usage_lib.py index 5d2ce1b231609..06a0351ab656b 100644 --- a/python/ray/_private/usage/usage_lib.py +++ b/python/ray/_private/usage/usage_lib.py @@ -59,6 +59,10 @@ import ray.ray_constants as ray_constants import ray._private.usage.usage_constants as usage_constant +from ray.experimental.internal_kv import ( + _internal_kv_put, + _internal_kv_initialized, +) logger = logging.getLogger(__name__) @@ -106,6 +110,7 @@ class UsageStatsToReport: total_num_gpus: Optional[int] total_memory_gb: Optional[float] total_object_store_memory_gb: Optional[float] + library_usages: Optional[List[str]] # The total number of successful reports for the lifetime of the cluster. total_success: int # The total number of failed reports for the lifetime of the cluster. @@ -135,6 +140,48 @@ class UsageStatsEnabledness(Enum): ENABLED_BY_DEFAULT = auto() +_recorded_library_usages = set() + + +def _put_library_usage(library_usage: str): + assert _internal_kv_initialized() + try: + _internal_kv_put( + f"{usage_constant.LIBRARY_USAGE_PREFIX}{library_usage}", + "", + namespace=usage_constant.USAGE_STATS_NAMESPACE, + ) + except Exception as e: + logger.debug(f"Failed to put library usage, {e}") + + +def record_library_usage(library_usage: str): + """Record library usage (e.g. which library is used)""" + if library_usage in _recorded_library_usages: + return + _recorded_library_usages.add(library_usage) + + if not _internal_kv_initialized(): + # This happens if the library is imported before ray.init + return + + # Only report library usage from driver to reduce + # the load to kv store. + if ray.worker.global_worker.mode == ray.SCRIPT_MODE: + _put_library_usage(library_usage) + + +def _put_pre_init_library_usages(): + assert _internal_kv_initialized() + if ray.worker.global_worker.mode != ray.SCRIPT_MODE: + return + for library_usage in _recorded_library_usages: + _put_library_usage(library_usage) + + +ray.worker._post_init_hooks.append(_put_pre_init_library_usages) + + def _usage_stats_report_url(): # The usage collection server URL. # The environment variable is testing-purpose only. @@ -318,6 +365,24 @@ def put_cluster_metadata(gcs_client, num_retries) -> None: return metadata +def get_library_usages_to_report(gcs_client, num_retries) -> List[str]: + try: + result = [] + library_usages = ray._private.utils.internal_kv_list_with_retry( + gcs_client, + usage_constant.LIBRARY_USAGE_PREFIX, + namespace=usage_constant.USAGE_STATS_NAMESPACE, + num_retries=num_retries, + ) + for library_usage in library_usages: + library_usage = library_usage.decode("utf-8") + result.append(library_usage[len(usage_constant.LIBRARY_USAGE_PREFIX) :]) + return result + except Exception as e: + logger.info(f"Failed to get library usages to report {e}") + return [] + + def get_cluster_status_to_report(gcs_client, num_retries) -> ClusterStatusToReport: """Get the current status of this cluster. @@ -498,6 +563,10 @@ def generate_report_data( ray.experimental.internal_kv.internal_kv_get_gcs_client(), num_retries=20, ) + library_usages = get_library_usages_to_report( + ray.experimental.internal_kv.internal_kv_get_gcs_client(), + num_retries=20, + ) data = UsageStatsToReport( ray_version=cluster_metadata["ray_version"], python_version=cluster_metadata["python_version"], @@ -517,6 +586,7 @@ def generate_report_data( total_num_gpus=cluster_status_to_report.total_num_gpus, total_memory_gb=cluster_status_to_report.total_memory_gb, total_object_store_memory_gb=cluster_status_to_report.total_object_store_memory_gb, # noqa: E501 + library_usages=library_usages, total_success=total_success, total_failed=total_failed, seq_number=seq_number, diff --git a/python/ray/_private/utils.py b/python/ray/_private/utils.py index c890cfe354ac3..56068566ad592 100644 --- a/python/ray/_private/utils.py +++ b/python/ray/_private/utils.py @@ -1191,6 +1191,42 @@ def check_dashboard_dependencies_installed() -> bool: return False +def internal_kv_list_with_retry(gcs_client, prefix, namespace, num_retries=20): + result = None + if isinstance(prefix, str): + prefix = prefix.encode() + if isinstance(namespace, str): + namespace = namespace.encode() + for _ in range(num_retries): + try: + result = gcs_client.internal_kv_keys(prefix, namespace) + except Exception as e: + if isinstance(e, grpc.RpcError) and e.code() in ( + grpc.StatusCode.UNAVAILABLE, + grpc.StatusCode.UNKNOWN, + ): + logger.warning( + f"Unable to connect to GCS at {gcs_client.address}. " + "Check that (1) Ray GCS with matching version started " + "successfully at the specified address, and (2) there is " + "no firewall setting preventing access." + ) + else: + logger.exception("Internal KV List failed") + result = None + + if result is not None: + break + else: + logger.debug(f"Fetched {prefix}=None from KV. Retrying.") + time.sleep(2) + if result is None: + raise RuntimeError( + f"Could not list '{prefix}' from GCS. Did GCS start successfully?" + ) + return result + + def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20): result = None if isinstance(key, str): @@ -1228,6 +1264,10 @@ def internal_kv_get_with_retry(gcs_client, key, namespace, num_retries=20): def internal_kv_put_with_retry(gcs_client, key, value, namespace, num_retries=20): if isinstance(key, str): key = key.encode() + if isinstance(value, str): + value = value.encode() + if isinstance(namespace, str): + namespace = namespace.encode() error = None for _ in range(num_retries): try: diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 491295564f6a2..36d21ff151841 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -84,6 +84,7 @@ from ray.data.impl.block_list import BlockList from ray.data.impl.lazy_block_list import LazyBlockList from ray.data.impl.delegating_block_builder import DelegatingBlockBuilder +from ray._private.usage import usage_lib logger = logging.getLogger(__name__) @@ -128,6 +129,8 @@ def __init__( read methods to construct a dataset. """ assert isinstance(plan, ExecutionPlan) + usage_lib.record_library_usage("dataset") + self._plan = plan self._uuid = uuid4().hex self._epoch = epoch diff --git a/python/ray/node.py b/python/ray/node.py index 077c74fcd5f88..f31bb757b04f9 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -27,7 +27,6 @@ from ray._private.gcs_utils import GcsClient from ray._private.resource_spec import ResourceSpec from ray._private.utils import try_to_create_directory, try_to_symlink, open_log -import ray._private.usage.usage_lib as ray_usage_lib # Logger for this module. It should be configured at the entry point # into the program using Ray. Ray configures it by default automatically @@ -347,6 +346,8 @@ def check_version_info(self): Raises: Exception: An exception is raised if there is a version mismatch. """ + import ray._private.usage.usage_lib as ray_usage_lib + cluster_metadata = ray_usage_lib.get_cluster_metadata( self.get_gcs_client(), num_retries=NUM_REDIS_GET_RETRIES ) @@ -1054,6 +1055,8 @@ def _write_cluster_info_to_kv(self): Check `usage_stats_head.py` for more details. """ # Make sure the cluster metadata wasn't reported before. + import ray._private.usage.usage_lib as ray_usage_lib + ray_usage_lib.put_cluster_metadata(self.get_gcs_client(), NUM_REDIS_GET_RETRIES) def start_head_processes(self): diff --git a/python/ray/serve/api.py b/python/ray/serve/api.py index d9c7f10d10ec8..eb5b61c091e30 100644 --- a/python/ray/serve/api.py +++ b/python/ray/serve/api.py @@ -57,6 +57,7 @@ get_internal_replica_context, ReplicaContext, ) +from ray._private.usage import usage_lib logger = logging.getLogger(__file__) @@ -108,6 +109,8 @@ def start( dedicated_cpu (bool): Whether to reserve a CPU core for the internal Serve controller actor. Defaults to False. """ + usage_lib.record_library_usage("serve") + http_deprecated_args = ["http_host", "http_port", "http_middlewares"] for key in http_deprecated_args: if key in kwargs: diff --git a/python/ray/tests/test_usage_stats.py b/python/ray/tests/test_usage_stats.py index 04bc09e131aed..b4f572543c164 100644 --- a/python/ray/tests/test_usage_stats.py +++ b/python/ray/tests/test_usage_stats.py @@ -43,6 +43,10 @@ "total_num_gpus": {"type": ["null", "integer"]}, "total_memory_gb": {"type": ["null", "number"]}, "total_object_store_memory_gb": {"type": ["null", "number"]}, + "library_usages": { + "type": ["null", "array"], + "items": {"type": "string"}, + }, "total_success": {"type": "integer"}, "total_failed": {"type": "integer"}, "seq_number": {"type": "integer"}, @@ -281,6 +285,35 @@ def test_usage_lib_cluster_metadata_generation(monkeypatch, ray_start_cluster): ) +def test_library_usages(): + if os.environ.get("RAY_MINIMAL") == "1": + # Doesn't work with minimal installation + # since we import serve. + return + + ray_usage_lib._recorded_library_usages.clear() + ray_usage_lib.record_library_usage("pre_init") + ray.init() + ray_usage_lib.record_library_usage("post_init") + ray.workflow.init() + ray.data.range(10) + from ray import serve + + serve.start() + library_usages = ray_usage_lib.get_library_usages_to_report( + ray.experimental.internal_kv.internal_kv_get_gcs_client(), num_retries=20 + ) + assert set(library_usages) == { + "pre_init", + "post_init", + "dataset", + "workflow", + "serve", + } + serve.shutdown() + ray.shutdown() + + def test_usage_lib_cluster_metadata_generation_usage_disabled( monkeypatch, shutdown_only ): @@ -531,6 +564,12 @@ def test_usage_report_e2e(monkeypatch, ray_start_cluster, tmp_path): m.setenv("RAY_USAGE_STATS_REPORT_INTERVAL_S", "1") cluster = ray_start_cluster cluster.add_node(num_cpus=3) + ray_usage_lib._recorded_library_usages.clear() + if os.environ.get("RAY_MINIMAL") != "1": + from ray import tune # noqa: F401 + from ray.rllib.agents.ppo import PPOTrainer # noqa: F401 + from ray import train # noqa: F401 + ray.init(address=cluster.address) @ray.remote(num_cpus=0) @@ -556,6 +595,8 @@ def get_payload(self): @ray.remote(num_cpus=0, runtime_env={"pip": ["ray[serve]"]}) class ServeInitator: def __init__(self): + # This is used in the worker process + # so it won't be tracked as library usage. from ray import serve serve.start() @@ -605,6 +646,10 @@ def ready(self): assert payload["total_num_gpus"] is None assert payload["total_memory_gb"] > 0 assert payload["total_object_store_memory_gb"] > 0 + if os.environ.get("RAY_MINIMAL") == "1": + assert set(payload["library_usages"]) == set() + else: + assert set(payload["library_usages"]) == {"rllib", "train", "tune"} validate(instance=payload, schema=schema) """ Verify the usage_stats.json is updated. diff --git a/python/ray/train/__init__.py b/python/ray/train/__init__.py index 58a61bb045059..21c80b3511bd6 100644 --- a/python/ray/train/__init__.py +++ b/python/ray/train/__init__.py @@ -12,6 +12,10 @@ ) from ray.train.trainer import Trainer, TrainingIterator +from ray._private.usage import usage_lib + +usage_lib.record_library_usage("train") + __all__ = [ "BackendConfig", "CheckpointStrategy", diff --git a/python/ray/tune/__init__.py b/python/ray/tune/__init__.py index e4573ee16787d..1eef324151a01 100644 --- a/python/ray/tune/__init__.py +++ b/python/ray/tune/__init__.py @@ -43,6 +43,10 @@ from ray.tune.utils.placement_groups import PlacementGroupFactory from ray.tune.utils.trainable import with_parameters +from ray._private.usage import usage_lib + +usage_lib.record_library_usage("tune") + __all__ = [ "Trainable", "DurableTrainable", diff --git a/python/ray/worker.py b/python/ray/worker.py index 84ecaa27750a7..3cb129e0a0fae 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -37,7 +37,6 @@ GcsLogSubscriber, GcsFunctionKeySubscriber, ) -from ray._private.usage import usage_lib from ray._private.runtime_env.py_modules import upload_py_modules_if_needed from ray._private.runtime_env.working_dir import upload_working_dir_if_needed from ray._private.runtime_env.constants import RAY_JOB_CONFIG_JSON_ENV_VAR @@ -990,6 +989,8 @@ def init( # In this case, we need to start a new cluster. # Don't collect usage stats in ray.init(). + from ray._private.usage import usage_lib + usage_lib.set_usage_stats_enabled_via_env_var(False) # Use a random port by not specifying Redis port / GCS server port. diff --git a/python/ray/workflow/api.py b/python/ray/workflow/api.py index 4a5a4c76ddd90..6f078e035659a 100644 --- a/python/ray/workflow/api.py +++ b/python/ray/workflow/api.py @@ -29,6 +29,7 @@ from ray.workflow import workflow_access from ray.workflow.workflow_storage import get_workflow_storage from ray.util.annotations import PublicAPI +from ray._private.usage import usage_lib if TYPE_CHECKING: from ray.workflow.virtual_actor_class import VirtualActorClass, VirtualActor @@ -46,6 +47,8 @@ def init() -> None: If Ray is not initialized, we will initialize Ray and use ``/tmp/ray/workflow_data`` as the default storage. """ + usage_lib.record_library_usage("workflow") + if not ray.is_initialized(): # We should use get_temp_dir_path, but for ray client, we don't # have this one. We need a flag to tell whether it's a client diff --git a/rllib/__init__.py b/rllib/__init__.py index 1bc6b1abef202..a8867c09bf1d7 100644 --- a/rllib/__init__.py +++ b/rllib/__init__.py @@ -12,6 +12,7 @@ from ray.rllib.policy.tf_policy import TFPolicy from ray.rllib.policy.torch_policy import TorchPolicy from ray.tune.registry import register_trainable +from ray._private.usage import usage_lib def _setup_logger(): @@ -56,6 +57,8 @@ def setup(self, config): _setup_logger() +usage_lib.record_library_usage("rllib") + __all__ = [ "Policy", "TFPolicy",