From 8ec558dcb949ce39df5fd7a855001287de2ce73b Mon Sep 17 00:00:00 2001 From: Yi Cheng <74173148+iycheng@users.noreply.github.com> Date: Thu, 19 May 2022 21:46:55 -0700 Subject: [PATCH] [core] Reenable GCS test with redis as backend. (#23506) Since ray supports Redis as a storage backend, we should ensure the code path with Redis as storage is still being covered e2e. The tests don't run for a while after we switch to memory mode by default. This PR tries to fix this and make it run with every commit. In the future, if we support more and more storage backends, this should be revised to be more efficient and selective. But now I think the cost should be ok. This PR is part of GCS HA testing-related work. --- .buildkite/pipeline.macos.yml | 2 +- .buildkite/pipeline.yml | 30 ++++++++ bazel/python.bzl | 1 + python/ray/_private/test_utils.py | 6 ++ python/ray/autoscaler/_private/constants.py | 1 - python/ray/scripts/scripts.py | 12 --- python/ray/serve/tests/test_cross_language.py | 2 +- python/ray/serve/tests/test_standalone.py | 1 + python/ray/tests/conftest.py | 68 +++++++++++------ python/ray/tests/test_client_proxy.py | 5 +- python/ray/tests/test_multiprocessing.py | 7 ++ python/ray/tests/test_ray_init.py | 75 ------------------- 12 files changed, 96 insertions(+), 114 deletions(-) diff --git a/.buildkite/pipeline.macos.yml b/.buildkite/pipeline.macos.yml index 09b876c82147..9cdc3d8cec73 100644 --- a/.buildkite/pipeline.macos.yml +++ b/.buildkite/pipeline.macos.yml @@ -19,7 +19,7 @@ prelude_commands: &prelude_commands |- epilogue_commands: &epilogue_commands |- # Cleanup runtime environment to save storage - rm -rf /tmp/ray + rm -rf /tmp/ray || true # Cleanup local caches (this shouldn't clean up global disk cache) bazel clean diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index f1979080485c..07bc88cbc8d2 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -324,6 +324,36 @@ - bazel test --config=ci $(./ci/run/bazel_export_options) --test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z python/ray/tests/... +- label: ":redis: (External Redis) (Small & Client)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=client_tests,small_size_python_tests + --test_env=TEST_EXTERNAL_REDIS=1 + -- python/ray/tests/... +- label: ":redis: (External Redis) (Large)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + parallelism: 3 + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - TEST_EXTERNAL_REDIS=1 . ./ci/ci.sh test_large +- label: ":redis: (External Redis) (Medium A-J)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_a_to_j + --test_env=TEST_EXTERNAL_REDIS=1 + -- //python/ray/tests/... +- label: ":redis: (External Redis) (Medium K-Z)" + conditions: ["RAY_CI_PYTHON_AFFECTED"] + commands: + - cleanup() { if [ "${BUILDKITE_PULL_REQUEST}" = "false" ]; then ./ci/build/upload_build_info.sh; fi }; trap cleanup EXIT + - bazel test --config=ci $(./scripts/bazel_export_options) + --test_tag_filters=-kubernetes,medium_size_python_tests_k_to_z + --test_env=TEST_EXTERNAL_REDIS=1 + -- //python/ray/tests/... - label: ":python: Debug Test" conditions: ["RAY_CI_PYTHON_AFFECTED"] commands: diff --git a/bazel/python.bzl b/bazel/python.bzl index 5749d7725b07..725da8d20525 100644 --- a/bazel/python.bzl +++ b/bazel/python.bzl @@ -2,6 +2,7 @@ load("@bazel_skylib//lib:paths.bzl", "paths") # py_test_module_list creates a py_test target for each # Python file in `files` + def py_test_module_list(files, size, deps, extra_srcs, name_suffix="", **kwargs): for file in files: # remove .py diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index b1b83fd7fe75..d61a1c771ae5 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -64,6 +64,12 @@ def make_global_state_accessor(ray_context): return global_state_accessor +def test_external_redis(): + import os + + return os.environ.get("TEST_EXTERNAL_REDIS") == "1" + + def _pid_alive(pid): """Check if the process with this PID is alive or not. diff --git a/python/ray/autoscaler/_private/constants.py b/python/ray/autoscaler/_private/constants.py index 56addae330c2..05832cf92335 100644 --- a/python/ray/autoscaler/_private/constants.py +++ b/python/ray/autoscaler/_private/constants.py @@ -87,7 +87,6 @@ def env_integer(key, default): ["gcs_server", True], ["monitor.py", False], ["ray.util.client.server", False], - ["redis-server", False], ["default_worker.py", False], # Python worker. ["setup_worker.py", False], # Python environment setup worker. # For mac osx, setproctitle doesn't change the process name returned diff --git a/python/ray/scripts/scripts.py b/python/ray/scripts/scripts.py index 5ed913fb645e..424ec1a3411c 100644 --- a/python/ray/scripts/scripts.py +++ b/python/ray/scripts/scripts.py @@ -984,19 +984,7 @@ def stop(force, grace_period): proc, proc_cmd, proc_args = candidate corpus = proc_cmd if filter_by_cmd else subprocess.list2cmdline(proc_args) if keyword in corpus: - # This is a way to avoid killing redis server that's not started by Ray. - # We are using a simple hacky solution here since - # Redis server will anyway removed soon from the ray repository. - # This feature is only supported on MacOS/Linux temporarily until - # Redis is removed from Ray. - if ( - keyword == "redis-server" - and sys.platform != "win32" - and "core/src/ray/thirdparty/redis/src/redis-server" not in corpus - ): - continue found.append(candidate) - for proc, proc_cmd, proc_args in found: proc_string = str(subprocess.list2cmdline(proc_args)) try: diff --git a/python/ray/serve/tests/test_cross_language.py b/python/ray/serve/tests/test_cross_language.py index 8a14b173930d..99d91b650809 100644 --- a/python/ray/serve/tests/test_cross_language.py +++ b/python/ray/serve/tests/test_cross_language.py @@ -4,7 +4,7 @@ from ray.serve.config import ReplicaConfig, DeploymentConfig from ray.serve.utils import msgpack_serialize from ray.serve.generated.serve_pb2 import JAVA, RequestMetadata, RequestWrapper -from ray.tests.conftest import shutdown_only # noqa: F401 +from ray.tests.conftest import shutdown_only, maybe_external_redis # noqa: F401 def test_controller_starts_java_replica(shutdown_only): # noqa: F811 diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 20357ed935cb..053fa49ef06a 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -36,6 +36,7 @@ # Explicitly importing it here because it is a ray core tests utility ( # not in the tree) from ray.tests.conftest import ray_start_with_dashboard # noqa: F401 +from ray.tests.conftest import maybe_external_redis # noqa: F401 @pytest.fixture diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 0bb351076fb0..8114a887551c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -32,17 +32,11 @@ setup_tls, teardown_tls, get_and_run_node_killer, + test_external_redis, ) from ray.cluster_utils import Cluster, AutoscalingCluster, cluster_not_supported -@pytest.fixture -def shutdown_only(): - yield None - # The code after the yield will run as teardown code. - ray.shutdown() - - def get_default_fixure_system_config(): system_config = { "object_timeout_milliseconds": 200, @@ -64,10 +58,11 @@ def get_default_fixture_ray_kwargs(): return ray_kwargs -@pytest.fixture -def external_redis(request, monkeypatch): +@contextmanager +def _setup_redis(request): # Setup external Redis and env var for initialization. param = getattr(request, "param", {}) + external_redis_ports = param.get("external_redis_ports") if external_redis_ports is None: with socket.socket() as s: @@ -88,12 +83,41 @@ def external_redis(request, monkeypatch): processes.append(proc) wait_for_redis_to_start("127.0.0.1", port, ray_constants.REDIS_DEFAULT_PASSWORD) address_str = ",".join(map(lambda x: f"127.0.0.1:{x}", external_redis_ports)) - monkeypatch.setenv("RAY_REDIS_ADDRESS", address_str) - yield None + import os + + old_addr = os.environ.get("RAY_REDIS_ADDRESS") + os.environ["RAY_REDIS_ADDRESS"] = address_str + yield + if old_addr is not None: + os.environ["RAY_REDIS_ADDRESS"] = old_addr + else: + del os.environ["RAY_REDIS_ADDRESS"] for proc in processes: proc.process.terminate() +@pytest.fixture +def maybe_external_redis(request): + if test_external_redis(): + with _setup_redis(request): + yield + else: + yield + + +@pytest.fixture +def external_redis(request): + with _setup_redis(request): + yield + + +@pytest.fixture +def shutdown_only(maybe_external_redis): + yield None + # The code after the yield will run as teardown code. + ray.shutdown() + + @contextmanager def _ray_start(**kwargs): init_kwargs = get_default_fixture_ray_kwargs() @@ -107,7 +131,7 @@ def _ray_start(**kwargs): @pytest.fixture -def ray_start_with_dashboard(request): +def ray_start_with_dashboard(request, maybe_external_redis): param = getattr(request, "param", {}) if param.get("num_cpus") is None: param["num_cpus"] = 1 @@ -117,7 +141,7 @@ def ray_start_with_dashboard(request): # The following fixture will start ray with 0 cpu. @pytest.fixture -def ray_start_no_cpu(request): +def ray_start_no_cpu(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=0, **param) as res: yield res @@ -125,7 +149,7 @@ def ray_start_no_cpu(request): # The following fixture will start ray with 1 cpu. @pytest.fixture -def ray_start_regular(request): +def ray_start_regular(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(**param) as res: yield res @@ -156,14 +180,14 @@ def ray_start_shared_local_modes(request): @pytest.fixture -def ray_start_2_cpus(request): +def ray_start_2_cpus(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=2, **param) as res: yield res @pytest.fixture -def ray_start_10_cpus(request): +def ray_start_10_cpus(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start(num_cpus=10, **param) as res: yield res @@ -206,14 +230,14 @@ def _ray_start_cluster(**kwargs): # This fixture will start a cluster with empty nodes. @pytest.fixture -def ray_start_cluster(request): +def ray_start_cluster(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(**param) as res: yield res @pytest.fixture -def ray_start_cluster_enabled(request): +def ray_start_cluster_enabled(request, maybe_external_redis): param = getattr(request, "param", {}) param["skip_cluster"] = False with _ray_start_cluster(**param) as res: @@ -221,14 +245,14 @@ def ray_start_cluster_enabled(request): @pytest.fixture -def ray_start_cluster_init(request): +def ray_start_cluster_init(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, **param) as res: yield res @pytest.fixture -def ray_start_cluster_head(request): +def ray_start_cluster_head(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, num_nodes=1, **param) as res: yield res @@ -245,14 +269,14 @@ def ray_start_cluster_head_with_external_redis(request, external_redis): @pytest.fixture -def ray_start_cluster_2_nodes(request): +def ray_start_cluster_2_nodes(request, maybe_external_redis): param = getattr(request, "param", {}) with _ray_start_cluster(do_init=True, num_nodes=2, **param) as res: yield res @pytest.fixture -def ray_start_object_store_memory(request): +def ray_start_object_store_memory(request, maybe_external_redis): # Start the Ray processes. store_size = request.param system_config = get_default_fixure_system_config() diff --git a/python/ray/tests/test_client_proxy.py b/python/ray/tests/test_client_proxy.py index d0d1d2c307d1..615161d3a0f2 100644 --- a/python/ray/tests/test_client_proxy.py +++ b/python/ray/tests/test_client_proxy.py @@ -10,6 +10,7 @@ import grpc import ray +from ray.ray_constants import REDIS_DEFAULT_PASSWORD import ray.core.generated.ray_client_pb2 as ray_client_pb2 from ray.cloudpickle.compat import pickle from ray.job_config import JobConfig @@ -18,12 +19,12 @@ def start_ray_and_proxy_manager(n_ports=2): - ray_instance = ray.init(_redis_password="test") + ray_instance = ray.init(_redis_password=REDIS_DEFAULT_PASSWORD) agent_port = ray.worker.global_worker.node.metrics_agent_port pm = proxier.ProxyManager( ray_instance["address"], session_dir=ray_instance["session_dir"], - redis_password="test", + redis_password=REDIS_DEFAULT_PASSWORD, runtime_env_agent_port=agent_port, ) free_ports = random.choices(range(45000, 45100), k=n_ports) diff --git a/python/ray/tests/test_multiprocessing.py b/python/ray/tests/test_multiprocessing.py index de346e50da68..d43d0bc1d728 100644 --- a/python/ray/tests/test_multiprocessing.py +++ b/python/ray/tests/test_multiprocessing.py @@ -16,6 +16,7 @@ from ray.util.joblib import register_ray from joblib import parallel_backend, Parallel, delayed +from ray._private.test_utils import test_external_redis def teardown_function(function): @@ -68,6 +69,9 @@ def ray_start_4_cpu(): ray.shutdown() +@pytest.mark.skipif( + test_external_redis(), reason="The same Redis is used within the test." +) def test_ray_init(shutdown_only): def getpid(args): return os.getpid() @@ -117,6 +121,9 @@ def check_pool_size(pool, size): ], indirect=True, ) +@pytest.mark.skipif( + test_external_redis(), reason="The same Redis is used within the test." +) def test_connect_to_ray(ray_start_cluster): def getpid(args): return os.getpid() diff --git a/python/ray/tests/test_ray_init.py b/python/ray/tests/test_ray_init.py index 12b09066f8ab..7b375ee09d8f 100644 --- a/python/ray/tests/test_ray_init.py +++ b/python/ray/tests/test_ray_init.py @@ -3,7 +3,6 @@ import logging import pytest -import redis import unittest.mock import ray import ray._private.services @@ -16,80 +15,6 @@ import grpc -@pytest.fixture -def password(): - random_bytes = os.urandom(128) - if hasattr(random_bytes, "hex"): - return random_bytes.hex() # Python 3 - return random_bytes.encode("hex") # Python 2 - - -class TestRedisPassword: - @pytest.mark.skipif( - True, reason="Not valid anymore. To be added back when fixing Redis mode" - ) - def test_redis_password(self, password, shutdown_only): - @ray.remote - def f(): - return 1 - - info = ray.init(_redis_password=password) - address = info["redis_address"] - redis_ip, redis_port = address.split(":") - - # Check that we can run a task - object_ref = f.remote() - ray.get(object_ref) - - # Check that Redis connections require a password - redis_client = redis.StrictRedis(host=redis_ip, port=redis_port, password=None) - with pytest.raises(redis.exceptions.AuthenticationError): - redis_client.ping() - # We want to simulate how this is called by ray.scripts.start(). - try: - ray._private.services.wait_for_redis_to_start( - redis_ip, redis_port, password="wrong password" - ) - # We catch a generic Exception here in case someone later changes the - # type of the exception. - except Exception as ex: - if not ( - isinstance(ex.__cause__, redis.AuthenticationError) - and "invalid password" in str(ex.__cause__) - ) and not ( - isinstance(ex, redis.ResponseError) - and "WRONGPASS invalid username-password pair" in str(ex) - ): - raise - # By contrast, we may be fairly confident the exact string - # 'invalid password' won't go away, because redis-py simply wraps - # the exact error from the Redis library. - # https://github.com/andymccurdy/redis-py/blob/master/ - # redis/connection.py#L132 - # Except, apparently sometimes redis-py raises a completely - # different *type* of error for a bad password, - # redis.ResponseError, which is not even derived from - # redis.ConnectionError as redis.AuthenticationError is. - - # Check that we can connect to Redis using the provided password - redis_client = redis.StrictRedis( - host=redis_ip, port=redis_port, password=password - ) - assert redis_client.ping() - - def test_redis_password_cluster(self, password, shutdown_only): - @ray.remote - def f(): - return 1 - - node_args = {"redis_password": password} - cluster = Cluster(initialize_head=True, connect=True, head_node_args=node_args) - cluster.add_node(**node_args) - - object_ref = f.remote() - ray.get(object_ref) - - def test_shutdown_and_reset_global_worker(shutdown_only): ray.init(job_config=ray.job_config.JobConfig(code_search_path=["a"])) ray.shutdown()