diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD
index 47daba04f21b3..7969e8b26b316 100644
--- a/python/ray/tests/BUILD
+++ b/python/ray/tests/BUILD
@@ -325,6 +325,7 @@ py_test_module_list(
     files = [
         "test_gcs_ha_e2e.py",
         "test_gcs_ha_e2e_2.py",
+        "test_network_failure_e2e.py",
     ],
     size = "medium",
     tags = ["exclusive", "ha_integration", "team:core"],
diff --git a/python/ray/tests/conftest_docker.py b/python/ray/tests/conftest_docker.py
index 914f19732ff79..52eebb9d6a19a 100644
--- a/python/ray/tests/conftest_docker.py
+++ b/python/ray/tests/conftest_docker.py
@@ -79,64 +79,77 @@ def print_logs(self):
 worker_node_vol = volume()
 head_node_container_name = "gcs" + str(int(time.time()))
 
-head_node = container(
-    image="rayproject/ray:ha_integration",
-    name=head_node_container_name,
-    network="{gcs_network.name}",
-    command=[
-        "ray",
-        "start",
-        "--head",
-        "--block",
-        "--num-cpus",
-        "0",
-        # Fix the port of raylet to make sure raylet restarts at the same
-        # ip:port is treated as a different raylet.
-        "--node-manager-port",
-        "9379",
-    ],
-    volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
-    environment={
+
+def gen_head_node(envs):
+    return container(
+        image="rayproject/ray:ha_integration",
+        name=head_node_container_name,
+        network="{gcs_network.name}",
+        command=[
+            "ray",
+            "start",
+            "--head",
+            "--block",
+            "--num-cpus",
+            "0",
+            # Fix the port of raylet to make sure raylet restarts at the same
+            # ip:port is treated as a different raylet.
+            "--node-manager-port",
+            "9379",
+        ],
+        volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
+        environment=envs,
+        wrapper_class=Container,
+        ports={
+            "8000/tcp": None,
+        },
+        # volumes={
+        #     "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
+        # },
+    )
+
+
+def gen_worker_node(envs):
+    return container(
+        image="rayproject/ray:ha_integration",
+        network="{gcs_network.name}",
+        command=[
+            "ray",
+            "start",
+            "--address",
+            f"{head_node_container_name}:6379",
+            "--block",
+            # Fix the port of raylet to make sure raylet restarts at the same
+            # ip:port is treated as a different raylet.
+            "--node-manager-port",
+            "9379",
+        ],
+        volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
+        environment=envs,
+        wrapper_class=Container,
+        ports={
+            "8000/tcp": None,
+        },
+        # volumes={
+        #     "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
+        # },
+    )
+
+
+head_node = gen_head_node(
+    {
         "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
         "RAY_raylet_client_num_connect_attempts": "10",
         "RAY_raylet_client_connect_timeout_milliseconds": "100",
-    },
-    wrapper_class=Container,
-    ports={
-        "8000/tcp": None,
-    },
-    # volumes={
-    #     "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
-    # },
+    }
 )
 
-worker_node = container(
-    image="rayproject/ray:ha_integration",
-    network="{gcs_network.name}",
-    command=[
-        "ray",
-        "start",
-        "--address",
-        f"{head_node_container_name}:6379",
-        "--block",
-        # Fix the port of raylet to make sure raylet restarts at the same
-        # ip:port is treated as a different raylet.
- "--node-manager-port", - "9379", - ], - volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}}, - environment={ +worker_node = gen_worker_node( + { "RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379", "RAY_raylet_client_num_connect_attempts": "10", "RAY_raylet_client_connect_timeout_milliseconds": "100", - }, - wrapper_class=Container, - ports={ - "8000/tcp": None, - }, - # volumes={ - # "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"} - # }, + } ) diff --git a/python/ray/tests/test_network_failure_e2e.py b/python/ray/tests/test_network_failure_e2e.py new file mode 100644 index 0000000000000..c139cf1e4e99c --- /dev/null +++ b/python/ray/tests/test_network_failure_e2e.py @@ -0,0 +1,133 @@ +import sys +import json + +from time import sleep +import pytest +from ray._private.test_utils import wait_for_condition +from ray.tests.conftest_docker import * # noqa +from ray.tests.conftest_docker import gen_head_node, gen_worker_node + + +SLEEP_TASK_SCRIPTS = """ +import ray +ray.init() +@ray.remote(max_retries=-1) +def f(): + import time + time.sleep(10000) +ray.get([f.remote() for _ in range(2)]) +""" + +head = gen_head_node( + { + "RAY_grpc_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_timeout_ms": "1000", + "RAY_health_check_initial_delay_ms": "1000", + "RAY_health_check_period_ms": "1000", + "RAY_health_check_timeout_ms": "1000", + "RAY_health_check_failure_threshold": "2", + } +) + +worker = gen_worker_node( + { + "RAY_grpc_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_time_ms": "1000", + "RAY_grpc_client_keepalive_timeout_ms": "1000", + "RAY_health_check_initial_delay_ms": "1000", + "RAY_health_check_period_ms": "1000", + "RAY_health_check_timeout_ms": "1000", + "RAY_health_check_failure_threshold": "2", + } +) + + +def test_network_task_submit(head, worker, gcs_network): + network = gcs_network + # https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.Container.exec_run + head.exec_run( + cmd=f"python -c '{SLEEP_TASK_SCRIPTS}'", + detach=True, + environment=[ + "RAY_grpc_client_keepalive_time_ms=1000", + "RAY_grpc_client_keepalive_timeout_ms=1000", + ], + ) + + def check_task_running(n=None): + output = head.exec_run(cmd="ray list tasks --format json") + if output.exit_code == 0: + tasks_json = json.loads(output.output) + print("tasks_json:", json.dumps(tasks_json, indent=2)) + if n is not None and n != len(tasks_json): + return False + return all([task["state"] == "RUNNING" for task in tasks_json]) + return False + + # list_task make sure all tasks are running + wait_for_condition(lambda: check_task_running(2)) + + # Previously grpc client will only send 2 ping frames when there is no + # data/header frame to be sent. + # keepalive interval is 1s. So after 3s it wouldn't send anything and + # failed the test previously. 
+    sleep(3)
+
+    # Partition the network between the head and worker nodes.
+    # https://docker-py.readthedocs.io/en/stable/networks.html#docker.models.networks.Network.disconnect
+    network.disconnect(worker.name)
+    print("Disconnected network")
+
+    def check_dead_node():
+        output = head.exec_run(cmd="ray list nodes --format json")
+        if output.exit_code == 0:
+            nodes_json = json.loads(output.output)
+            print("nodes_json:", json.dumps(nodes_json, indent=2))
+            for node in nodes_json:
+                if node["state"] == "DEAD" and not node["is_head_node"]:
+                    return True
+        return False
+
+    wait_for_condition(check_dead_node)
+    print("observed node died")
+
+    # Previously, under a network partition, the tasks would stay in the
+    # RUNNING state and hang forever.
+    # This test checks that they no longer do.
+    def check_task_not_running():
+        output = head.exec_run(cmd="ray list tasks --format json")
+        if output.exit_code == 0:
+            tasks_json = json.loads(output.output)
+            print("tasks_json:", json.dumps(tasks_json, indent=2))
+            return all([task["state"] != "RUNNING" for task in tasks_json])
+        return False
+
+    # We set num_cpus=0 for the head node, which ensures that no task is
+    # scheduled on the head node.
+    wait_for_condition(check_task_not_running)
+
+    # After the fix, the tasks should no longer be running:
+    # `ray list tasks` should show two FAILED task attempts and
+    # two tasks in the PENDING_NODE_ASSIGNMENT state.
+
+    def check_task_pending(n=0):
+        output = head.exec_run(cmd="ray list tasks --format json")
+        if output.exit_code == 0:
+            tasks_json = json.loads(output.output)
+            print("tasks_json:", json.dumps(tasks_json, indent=2))
+            return n == sum(
+                [task["state"] == "PENDING_NODE_ASSIGNMENT" for task in tasks_json]
+            )
+        return False
+
+    wait_for_condition(lambda: check_task_pending(2))
+
+
+if __name__ == "__main__":
+    import os
+
+    if os.environ.get("PARALLEL_CI"):
+        sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
+    else:
+        sys.exit(pytest.main(["-sv", __file__]))
diff --git a/src/ray/common/grpc_util.h b/src/ray/common/grpc_util.h
index 8cccd5250f00a..4ac4ac235d547 100644
--- a/src/ray/common/grpc_util.h
+++ b/src/ray/common/grpc_util.h
@@ -157,6 +157,7 @@ inline grpc::ChannelArguments CreateDefaultChannelArguments() {
                      ::RayConfig::instance().grpc_client_keepalive_time_ms());
     arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
                      ::RayConfig::instance().grpc_client_keepalive_timeout_ms());
+    arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
   }
   arguments.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS,
                    ::RayConfig::instance().grpc_client_idle_timeout_ms());
diff --git a/src/ray/rpc/grpc_server.cc b/src/ray/rpc/grpc_server.cc
index 0143bc39ee942..d56de646e373f 100644
--- a/src/ray/rpc/grpc_server.cc
+++ b/src/ray/rpc/grpc_server.cc
@@ -73,6 +73,7 @@ void GrpcServer::Run() {
   builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
                              RayConfig::instance().grpc_keepalive_timeout_ms());
   builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0);
+  builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
   builder.AddChannelArgument(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE,
                              RayConfig::instance().grpc_stream_buffer_size());
   // NOTE(rickyyx): This argument changes how frequent the gRPC server expects a keepalive
   // ping from the client.
@@ -81,8 +82,12 @@
   // https://github.com/ray-project/ray/blob/releases/2.0.0/python/ray/_private/gcs_utils.py#L72
   // Setting this value larger will trigger GOAWAY from the gRPC server to be sent to the
   // client to back-off keepalive pings. (https://github.com/ray-project/ray/issues/25367)
-  builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS,
-                             60000);
+  builder.AddChannelArgument(
+      GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS,
+      // If `grpc_client_keepalive_time_ms` is smaller than this, the client will receive a
+      // "too many pings" error and crash.
+      std::min(static_cast<int64_t>(60000),
+               RayConfig::instance().grpc_client_keepalive_time_ms()));
   if (RayConfig::instance().USE_TLS()) {
     // Create credentials from locations specified in config
     std::string rootcert = ReadCert(RayConfig::instance().TLS_CA_CERT());
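
Not part of the patch: the sketch below restates, with the grpcio Python bindings, the keepalive contract that the C++ changes above set up, since the string option keys are the standard gRPC C-core names behind GRPC_ARG_KEEPALIVE_TIME_MS, GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, and GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS. The address, port, and 1000 ms values are placeholders mirroring the test's environment variables, not Ray defaults.

# Minimal, illustrative sketch only; assumes grpcio is installed and that the
# 1000 ms figures stand in for RAY_grpc_client_keepalive_time_ms et al.
from concurrent import futures

import grpc

CLIENT_KEEPALIVE_TIME_MS = 1000  # placeholder for RAY_grpc_client_keepalive_time_ms

client_options = [
    # Ping every second so a partitioned peer is noticed quickly.
    ("grpc.keepalive_time_ms", CLIENT_KEEPALIVE_TIME_MS),
    ("grpc.keepalive_timeout_ms", 1000),
    # 0 = unlimited; without this the client stops after 2 pings when no
    # data/header frames are in flight and never detects the partition.
    ("grpc.http2.max_pings_without_data", 0),
]

server_options = [
    # The server must accept pings at least as often as the client sends them,
    # otherwise it replies with "too many pings" and sends GOAWAY.
    (
        "grpc.http2.min_ping_interval_without_data_ms",
        min(60000, CLIENT_KEEPALIVE_TIME_MS),
    ),
    ("grpc.http2.max_pings_without_data", 0),
]

server = grpc.server(futures.ThreadPoolExecutor(max_workers=1), options=server_options)
channel = grpc.insecure_channel("127.0.0.1:50051", options=client_options)
channel.close()

The min(...) here plays the same role as the std::min in grpc_server.cc: the server-side minimum ping interval must never exceed the client's keepalive period, or frequent client pings are rejected instead of keeping the connection's liveness check alive.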