|  | 
| 3 | 3 | from __future__ import print_function | 
| 4 | 4 | 
 | 
| 5 | 5 | import time | 
| 6 |  | -import pytest | 
| 7 |  | -import subprocess | 
|  | 6 | + | 
| 8 | 7 | import ray | 
| 9 | 8 | 
 | 
| 10 |  | -REDIS_PORT = 6543 | 
|  | 9 | + | 
|  | 10 | +def setup_module(): | 
|  | 11 | +    if not ray.worker.global_worker.connected: | 
|  | 12 | +        ray.init(num_cpus=1) | 
|  | 13 | + | 
|  | 14 | +    # Finish initializing Ray. Otherwise available_resources() does not | 
|  | 15 | +    # reflect resource use of submitted tasks | 
|  | 16 | +    ray.get(cpu_task.remote(0)) | 
| 11 | 17 | 
 | 
| 12 | 18 | 
 | 
| 13 | 19 | @ray.remote(num_cpus=1) | 
| 14 | 20 | def cpu_task(seconds): | 
| 15 | 21 |     time.sleep(seconds) | 
| 16 | 22 | 
 | 
| 17 | 23 | 
 | 
| 18 |  | -def _get_raylet_pid(raylet_socket): | 
| 19 |  | -    output = subprocess.check_output("ps -a".split(" ")) | 
| 20 |  | -    all_processes_split = output.decode("ascii").split("\n") | 
| 21 |  | -    search_term = "python/ray/core/src/ray/raylet/raylet {}".format( | 
| 22 |  | -        raylet_socket) | 
| 23 |  | -    print([x for x in all_processes_split if search_term in x]) | 
| 24 |  | -    return [ | 
| 25 |  | -        x.strip().split(" ")[0] for x in all_processes_split | 
| 26 |  | -        if search_term in x | 
| 27 |  | -    ][0] | 
| 28 |  | - | 
| 29 |  | - | 
| 30 | 24 | class TestAvailableResources(object): | 
| 31 |  | -    @classmethod | 
| 32 |  | -    def setup_class(cls): | 
| 33 |  | -        if not ray.worker.global_worker.connected: | 
| 34 |  | -            ray.init(num_cpus=1) | 
| 35 |  | - | 
| 36 |  | -        # Finish initializing Ray. Otherwise available_resources() does not | 
| 37 |  | -        # reflect resource use of submitted tasks | 
| 38 |  | -        ray.get(cpu_task.remote(0)) | 
| 39 |  | - | 
| 40 |  | -    @classmethod | 
| 41 |  | -    def teardown_class(cls): | 
| 42 |  | -        ray.shutdown() | 
|  | 25 | +    timeout = 10 | 
| 43 | 26 | 
 | 
| 44 | 27 |     def test_no_tasks(self): | 
| 45 | 28 |         cluster_resources = ray.global_state.cluster_resources() | 
| 46 | 29 |         available_resources = ray.global_state.cluster_resources() | 
| 47 | 30 |         assert cluster_resources == available_resources | 
| 48 | 31 | 
 | 
| 49 |  | -    @pytest.mark.timeout(10) | 
| 50 | 32 |     def test_replenish_resources(self): | 
| 51 | 33 |         cluster_resources = ray.global_state.cluster_resources() | 
| 52 | 34 | 
 | 
| 53 | 35 |         ray.get(cpu_task.remote(0)) | 
| 54 | 36 |         start = time.time() | 
| 55 | 37 |         resources_reset = False | 
| 56 | 38 | 
 | 
| 57 |  | -        while not resources_reset: | 
|  | 39 | +        while not resources_reset and time.time() - start < self.timeout: | 
| 58 | 40 |             resources_reset = ( | 
| 59 | 41 |                 cluster_resources == ray.global_state.available_resources()) | 
| 60 | 42 | 
 | 
| 61 | 43 |         assert resources_reset | 
| 62 | 44 | 
 | 
| 63 |  | -    @pytest.mark.timeout(10) | 
| 64 | 45 |     def test_uses_resources(self): | 
| 65 | 46 |         cluster_resources = ray.global_state.cluster_resources() | 
| 66 | 47 |         task_id = cpu_task.remote(1) | 
| 67 | 48 |         start = time.time() | 
| 68 | 49 |         resource_used = False | 
| 69 | 50 | 
 | 
| 70 |  | -        while not resource_used: | 
|  | 51 | +        while not resource_used and time.time() - start < self.timeout: | 
| 71 | 52 |             available_resources = ray.global_state.available_resources() | 
| 72 | 53 |             resource_used = available_resources[ | 
| 73 | 54 |                 "CPU"] == cluster_resources["CPU"] - 1 | 
| 74 | 55 | 
 | 
| 75 | 56 |         assert resource_used | 
| 76 | 57 | 
 | 
| 77 | 58 |         ray.get(task_id)  # clean up to reset resources | 
| 78 |  | - | 
| 79 |  | - | 
| 80 |  | -class TestMultiNodeState(object): | 
| 81 |  | -    @classmethod | 
| 82 |  | -    def setup_class(cls): | 
| 83 |  | -        subprocess.check_call("ray start --head --redis-port " | 
| 84 |  | -                              "{port} --num-cpus 1 --use-raylet".format( | 
| 85 |  | -                                  port=REDIS_PORT).split(" ")) | 
| 86 |  | -        ray.init(redis_address="localhost:{}".format(REDIS_PORT)) | 
| 87 |  | - | 
| 88 |  | -    @classmethod | 
| 89 |  | -    def teardown_class(cls): | 
| 90 |  | -        subprocess.check_call("ray stop".split(" ")) | 
| 91 |  | -        ray.shutdown() | 
| 92 |  | - | 
| 93 |  | -    @pytest.mark.timeout(20) | 
| 94 |  | -    def test_add_remove_client(self): | 
| 95 |  | -        """Tests client table is correct after node removal.""" | 
| 96 |  | -        clients = ray.global_state.client_table() | 
| 97 |  | -        assert len(clients) == 1 | 
| 98 |  | -        head_raylet_pid = _get_raylet_pid(clients[0]["RayletSocketName"]) | 
| 99 |  | - | 
| 100 |  | -        subprocess.check_call( | 
| 101 |  | -            "ray start --redis-address localhost:{port} " | 
| 102 |  | -            "--num-cpus 1 --use-raylet".format(port=REDIS_PORT).split(" ")) | 
| 103 |  | - | 
| 104 |  | -        clients = ray.global_state.client_table() | 
| 105 |  | -        assert len(clients) == 2 | 
| 106 |  | -        assert sum(cl["Resources"].get("CPU") for cl in clients) == 2 | 
| 107 |  | - | 
| 108 |  | -        worker_raylet_pid = _get_raylet_pid(clients[1]["RayletSocketName"]) | 
| 109 |  | -        assert head_raylet_pid != worker_raylet_pid | 
| 110 |  | - | 
| 111 |  | -        subprocess.check_output(["kill", str(worker_raylet_pid)]) | 
| 112 |  | - | 
| 113 |  | -        # wait for heartbeat | 
| 114 |  | -        while all(cl_entries["IsInsertion"] for cl_entries in clients): | 
| 115 |  | -            clients = ray.global_state.client_table() | 
| 116 |  | -            time.sleep(1) | 
| 117 |  | -        assert sum(cl["Resources"].get("CPU", 0) for cl in clients) == 1 | 
0 commit comments