diff --git a/.buildkite/pipeline.macos.yml b/.buildkite/pipeline.macos.yml index 09b876c82147..9cdc3d8cec73 100644 --- a/.buildkite/pipeline.macos.yml +++ b/.buildkite/pipeline.macos.yml @@ -19,7 +19,7 @@ prelude_commands: &prelude_commands |- epilogue_commands: &epilogue_commands |- # Cleanup runtime environment to save storage - rm -rf /tmp/ray + rm -rf /tmp/ray || true # Cleanup local caches (this shouldn't clean up global disk cache) bazel clean diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index f1979080485c..07bc88cbc8d2 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -324,6 +324,36 @@ - bazel test --config=ci $(./ci/run/bazel_export_options) --test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z python/ray/tests/... +- label: ":redis: (External Redis) (Small & Client)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=client_tests,small_size_python_tests + --test_env=TEST_EXTERNAL_REDIS=1 + -- python/ray/tests/... +- label: ":redis: (External Redis) (Large)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + parallelism: 3 + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - TEST_EXTERNAL_REDIS=1 . ./ci/ci.sh test_large +- label: ":redis: (External Redis) (Medium A-J)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j + --test_env=TEST_EXTERNAL_REDIS=1 + -- //python/ray/tests/... +- label: ":redis: (External Redis) (Medium K-Z)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z + --test_env=TEST_EXTERNAL_REDIS=1 + -- //python/ray/tests/... - label: ":python: Debug Test" conditions: ["RAY_CI_PYTHON_AFFECTED"] commands: diff --git a/bazel/python.bzl b/bazel/python.bzl index 5749d7725b07..725da8d20525 100644 --- a/bazel/python.bzl +++ b/bazel/python.bzl @@ -2,6 +2,7 @@ load("@bazel_skylib//lib:paths.bzl", "paths") # py_test_module_list creates a py_test target for each # Python file in `files` + def py_test_module_list(files, size, deps, extra_srcs, name_suffix="", **kwargs): for file in files: # remove .py diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index b1b83fd7fe75..d61a1c771ae5 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -64,6 +64,12 @@ def make_global_state_accessor(ray_context): return global_state_accessor +def test_external_redis(): + import os + + return os.environ.get("TEST_EXTERNAL_REDIS") == "1" + + def _pid_alive(pid): """Check if the process with this PID is alive or not. diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index 56addae330c2..05832cf92335 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -87,7 +87,6 @@ def env_integer(key, default): ["gcs_server", True], ["monitor.py", False], ["ray.util.client.server", False], - ["redis-server", False], ["default_worker.py", False], # Python worker. ["setup_worker.py", False], # Python environment setup worker. # For mac osx, setproctitle doesn't change the process name returned diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5ed913fb645e..424ec1a3411c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -984,19 +984,7 @@ def stop(force, grace_period): proc, proc_cmd, proc_args = candidate corpus = proc_cmd if filter_by_cmd else subprocess.list2cmdline(proc_args) if keyword in corpus: - # This is a way to avoid killing redis server that's not started by Ray. - # We are using a simple hacky solution here since - # Redis server will anyway removed soon from the ray repository. - # This feature is only supported on MacOS/Linux temporarily until - # Redis is removed from Ray. - if ( - keyword == "redis-server" - and sys.platform != "win32" - and "core/src/ray/thirdparty/redis/src/redis-server" not in corpus - ): - continue found.append(candidate) - for proc, proc_cmd, proc_args in found: proc_string = str(subprocess.list2cmdline(proc_args)) try: diff --git a/python/ray/serve/tests/test_cross_language.py b/python/ray/serve/tests/test_cross_language.py index 8a14b173930d..99d91b650809 100644 --- a/python/ray/serve/tests/test_cross_language.py +++ b/python/ray/serve/tests/test_cross_language.py @@ -4,7 +4,7 @@ from ray.serve.config import ReplicaConfig, DeploymentConfig from ray.serve.utils import msgpack_serialize from ray.serve.generated.serve_pb2 import JAVA, RequestMetadata, RequestWrapper -from ray.tests.conftest import shutdown_only # noqa: F401 +from ray.tests.conftest import shutdown_only, maybe_external_redis # noqa: F401 def test_controller_starts_java_replica(shutdown_only): # noqa: F811 diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 20357ed935cb..053fa49ef06a 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -36,6 +36,7 @@ # Explicitly importing it here because it is a ray core tests utility ( # not in the tree) from ray.tests.conftest import ray_start_with_dashboard # noqa: F401 +from ray.tests.conftest import maybe_external_redis # noqa: F401 @pytest.fixture diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 0bb351076fb0..8114a887551c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -32,17 +32,11 @@ setup_tls, teardown_tls, get_and_run_node_killer, + test_external_redis, ) from ray.cluster_utils import Cluster, AutoscalingCluster, cluster_not_supported -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - def get_default_fixure_system_config(): system_config = { "object_timeout_milliseconds": 200, @@ -64,10 +58,11 @@ def get_default_fixture_ray_kwargs(): return ray_kwargs -@pytest.fixture -def external_redis(request, monkeypatch): +@contextmanager +def _setup_redis(request): # Setup external Redis and env var for initialization. param = getattr(request, "param", {}) + external_redis_ports = param.get("external_redis_ports") if external_redis_ports is None: with socket.socket() as s: @@ -88,12 +83,41 @@ def external_redis(request, monkeypatch): processes.append(proc) wait_for_redis_to_start("127.0.0.1", port, ray_constants.REDIS_DEFAULT_PASSWORD) address_str = ",".join(map(lambda x: f"127.0.0.1:{x}", external_redis_ports)) - monkeypatch.setenv("RAY_REDIS_ADDRESS", address_str) - yield None + import os + + old_addr = os.environ.get("RAY_REDIS_ADDRESS") + os.environ["RAY_REDIS_ADDRESS"] = address_str + yield + if old_addr is not None: + os.environ["RAY_REDIS_ADDRESS"] = old_addr + else: + del os.environ["RAY_REDIS_ADDRESS"] for proc in processes: proc.process.terminate() +@pytest.fixture +def maybe_external_redis(request): + if test_external_redis(): + with _setup_redis(request): + yield + else: + yield + + +@pytest.fixture +def external_redis(request): + with _setup_redis(request): + yield + + +@pytest.fixture +def shutdown_only(maybe_external_redis): + yield None + # The code after the yield will run as teardown code. + ray.shutdown() + + @contextmanager def _ray_start(**kwargs): init_kwargs = get_default_fixture_ray_kwargs() @@ -107,7 +131,7 @@ def _ray_start(**kwargs): @pytest.fixture -def ray_start_with_dashboard(request): +def ray_start_with_dashboard(request, maybe_external_redis): param = getattr(request, "param", {}) if param.get("num_cpus") is None: param["num_cpus"] = 1 @@ -117,7 +141,7 @@ def ray_start_with_dashboard(request): # The following fixture will start ray with 0 cpu. @pytest.fixture -def ray_start_no_cpu(request): +def ray_start_no_cpu(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=0, **param) as res: yield res @@ -125,7 +149,7 @@ def ray_start_no_cpu(request): # The following fixture will start ray with 1 cpu. @pytest.fixture -def ray_start_regular(request): +def ray_start_regular(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(**param) as res: yield res @@ -156,14 +180,14 @@ def ray_start_shared_local_modes(request): @pytest.fixture -def ray_start_2_cpus(request): +def ray_start_2_cpus(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=2, **param) as res: yield res @pytest.fixture -def ray_start_10_cpus(request): +def ray_start_10_cpus(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=10, **param) as res: yield res @@ -206,14 +230,14 @@ def _ray_start_cluster(**kwargs): # This fixture will start a cluster with empty nodes. @pytest.fixture -def ray_start_cluster(request): +def ray_start_cluster(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(**param) as res: yield res @pytest.fixture -def ray_start_cluster_enabled(request): +def ray_start_cluster_enabled(request, maybe_external_redis): param = getattr(request, "param", {}) param["skip_cluster"] = False with _ray_start_cluster(**param) as res: @@ -221,14 +245,14 @@ def ray_start_cluster_enabled(request): @pytest.fixture -def ray_start_cluster_init(request): +def ray_start_cluster_init(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, **param) as res: yield res @pytest.fixture -def ray_start_cluster_head(request): +def ray_start_cluster_head(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res: yield res @@ -245,14 +269,14 @@ def ray_start_cluster_head_with_external_redis(request, external_redis): @pytest.fixture -def ray_start_cluster_2_nodes(request): +def ray_start_cluster_2_nodes(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res: yield res @pytest.fixture -def ray_start_object_store_memory(request): +def ray_start_object_store_memory(request, maybe_external_redis): # Start the Ray processes. store_size = request.param system_config = get_default_fixure_system_config() diff --git a/python/ray/tests/test_client_proxy.py b/python/ray/tests/test_client_proxy.py index d0d1d2c307d1..615161d3a0f2 100644 --- a/python/ray/tests/test_client_proxy.py +++ b/python/ray/tests/test_client_proxy.py @@ -10,6 +10,7 @@ import grpc import ray +from ray.ray_constants import REDIS_DEFAULT_PASSWORD import ray.core.generated.ray_client_pb2 as ray_client_pb2 from ray.cloudpickle.compat import pickle from ray.job_config import JobConfig @@ -18,12 +19,12 @@ def start_ray_and_proxy_manager(n_ports=2): - ray_instance = ray.init(_redis_password="test") + ray_instance = ray.init(_redis_password=REDIS_DEFAULT_PASSWORD) agent_port = ray.worker.global_worker.node.metrics_agent_port pm = proxier.ProxyManager( ray_instance["address"], session_dir=ray_instance["session_dir"], - redis_password="test", + redis_password=REDIS_DEFAULT_PASSWORD, runtime_env_agent_port=agent_port, ) free_ports = random.choices(range(45000, 45100), k=n_ports) diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index de346e50da68..d43d0bc1d728 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -16,6 +16,7 @@ from ray.util.joblib import register_ray from joblib import parallel_backend, Parallel, delayed +from ray._private.test_utils import test_external_redis def teardown_function(function): @@ -68,6 +69,9 @@ def ray_start_4_cpu(): ray.shutdown() +@pytest.mark.skipif( + test_external_redis(), reason="The same Redis is used within the test." +) def test_ray_init(shutdown_only): def getpid(args): return os.getpid() @@ -117,6 +121,9 @@ def check_pool_size(pool, size): ], indirect=True, ) +@pytest.mark.skipif( + test_external_redis(), reason="The same Redis is used within the test." +) def test_connect_to_ray(ray_start_cluster): def getpid(args): return os.getpid() diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 12b09066f8ab..7b375ee09d8f 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -3,7 +3,6 @@ import logging import pytest -import redis import unittest.mock import ray import ray._private.services @@ -16,80 +15,6 @@ import grpc -@pytest.fixture -def password(): - random_bytes = os.urandom(128) - if hasattr(random_bytes, "hex"): - return random_bytes.hex() # Python 3 - return random_bytes.encode("hex") # Python 2 - - -class TestRedisPassword: - @pytest.mark.skipif( - True, reason="Not valid anymore. To be added back when fixing Redis mode" - ) - def test_redis_password(self, password, shutdown_only): - @ray.remote - def f(): - return 1 - - info = ray.init(_redis_password=password) - address = info["redis_address"] - redis_ip, redis_port = address.split(":") - - # Check that we can run a task - object_ref = f.remote() - ray.get(object_ref) - - # Check that Redis connections require a password - redis_client = redis.StrictRedis(host=redis_ip, port=redis_port, password=None) - with pytest.raises(redis.exceptions.AuthenticationError): - redis_client.ping() - # We want to simulate how this is called by ray.scripts.start(). - try: - ray._private.services.wait_for_redis_to_start( - redis_ip, redis_port, password="wrong password" - ) - # We catch a generic Exception here in case someone later changes the - # type of the exception. - except Exception as ex: - if not ( - isinstance(ex.__cause__, redis.AuthenticationError) - and "invalid password" in str(ex.__cause__) - ) and not ( - isinstance(ex, redis.ResponseError) - and "WRONGPASS invalid username-password pair" in str(ex) - ): - raise - # By contrast, we may be fairly confident the exact string - # 'invalid password' won't go away, because redis-py simply wraps - # the exact error from the Redis library. - # https://github.com/andymccurdy/redis-py/blob/master/ - # redis/connection.py#L132 - # Except, apparently sometimes redis-py raises a completely - # different *type* of error for a bad password, - # redis.ResponseError, which is not even derived from - # redis.ConnectionError as redis.AuthenticationError is. - - # Check that we can connect to Redis using the provided password - redis_client = redis.StrictRedis( - host=redis_ip, port=redis_port, password=password - ) - assert redis_client.ping() - - def test_redis_password_cluster(self, password, shutdown_only): - @ray.remote - def f(): - return 1 - - node_args = {"redis_password": password} - cluster = Cluster(initialize_head=True, connect=True, head_node_args=node_args) - cluster.add_node(**node_args) - - object_ref = f.remote() - ray.get(object_ref) - - def test_shutdown_and_reset_global_worker(shutdown_only): ray.init(job_config=ray.job_config.JobConfig(code_search_path=["a"])) ray.shutdown()