Skip to content

Commit d8293bf

Browse files
committed
[core] Make RPC Chaos Configurations More Readable as JSON
Signed-off-by: dancingactor <s990346@gmail.com>
1 parent 8147683 commit d8293bf

10 files changed

+195
-50
lines changed

python/ray/tests/test_actor_lineage_reconstruction.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import gc
2+
import json
23
import os
34
import signal
45
import sys
@@ -26,10 +27,25 @@ def test_actor_reconstruction_triggered_by_lineage_reconstruction(
2627
# -> actor is permanently dead when there is no reference.
2728
# This test also injects network failure to make sure relevant rpcs are retried.
2829
failure = RPC_FAILURE_MAP[deterministic_failure]
30+
parts = failure.split(":")
2931
monkeypatch.setenv(
3032
"RAY_testing_rpc_failure",
31-
f"ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction=1:{failure},"
32-
f"ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope=1:{failure}",
33+
json.dumps(
34+
{
35+
"ray::rpc::ActorInfoGcsService.grpc_client.RestartActorForLineageReconstruction": {
36+
"num_failures": 1,
37+
"req_failure_prob": int(parts[0]),
38+
"resp_failure_prob": int(parts[1]),
39+
"in_flight_failure_prob": int(parts[2]),
40+
},
41+
"ray::rpc::ActorInfoGcsService.grpc_client.ReportActorOutOfScope": {
42+
"num_failures": 1,
43+
"req_failure_prob": int(parts[0]),
44+
"resp_failure_prob": int(parts[1]),
45+
"in_flight_failure_prob": int(parts[2]),
46+
},
47+
}
48+
),
3349
)
3450
cluster = ray_start_cluster
3551
cluster.add_node(resources={"head": 1})

python/ray/tests/test_core_worker_fault_tolerance.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import sys
23

34
import numpy as np
@@ -15,6 +16,20 @@
1516
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
1617

1718

19+
def create_failure_json(method, num_failures, failure_str):
    """Build the JSON value of ``RAY_testing_rpc_failure`` for one RPC method.

    Args:
        method: RPC method name used as the single top-level JSON key,
            e.g. ``"CoreWorkerService.grpc_client.PushTask"``.
        num_failures: Value stored under ``"num_failures"``.
        failure_str: Colon-separated ``"<req>:<resp>:<in_flight>"`` string of
            integers, stored under ``"req_failure_prob"``,
            ``"resp_failure_prob"`` and ``"in_flight_failure_prob"``.

    Returns:
        A JSON string mapping ``method`` to its failure configuration.

    Raises:
        ValueError: If ``failure_str`` does not consist of exactly three
            colon-separated integer fields.
    """
    parts = failure_str.split(":")
    # The legacy env format was "<num_failures>:<req>:<resp>:<in_flight>".
    # Insist on exactly three fields so a legacy four-field string is rejected
    # instead of having its leading count silently read as req_failure_prob.
    if len(parts) != 3:
        raise ValueError(
            "failure_str must have the form '<req>:<resp>:<in_flight>', "
            f"got {failure_str!r}"
        )
    req_prob, resp_prob, in_flight_prob = (int(part) for part in parts)
    return json.dumps(
        {
            method: {
                "num_failures": num_failures,
                "req_failure_prob": req_prob,
                "resp_failure_prob": resp_prob,
                "in_flight_failure_prob": in_flight_prob,
            }
        }
    )
31+
32+
1833
@pytest.mark.parametrize(
1934
"allow_out_of_order_execution",
2035
[True, False],
@@ -30,7 +45,7 @@ def test_push_actor_task_failure(
3045
failure = RPC_FAILURE_MAP[deterministic_failure]
3146
m.setenv(
3247
"RAY_testing_rpc_failure",
33-
f"CoreWorkerService.grpc_client.PushTask=2:{failure}",
48+
create_failure_json("CoreWorkerService.grpc_client.PushTask", 2, failure),
3449
)
3550
m.setenv("RAY_actor_scheduling_queue_max_reorder_wait_seconds", "0")
3651
cluster = ray_start_cluster
@@ -60,7 +75,9 @@ def test_update_object_location_batch_failure(
6075
failure = RPC_FAILURE_MAP[deterministic_failure]
6176
m.setenv(
6277
"RAY_testing_rpc_failure",
63-
f"CoreWorkerService.grpc_client.UpdateObjectLocationBatch=1:{failure}",
78+
create_failure_json(
79+
"CoreWorkerService.grpc_client.UpdateObjectLocationBatch", 1, failure
80+
),
6481
)
6582
cluster = ray_start_cluster
6683
head_node_id = cluster.add_node(
@@ -102,7 +119,7 @@ def test_get_object_status_rpc_retry_and_idempotency(
102119
failure = RPC_FAILURE_MAP[deterministic_failure]
103120
monkeypatch.setenv(
104121
"RAY_testing_rpc_failure",
105-
f"CoreWorkerService.grpc_client.GetObjectStatus=1:{failure}",
122+
create_failure_json("CoreWorkerService.grpc_client.GetObjectStatus", 1, failure),
106123
)
107124

108125
ray.init()
@@ -134,7 +151,9 @@ def test_wait_for_actor_ref_deleted_rpc_retry_and_idempotency(
134151
failure = RPC_FAILURE_MAP[deterministic_failure]
135152
monkeypatch.setenv(
136153
"RAY_testing_rpc_failure",
137-
f"CoreWorkerService.grpc_client.WaitForActorRefDeleted=1:{failure}",
154+
create_failure_json(
155+
"CoreWorkerService.grpc_client.WaitForActorRefDeleted", 1, failure
156+
),
138157
)
139158

140159
ray.init()
@@ -172,7 +191,7 @@ def inject_cancel_remote_task_rpc_failure(monkeypatch, request):
172191
failure = RPC_FAILURE_MAP[deterministic_failure]
173192
monkeypatch.setenv(
174193
"RAY_testing_rpc_failure",
175-
f"CoreWorkerService.grpc_client.CancelRemoteTask=1:{failure}",
194+
create_failure_json("CoreWorkerService.grpc_client.CancelRemoteTask", 1, failure),
176195
)
177196

178197

@@ -213,7 +232,8 @@ def remote_wait(sg):
213232
def test_double_borrowing_with_rpc_failure(monkeypatch, shutdown_only):
214233
"""Regression test for https://github.com/ray-project/ray/issues/57997"""
215234
monkeypatch.setenv(
216-
"RAY_testing_rpc_failure", "CoreWorkerService.grpc_client.PushTask=3:0:100:0"
235+
"RAY_testing_rpc_failure",
236+
create_failure_json("CoreWorkerService.grpc_client.PushTask", 3, "0:100:0"),
217237
)
218238

219239
ray.init()

python/ray/tests/test_gcs_fault_tolerance.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import signal
44
import subprocess
55
import sys
6+
import json
67
import tempfile
78
import time
89
from concurrent.futures import ThreadPoolExecutor
@@ -1261,7 +1262,16 @@ def test_mark_job_finished_rpc_retry_and_idempotency(shutdown_only, monkeypatch)
12611262
# We inject request failures to force retries and test idempotency
12621263
monkeypatch.setenv(
12631264
"RAY_testing_rpc_failure",
1264-
"ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished=3:50:0:0",
1265+
json.dumps(
1266+
{
1267+
"ray::rpc::JobInfoGcsService.grpc_client.MarkJobFinished": {
1268+
"num_failures": 3,
1269+
"req_failure_prob": 50,
1270+
"resp_failure_prob": 0,
1271+
"in_flight_failure_prob": 0,
1272+
}
1273+
}
1274+
),
12651275
)
12661276

12671277
ray.init(num_cpus=1)

python/ray/tests/test_gcs_utils.py

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import contextlib
3+
import json
34
import os
45
import signal
56
import sys
@@ -109,8 +110,22 @@ def test_kv_timeout(ray_start_regular):
109110
def test_kv_transient_network_error(shutdown_only, monkeypatch):
110111
monkeypatch.setenv(
111112
"RAY_testing_rpc_failure",
112-
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet=5:25:25:25,"
113-
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut=5:25:25:25",
113+
json.dumps(
114+
{
115+
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVGet": {
116+
"num_failures": 5,
117+
"req_failure_prob": 25,
118+
"resp_failure_prob": 25,
119+
"in_flight_failure_prob": 25,
120+
},
121+
"ray::rpc::InternalKVGcsService.grpc_client.InternalKVPut": {
122+
"num_failures": 5,
123+
"req_failure_prob": 25,
124+
"resp_failure_prob": 25,
125+
"in_flight_failure_prob": 25,
126+
},
127+
}
128+
),
114129
)
115130
ray.init()
116131
gcs_address = ray._private.worker.global_worker.gcs_client.address

python/ray/tests/test_object_manager_fault_tolerance.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import sys
23

34
import numpy as np
@@ -13,14 +14,28 @@
1314
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy
1415

1516

17+
def create_failure_json(method, num_failures, failure_str):
    """Build the JSON value of ``RAY_testing_rpc_failure`` for one RPC method.

    Args:
        method: RPC method name used as the single top-level JSON key,
            e.g. ``"ObjectManagerService.grpc_client.FreeObjects"``.
        num_failures: Value stored under ``"num_failures"``.
        failure_str: Colon-separated ``"<req>:<resp>:<in_flight>"`` string of
            integers, stored under ``"req_failure_prob"``,
            ``"resp_failure_prob"`` and ``"in_flight_failure_prob"``.

    Returns:
        A JSON string mapping ``method`` to its failure configuration.

    Raises:
        ValueError: If ``failure_str`` does not consist of exactly three
            colon-separated integer fields.
    """
    parts = failure_str.split(":")
    # The legacy env format was "<num_failures>:<req>:<resp>:<in_flight>".
    # Insist on exactly three fields so a legacy four-field string is rejected
    # instead of having its leading count silently read as req_failure_prob.
    if len(parts) != 3:
        raise ValueError(
            "failure_str must have the form '<req>:<resp>:<in_flight>', "
            f"got {failure_str!r}"
        )
    req_prob, resp_prob, in_flight_prob = (int(part) for part in parts)
    return json.dumps(
        {
            method: {
                "num_failures": num_failures,
                "req_failure_prob": req_prob,
                "resp_failure_prob": resp_prob,
                "in_flight_failure_prob": in_flight_prob,
            }
        }
    )
29+
30+
1631
@pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES)
1732
def test_free_objects_idempotent(
1833
monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster
1934
):
2035
failure = RPC_FAILURE_MAP[deterministic_failure]
2136
monkeypatch.setenv(
2237
"RAY_testing_rpc_failure",
23-
f"ObjectManagerService.grpc_client.FreeObjects=1:{failure}",
38+
create_failure_json("ObjectManagerService.grpc_client.FreeObjects", 1, failure),
2439
)
2540

2641
@ray.remote

python/ray/tests/test_raylet_fault_tolerance.py

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import json
12
import os
23
import sys
34

@@ -19,14 +20,30 @@
1920
import psutil
2021

2122

23+
def create_failure_json(method, num_failures, failure_str):
    """Build the JSON value of ``RAY_testing_rpc_failure`` for one RPC method.

    Args:
        method: RPC method name used as the single top-level JSON key,
            e.g. ``"NodeManagerService.grpc_client.RequestWorkerLease"``.
        num_failures: Value stored under ``"num_failures"``.
        failure_str: Colon-separated ``"<req>:<resp>:<in_flight>"`` string of
            integers, stored under ``"req_failure_prob"``,
            ``"resp_failure_prob"`` and ``"in_flight_failure_prob"``.

    Returns:
        A JSON string mapping ``method`` to its failure configuration.

    Raises:
        ValueError: If ``failure_str`` does not consist of exactly three
            colon-separated integer fields.
    """
    parts = failure_str.split(":")
    # The legacy env format was "<num_failures>:<req>:<resp>:<in_flight>".
    # Insist on exactly three fields so a legacy four-field string is rejected
    # instead of having its leading count silently read as req_failure_prob.
    if len(parts) != 3:
        raise ValueError(
            "failure_str must have the form '<req>:<resp>:<in_flight>', "
            f"got {failure_str!r}"
        )
    req_prob, resp_prob, in_flight_prob = (int(part) for part in parts)
    return json.dumps(
        {
            method: {
                "num_failures": num_failures,
                "req_failure_prob": req_prob,
                "resp_failure_prob": resp_prob,
                "in_flight_failure_prob": in_flight_prob,
            }
        }
    )
35+
36+
2237
@pytest.mark.parametrize("deterministic_failure", RPC_FAILURE_TYPES)
2338
def test_request_worker_lease_idempotent(
2439
monkeypatch, shutdown_only, deterministic_failure, ray_start_cluster
2540
):
2641
failure = RPC_FAILURE_MAP[deterministic_failure]
2742
monkeypatch.setenv(
2843
"RAY_testing_rpc_failure",
29-
f"NodeManagerService.grpc_client.RequestWorkerLease=1:{failure}",
44+
create_failure_json(
45+
"NodeManagerService.grpc_client.RequestWorkerLease", 1, failure
46+
),
3047
)
3148

3249
@ray.remote
@@ -61,7 +78,16 @@ def test_drain_node_idempotent(monkeypatch, shutdown_only, ray_start_cluster):
6178
# NOTE: not testing response failure since the node is already marked as draining and shuts down gracefully.
6279
monkeypatch.setenv(
6380
"RAY_testing_rpc_failure",
64-
"NodeManagerService.grpc_client.DrainRaylet=1:100:0:0",
81+
json.dumps(
82+
{
83+
"NodeManagerService.grpc_client.DrainRaylet": {
84+
"num_failures": 1,
85+
"req_failure_prob": 100,
86+
"resp_failure_prob": 0,
87+
"in_flight_failure_prob": 0,
88+
}
89+
}
90+
),
6591
)
6692

6793
cluster = ray_start_cluster
@@ -99,10 +125,25 @@ def node_is_dead():
99125
def inject_release_unused_bundles_rpc_failure(monkeypatch, request):
100126
deterministic_failure = request.param
101127
failure = RPC_FAILURE_MAP[deterministic_failure]
128+
parts = failure.split(":")
102129
monkeypatch.setenv(
103130
"RAY_testing_rpc_failure",
104-
f"NodeManagerService.grpc_client.ReleaseUnusedBundles=1:{failure}"
105-
+ ",NodeManagerService.grpc_client.CancelResourceReserve=-1:100:0:0",
131+
json.dumps(
132+
{
133+
"NodeManagerService.grpc_client.ReleaseUnusedBundles": {
134+
"num_failures": 1,
135+
"req_failure_prob": int(parts[0]),
136+
"resp_failure_prob": int(parts[1]),
137+
"in_flight_failure_prob": int(parts[2]),
138+
},
139+
"NodeManagerService.grpc_client.CancelResourceReserve": {
140+
"num_failures": -1,
141+
"req_failure_prob": 100,
142+
"resp_failure_prob": 0,
143+
"in_flight_failure_prob": 0,
144+
},
145+
}
146+
),
106147
)
107148

108149

@@ -155,7 +196,9 @@ def inject_notify_gcs_restart_rpc_failure(monkeypatch, request):
155196
failure = RPC_FAILURE_MAP[deterministic_failure]
156197
monkeypatch.setenv(
157198
"RAY_testing_rpc_failure",
158-
f"NodeManagerService.grpc_client.NotifyGCSRestart=1:{failure}",
199+
create_failure_json(
200+
"NodeManagerService.grpc_client.NotifyGCSRestart", 1, failure
201+
),
159202
)
160203

161204

@@ -215,7 +258,16 @@ def test_kill_local_actor_rpc_retry_and_idempotency(monkeypatch, shutdown_only):
215258

216259
monkeypatch.setenv(
217260
"RAY_testing_rpc_failure",
218-
"NodeManagerService.grpc_client.KillLocalActor=1:100:0:0",
261+
json.dumps(
262+
{
263+
"NodeManagerService.grpc_client.KillLocalActor": {
264+
"num_failures": 1,
265+
"req_failure_prob": 100,
266+
"resp_failure_prob": 0,
267+
"in_flight_failure_prob": 0,
268+
}
269+
}
270+
),
219271
)
220272

221273
ray.init()

python/ray/tests/test_streaming_generator_4.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import gc
3+
import json
34
import os
45
import random
56
import signal
@@ -190,7 +191,16 @@ def test_many_tasks_lineage_reconstruction_mini_stress_test(
190191
)
191192
m.setenv(
192193
"RAY_testing_rpc_failure",
193-
"CoreWorkerService.grpc_client.ReportGeneratorItemReturns=5:25:25:25",
194+
json.dumps(
195+
{
196+
"CoreWorkerService.grpc_client.ReportGeneratorItemReturns": {
197+
"num_failures": 5,
198+
"req_failure_prob": 25,
199+
"resp_failure_prob": 25,
200+
"in_flight_failure_prob": 25,
201+
}
202+
}
203+
),
194204
)
195205
cluster = ray_start_cluster
196206
cluster.add_node(

src/ray/rpc/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ ray_cc_library(
9191
hdrs = ["rpc_chaos.h"],
9292
visibility = ["//visibility:public"],
9393
deps = [
94+
"//bazel:nlohmann_json",
9495
"//src/ray/common:ray_config",
9596
"@com_google_absl//absl/container:flat_hash_map",
9697
"@com_google_absl//absl/synchronization",

0 commit comments

Comments
 (0)