From e2b2aa5a0fdd3e682dd1fbd62e2ba81b8aa054d2 Mon Sep 17 00:00:00 2001
From: Woosuk Kwon
Date: Sun, 1 Sep 2024 23:09:46 -0700
Subject: [PATCH] [TPU] Align worker index with node boundary (#7932)

---
 vllm/executor/ray_tpu_executor.py | 28 ++++++++++++++++++++++++++++
 1 file changed, 28 insertions(+)

diff --git a/vllm/executor/ray_tpu_executor.py b/vllm/executor/ray_tpu_executor.py
index 8f867b1d647a5..8c8b5f741488b 100644
--- a/vllm/executor/ray_tpu_executor.py
+++ b/vllm/executor/ray_tpu_executor.py
@@ -111,12 +111,40 @@ def _init_workers_ray(self, placement_group: "PlacementGroup",
                 # Else, added to the list of workers.
                 self.workers.append(worker)
 
+        logger.debug("workers: %s", self.workers)
+        logger.debug("driver_dummy_worker: %s", self.driver_dummy_worker)
         if self.driver_dummy_worker is None:
             raise ValueError(
                 "Ray does not allocate any TPUs on the driver node. Consider "
                 "adjusting the Ray placement group or running the driver on a "
                 "TPU node.")
 
+        worker_ips = [
+            ray.get(worker.get_node_ip.remote())  # type: ignore[attr-defined]
+            for worker in self.workers
+        ]
+        ip_counts: Dict[str, int] = {}
+        for ip in worker_ips:
+            ip_counts[ip] = ip_counts.get(ip, 0) + 1
+
+        def sort_by_driver_then_worker_ip(worker):
+            """
+            Sort the workers based on 3 properties:
+            1. If the worker is on the same node as the driver (vllm engine),
+                it should be placed first.
+            2. Then, if the worker is on a node with fewer workers, it should
+                be placed first.
+            3. Finally, if the worker is on a node with a smaller IP address,
+                it should be placed first.
+            """
+            ip = ray.get(worker.get_node_ip.remote())
+            return (ip != driver_ip, ip_counts[ip], ip)
+
+        # After sorting, the workers on the same node will be
+        # close to each other, and the workers on the driver
+        # node will be placed first.
+        self.workers = sorted(self.workers, key=sort_by_driver_then_worker_ip)
+
         # Get the set of TPU IDs used on each node.
         worker_node_and_gpu_ids = self._run_workers("get_node_and_gpu_ids",
                                                     use_dummy_driver=True)
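
Below is a minimal stand-alone sketch (not part of the commit) of the ordering that sort_by_driver_then_worker_ip produces. The driver IP and worker IPs are hypothetical placeholders, and the Ray lookup (ray.get(worker.get_node_ip.remote())) is replaced by a plain list of IP strings so the key function can be run on its own.

    # Illustrative only: mimics the patch's sort key on fabricated IPs.
    from collections import Counter
    from typing import Dict, List, Tuple

    driver_ip = "10.0.0.2"  # hypothetical driver node IP
    worker_ips: List[str] = [
        "10.0.0.3", "10.0.0.2", "10.0.0.4", "10.0.0.3",
        "10.0.0.2", "10.0.0.4", "10.0.0.4", "10.0.0.2",
    ]

    # Count workers per node, mirroring the ip_counts dict in the patch.
    ip_counts: Dict[str, int] = Counter(worker_ips)

    def sort_key(ip: str) -> Tuple[bool, int, str]:
        # 1. Workers on the driver node first (False sorts before True).
        # 2. Then nodes hosting fewer workers.
        # 3. Then the (string-compared) IP, for a deterministic order.
        return (ip != driver_ip, ip_counts[ip], ip)

    print(sorted(worker_ips, key=sort_key))
    # ['10.0.0.2', '10.0.0.2', '10.0.0.2', '10.0.0.3', '10.0.0.3',
    #  '10.0.0.4', '10.0.0.4', '10.0.0.4']

As the output shows, workers sharing a node end up adjacent and the driver node's workers come first, so worker indices line up with node boundaries; the third criterion only makes the order deterministic (it compares IPs lexicographically as strings, not numerically).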