test_core.py
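# Autoscaler v2 core end-to-end check. The script connects to an already
# running cluster via ray.init("auto"), then verifies that the cluster
# upscales when resources are requested (or when tasks/actors demand more
# CPUs than are available) and scales back down after idle termination.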
import time

import ray
from ray._private.test_utils import wait_for_condition
from ray.autoscaler.v2.tests.util import (
    NodeCountCheck,
    TotalResourceCheck,
    check_cluster,
)

from logger import logger

ray.init("auto")
# Sync with the compute config.
HEAD_NODE_CPU = 0
WORKER_NODE_CPU = 4
IDLE_TERMINATION_S = 60 * 5  # 5 min
DEFAULT_RETRY_INTERVAL_MS = 15 * 1000  # 15 sec

ctx = {
    "num_cpus": 0,
    "num_nodes": 1,
}
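# `ctx` tracks the expected cluster shape (node count and total CPUs); each
# test case below updates it before asserting on the actual cluster state.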
logger.info(f"Starting cluster with {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
check_cluster(
[
NodeCountCheck(ctx["num_nodes"]),
TotalResourceCheck({"CPU": ctx["num_cpus"]}),
]
)
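# At this point the cluster should consist of only the head node, which
# contributes no CPUs (see HEAD_NODE_CPU above).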
# Requesting cluster resources should upscale the cluster; clearing the
# request should let it scale back down after idle termination.
def test_request_cluster_resources(ctx: dict):
    from ray.autoscaler._private.commands import request_resources

    request_resources(num_cpus=8)
    ctx["num_cpus"] += 8
    ctx["num_nodes"] += 8 // WORKER_NODE_CPU

    # Assert on the expected number of nodes and total CPUs.
    logger.info(
        f"Requesting cluster constraints: {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

    # Reset the cluster constraints.
    request_resources(num_cpus=0)
    ctx["num_cpus"] -= 8
    ctx["num_nodes"] -= 8 // WORKER_NODE_CPU

    logger.info(
        f"Waiting for cluster to go idle after constraint cleared: {ctx['num_nodes']} "
        f"nodes, {ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,  # 1 min + idle timeout
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )
# Running actors/tasks that exceed the cluster's resources should upscale the cluster.
def test_run_tasks_concurrent(ctx: dict):
    num_tasks = 2
    num_actors = 2

    @ray.remote(num_cpus=WORKER_NODE_CPU)
    def f():
        while True:
            time.sleep(1)

    @ray.remote(num_cpus=WORKER_NODE_CPU)
    class Actor:
        def __init__(self):
            pass

    tasks = [f.remote() for _ in range(num_tasks)]
    actors = [Actor.remote() for _ in range(num_actors)]

    ctx["num_cpus"] += (num_tasks + num_actors) * WORKER_NODE_CPU
    ctx["num_nodes"] += num_tasks + num_actors

    logger.info(f"Waiting for {ctx['num_nodes']} nodes, {ctx['num_cpus']} cpus")
    wait_for_condition(
        check_cluster,
        timeout=60 * 5,  # 5 min
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

    for task in tasks:
        ray.cancel(task)
    for actor in actors:
        ray.kill(actor)

    ctx["num_cpus"] -= (num_actors + num_tasks) * WORKER_NODE_CPU
    ctx["num_nodes"] -= num_actors + num_tasks

    logger.info(
        f"Waiting for cluster to scale down to {ctx['num_nodes']} nodes, "
        f"{ctx['num_cpus']} cpus"
    )
    wait_for_condition(
        check_cluster,
        timeout=60 + IDLE_TERMINATION_S,
        retry_interval_ms=DEFAULT_RETRY_INTERVAL_MS,
        targets=[
            NodeCountCheck(ctx["num_nodes"]),
            TotalResourceCheck({"CPU": ctx["num_cpus"]}),
        ],
    )

test_request_cluster_resources(ctx)
test_run_tasks_concurrent(ctx)