[Core] Fix task submission never returning when a network partition happens (ray-project#44692)

This fixes the problem where the PushTask() gRPC call hangs when a network partition happens. The call hangs because, by default, gRPC sends only two ping frames and then sends nothing further if no data frames have been sent; meanwhile, the worker node had been forcibly shut down.

Signed-off-by: hongchaodeng <hongchaodeng1@gmail.com>
hongchaodeng authored Apr 26, 2024
1 parent 6d6d0a1 commit b0a0d34
Showing 5 changed files with 205 additions and 52 deletions.
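The fix below relies on gRPC's HTTP/2 keepalive channel arguments. As a rough, hypothetical client-side sketch (written in Python rather than the C++ core that this commit actually changes, with a placeholder target address and timing values borrowed from the test added below), the settings involved look roughly like this; setting grpc.http2.max_pings_without_data to 0 removes the default two-ping cap so keepalive pings keep flowing while a long call such as PushTask() is in flight:

import grpc

# Placeholder endpoint; in Ray this would be the peer worker/raylet address.
TARGET = "worker-node:9379"

channel = grpc.insecure_channel(
    TARGET,
    options=[
        # Send a keepalive ping every second while a call is in flight.
        ("grpc.keepalive_time_ms", 1000),
        # Drop the connection if a ping is not acknowledged within a second.
        ("grpc.keepalive_timeout_ms", 1000),
        # 0 lifts the default cap of two pings without data frames, which is
        # what previously let a long-running call hang under a partition.
        ("grpc.http2.max_pings_without_data", 0),
    ],
)

# A long-running RPC on this channel should now fail with UNAVAILABLE once
# keepalive detects the broken connection, instead of hanging forever.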
1 change: 1 addition & 0 deletions python/ray/tests/BUILD
@@ -325,6 +325,7 @@ py_test_module_list(
files = [
"test_gcs_ha_e2e.py",
"test_gcs_ha_e2e_2.py",
"test_network_failure_e2e.py",
],
size = "medium",
tags = ["exclusive", "ha_integration", "team:core"],
113 changes: 63 additions & 50 deletions python/ray/tests/conftest_docker.py
@@ -79,64 +79,77 @@ def print_logs(self):
worker_node_vol = volume()
head_node_container_name = "gcs" + str(int(time.time()))

head_node = container(
image="rayproject/ray:ha_integration",
name=head_node_container_name,
network="{gcs_network.name}",
command=[
"ray",
"start",
"--head",
"--block",
"--num-cpus",
"0",
# Fix the port of raylet to make sure raylet restarts at the same
# ip:port is treated as a different raylet.
"--node-manager-port",
"9379",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={

def gen_head_node(envs):
return container(
image="rayproject/ray:ha_integration",
name=head_node_container_name,
network="{gcs_network.name}",
command=[
"ray",
"start",
"--head",
"--block",
"--num-cpus",
"0",
# Fix the port of raylet to make sure raylet restarts at the same
# ip:port is treated as a different raylet.
"--node-manager-port",
"9379",
],
volumes={"{head_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment=envs,
wrapper_class=Container,
ports={
"8000/tcp": None,
},
# volumes={
# "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
# },
)


def gen_worker_node(envs):
return container(
image="rayproject/ray:ha_integration",
network="{gcs_network.name}",
command=[
"ray",
"start",
"--address",
f"{head_node_container_name}:6379",
"--block",
# Fix the port of raylet to make sure raylet restarts at the same
# ip:port is treated as a different raylet.
"--node-manager-port",
"9379",
],
volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment=envs,
wrapper_class=Container,
ports={
"8000/tcp": None,
},
# volumes={
# "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
# },
)


head_node = gen_head_node(
{
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
},
# volumes={
# "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
# },
}
)

worker_node = container(
image="rayproject/ray:ha_integration",
network="{gcs_network.name}",
command=[
"ray",
"start",
"--address",
f"{head_node_container_name}:6379",
"--block",
# Fix the port of raylet to make sure raylet restarts at the same
# ip:port is treated as a different raylet.
"--node-manager-port",
"9379",
],
volumes={"{worker_node_vol.name}": {"bind": "/tmp", "mode": "rw"}},
environment={
worker_node = gen_worker_node(
{
"RAY_REDIS_ADDRESS": "{redis.ips.primary}:6379",
"RAY_raylet_client_num_connect_attempts": "10",
"RAY_raylet_client_connect_timeout_milliseconds": "100",
},
wrapper_class=Container,
ports={
"8000/tcp": None,
},
# volumes={
# "/tmp/ray/": {"bind": "/tmp/ray/", "mode": "rw"}
# },
}
)


133 changes: 133 additions & 0 deletions python/ray/tests/test_network_failure_e2e.py
@@ -0,0 +1,133 @@
import sys
import json

from time import sleep
import pytest
from ray._private.test_utils import wait_for_condition
from ray.tests.conftest_docker import * # noqa
from ray.tests.conftest_docker import gen_head_node, gen_worker_node


SLEEP_TASK_SCRIPTS = """
import ray
ray.init()
@ray.remote(max_retries=-1)
def f():
import time
time.sleep(10000)
ray.get([f.remote() for _ in range(2)])
"""

head = gen_head_node(
{
"RAY_grpc_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_timeout_ms": "1000",
"RAY_health_check_initial_delay_ms": "1000",
"RAY_health_check_period_ms": "1000",
"RAY_health_check_timeout_ms": "1000",
"RAY_health_check_failure_threshold": "2",
}
)

worker = gen_worker_node(
{
"RAY_grpc_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_time_ms": "1000",
"RAY_grpc_client_keepalive_timeout_ms": "1000",
"RAY_health_check_initial_delay_ms": "1000",
"RAY_health_check_period_ms": "1000",
"RAY_health_check_timeout_ms": "1000",
"RAY_health_check_failure_threshold": "2",
}
)


def test_network_task_submit(head, worker, gcs_network):
network = gcs_network
# https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.Container.exec_run
head.exec_run(
cmd=f"python -c '{SLEEP_TASK_SCRIPTS}'",
detach=True,
environment=[
"RAY_grpc_client_keepalive_time_ms=1000",
"RAY_grpc_client_keepalive_timeout_ms=1000",
],
)

def check_task_running(n=None):
output = head.exec_run(cmd="ray list tasks --format json")
if output.exit_code == 0:
tasks_json = json.loads(output.output)
print("tasks_json:", json.dumps(tasks_json, indent=2))
if n is not None and n != len(tasks_json):
return False
return all([task["state"] == "RUNNING" for task in tasks_json])
return False

# Use ray list tasks to make sure all tasks are running.
wait_for_condition(lambda: check_task_running(2))

# Previously the grpc client would send only 2 ping frames when there was no
# data/header frame to be sent.
# The keepalive interval is 1s, so after 3s no more pings were sent and
# the test failed.
sleep(3)

# partition the network between head and worker
# https://docker-py.readthedocs.io/en/stable/networks.html#docker.models.networks.Network.disconnect
network.disconnect(worker.name)
print("Disconnected network")

def check_dead_node():
output = head.exec_run(cmd="ray list nodes --format json")
if output.exit_code == 0:
nodes_json = json.loads(output.output)
print("nodes_json:", json.dumps(nodes_json, indent=2))
for node in nodes_json:
if node["state"] == "DEAD" and not node["is_head_node"]:
return True
return False

wait_for_condition(check_dead_node)
print("observed node died")

# Previously, under a network partition, the tasks would stay in the RUNNING
# state and hang forever.
# This test checks that they no longer do.
def check_task_not_running():
output = head.exec_run(cmd="ray list tasks --format json")
if output.exit_code == 0:
tasks_json = json.loads(output.output)
print("tasks_json:", json.dumps(tasks_json, indent=2))
return all([task["state"] != "RUNNING" for task in tasks_json])
return False

# We set num_cpus=0 for the head node,
# which ensures no task is scheduled on it.
wait_for_condition(check_task_not_running)

# After the fix, we should observe that the tasks are not running.
# `ray list tasks` would show two FAILED and
# two PENDING_NODE_ASSIGNMENT states.

def check_task_pending(n=0):
output = head.exec_run(cmd="ray list tasks --format json")
if output.exit_code == 0:
tasks_json = json.loads(output.output)
print("tasks_json:", json.dumps(tasks_json, indent=2))
return n == sum(
[task["state"] == "PENDING_NODE_ASSIGNMENT" for task in tasks_json]
)
return False

wait_for_condition(lambda: check_task_pending(2))


if __name__ == "__main__":
import os

if os.environ.get("PARALLEL_CI"):
sys.exit(pytest.main(["-n", "auto", "--boxed", "-vs", __file__]))
else:
sys.exit(pytest.main(["-sv", __file__]))
1 change: 1 addition & 0 deletions src/ray/common/grpc_util.h
@@ -157,6 +157,7 @@ inline grpc::ChannelArguments CreateDefaultChannelArguments() {
::RayConfig::instance().grpc_client_keepalive_time_ms());
arguments.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
::RayConfig::instance().grpc_client_keepalive_timeout_ms());
arguments.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
}
arguments.SetInt(GRPC_ARG_CLIENT_IDLE_TIMEOUT_MS,
::RayConfig::instance().grpc_client_idle_timeout_ms());
9 changes: 7 additions & 2 deletions src/ray/rpc/grpc_server.cc
@@ -73,6 +73,7 @@ void GrpcServer::Run() {
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_TIMEOUT_MS,
RayConfig::instance().grpc_keepalive_timeout_ms());
builder.AddChannelArgument(GRPC_ARG_KEEPALIVE_PERMIT_WITHOUT_CALLS, 0);
builder.AddChannelArgument(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);
builder.AddChannelArgument(GRPC_ARG_HTTP2_WRITE_BUFFER_SIZE,
RayConfig::instance().grpc_stream_buffer_size());
// NOTE(rickyyx): This argument changes how frequent the gRPC server expects a keepalive
@@ -81,8 +82,12 @@
// https://github.com/ray-project/ray/blob/releases/2.0.0/python/ray/_private/gcs_utils.py#L72
// Setting this value larger will trigger GOAWAY from the gRPC server to be sent to the
// client to back-off keepalive pings. (https://github.com/ray-project/ray/issues/25367)
builder.AddChannelArgument(GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS,
60000);
builder.AddChannelArgument(
GRPC_ARG_HTTP2_MIN_RECV_PING_INTERVAL_WITHOUT_DATA_MS,
// If the `client_keepalive_time` is smaller than this, the client will receive
// "too many pings" error and crash.
std::min(static_cast<int64_t>(60000),
RayConfig::instance().grpc_client_keepalive_time_ms()));
if (RayConfig::instance().USE_TLS()) {
// Create credentials from locations specified in config
std::string rootcert = ReadCert(RayConfig::instance().TLS_CA_CERT());
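For context on the server-side change above, here is a hedged sketch of the same interaction using Python gRPC's string-named channel options (assumed to map onto the C++ GRPC_ARG_* constants; the port and executor are placeholders). The server's minimum accepted ping interval must not exceed the client's keepalive period, otherwise the server answers the pings with GOAWAY ("too_many_pings") and the client crashes, which is why the new code clamps the interval with std::min:

import grpc
from concurrent import futures

# Mirrors RAY_grpc_client_keepalive_time_ms=1000 used in the test above.
CLIENT_KEEPALIVE_TIME_MS = 1000

server = grpc.server(
    futures.ThreadPoolExecutor(max_workers=4),
    options=[
        # Tolerate any number of pings without data frames from the client.
        ("grpc.http2.max_pings_without_data", 0),
        # Must be <= the client's keepalive time; if the client pings more
        # often than this, the server sends GOAWAY ("too_many_pings").
        ("grpc.http2.min_ping_interval_without_data_ms",
         min(60000, CLIENT_KEEPALIVE_TIME_MS)),
    ],
)
server.add_insecure_port("[::]:9379")  # placeholder port
server.start()
server.stop(0)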
