[air/benchmark] Torch benchmarks for 4x4 #26692

Merged · 29 commits · Jul 19, 2022

Changes from 1 commit
Correctly set GPU IDs
Signed-off-by: Kai Fricke <kai@anyscale.com>
Kai Fricke committed Jul 19, 2022
commit 8d95a977b04298a2145ee3ee644c36ae5cb35fea
46 changes: 46 additions & 0 deletions release/air_tests/air_benchmarks/workloads/benchmark_util.py
@@ -1,5 +1,8 @@
 import os
+import socket
 import subprocess
+from collections import defaultdict
+from contextlib import closing
 from pathlib import Path
 
 import ray
@@ -101,3 +104,46 @@ def run_fn_on_actors(
     for actor in actors:
         futures.append(actor.run_fn.remote(fn, *args, **kwargs))
     return ray.get(futures)
+
+
+def get_ip_port():
+    ip = ray.util.get_node_ip_address()
+    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
+        s.bind(("localhost", 0))
+        s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        port = s.getsockname()[1]
+    return ip, port
+
+
+def get_gpu_ids():
+    return ray.get_gpu_ids()
+
+
+def map_ips_to_gpus(ips: List[str], gpus: List[List[int]]):
+    assert len(ips) == len(gpus)
+
+    map = defaultdict(set)
+    for ip, gpu in zip(ips, gpus):
+        map[ip].update(set(gpu))
+    return {ip: sorted(gpus) for ip, gpus in map.items()}
+
+
+def set_cuda_visible_devices(
+    actors: List[ray.actor.ActorHandle],
+    actor_ips: List[str],
+    ip_to_gpus: Dict[str, set],
+):
+    assert len(actors) == len(actor_ips)
+
+    def set_env(key: str, val: str):
+        os.environ[key] = val
+
+    futures = []
+    for actor, ip in zip(actors, actor_ips):
+        assert ip in ip_to_gpus
+
+        gpu_str = ",".join([str(device) for device in sorted(ip_to_gpus[ip])])
+        future = actor.run_fn.remote(set_env, "CUDA_VISIBLE_DEVICES", gpu_str)
+        futures.append(future)
+
+    ray.get(futures)
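
Reviewer note, not part of the diff: a minimal sketch of how these new helpers are meant to compose. Here `actors` is assumed to be the list of Ray actor handles created elsewhere via create_actors_with_resources(), and the IPs/GPU IDs shown in the comments are purely illustrative.

# Sketch of the intended call pattern (assumes benchmark_util.py is importable
# and `actors` already holds the benchmark's Ray actor handles).
from benchmark_util import (
    get_gpu_ids,
    get_ip_port,
    map_ips_to_gpus,
    run_fn_on_actors,
    set_cuda_visible_devices,
)

ip_ports = run_fn_on_actors(actors=actors, fn=get_ip_port)  # [(ip, port), ...]
actor_ips = [ip for ip, _ in ip_ports]
gpu_ids = run_fn_on_actors(actors=actors, fn=get_gpu_ids)   # e.g. [[0], [1], [0], [1]]

# Group per-actor GPU allocations by node IP,
# e.g. {"10.0.0.1": [0, 1], "10.0.0.2": [0, 1]}.
ip_to_gpu_map = map_ips_to_gpus(ips=actor_ips, gpus=gpu_ids)

# Every actor then sees all GPUs allocated on its node,
# e.g. CUDA_VISIBLE_DEVICES="0,1" on both nodes in this example.
set_cuda_visible_devices(actors=actors, actor_ips=actor_ips, ip_to_gpus=ip_to_gpu_map)
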
release/air_tests/air_benchmarks/workloads/tensorflow_benchmark.py
@@ -1,8 +1,6 @@
 import json
 import os
-import socket
 import time
-from contextlib import closing
 
 import click
 import numpy as np
@@ -152,12 +150,12 @@ def train_tf_vanilla(
 ) -> Tuple[float, float]:
     # This function is kicked off by the main() function and subsequently kicks
     # off tasks that run train_tf_vanilla_worker() on the worker nodes.
-    import ray
     from benchmark_util import (
         upload_file_to_all_nodes,
         create_actors_with_resources,
         run_commands_on_actors,
         run_fn_on_actors,
+        get_ip_port,
     )
 
     path = os.path.abspath(__file__)
@@ -173,14 +171,6 @@ def train_tf_vanilla(
         },
     )
 
-    def get_ip_port():
-        ip = ray.util.get_node_ip_address()
-        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
-            s.bind(("localhost", 0))
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-            port = s.getsockname()[1]
-        return ip, port
-
     ips_ports = run_fn_on_actors(actors=actors, fn=get_ip_port)
     ip_port_list = [f"{ip}:{port}" for ip, port in ips_ports]
     ip_port_str = ",".join(ip_port_list)
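
Aside (illustrative values, not from this PR): the shared get_ip_port helper returns one (node_ip, free_port) pair per actor, and the vanilla TF launcher simply joins them into a single worker list.

# Hypothetical output of run_fn_on_actors(actors=actors, fn=get_ip_port).
ips_ports = [("10.0.0.1", 50001), ("10.0.0.2", 50002)]

ip_port_list = [f"{ip}:{port}" for ip, port in ips_ports]
ip_port_str = ",".join(ip_port_list)
print(ip_port_str)  # -> 10.0.0.1:50001,10.0.0.2:50002
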
51 changes: 40 additions & 11 deletions release/air_tests/air_benchmarks/workloads/torch_benchmark.py
@@ -1,8 +1,6 @@
 import json
 import os
-import socket
 import time
-from contextlib import closing
 from typing import Dict, Tuple
 
 import click
@@ -129,6 +127,13 @@ def train_func(use_ray: bool, config: Dict):
         vanilla_device = torch.device(f"cuda:{gpu_id}")
         torch.cuda.set_device(vanilla_device)
 
+        print(
+            "Setting GPU ID to",
+            gpu_id,
+            "with visible devices",
+            os.environ.get("CUDA_VISIBLE_DEVICES"),
+        )
+
         def collate_fn(x):
             return tuple(x_.to(vanilla_device) for x_ in default_collate(x))
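
Note on what this buys (sketch with assumed values, not part of the diff): once set_cuda_visible_devices() has exposed all of a node's allocated GPUs and the worker has received its --gpu-id, the vanilla path simply pins that index.

import torch

# Assume CUDA_VISIBLE_DEVICES="0,1,2,3" was set for this node's actors and
# this worker was launched with --gpu-id 2 (both values are illustrative).
gpu_id = 2

vanilla_device = torch.device(f"cuda:{gpu_id}")
if torch.cuda.is_available():
    torch.cuda.set_device(vanilla_device)  # pins the third visible device
print("visible devices:", torch.cuda.device_count(), "selected:", vanilla_device)
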

@@ -242,6 +247,7 @@ def train_torch_vanilla_worker(
     master_addr: str,
     master_port: int,
     use_gpu: bool = False,
+    gpu_id: int = 0,
 ):
     # This function is kicked off by the main() function and runs the vanilla
     # training script on a single worker.
@@ -255,6 +261,7 @@
     )
 
     config["use_gpu"] = use_gpu
+    config["gpu_id"] = gpu_id
     train_func(use_ray=False, config=config)
 
     distributed.destroy_process_group()
@@ -270,12 +277,15 @@ def train_torch_vanilla(
 ) -> Tuple[float, float]:
     # This function is kicked off by the main() function and subsequently kicks
     # off tasks that run train_torch_vanilla_worker() on the worker nodes.
-    import ray
     from benchmark_util import (
         upload_file_to_all_nodes,
         create_actors_with_resources,
         run_commands_on_actors,
         run_fn_on_actors,
+        get_ip_port,
+        get_gpu_ids,
+        map_ips_to_gpus,
+        set_cuda_visible_devices,
     )
 
     path = os.path.abspath(__file__)
@@ -291,15 +301,30 @@
         },
     )
 
-    def get_ip_port():
-        ip = ray.util.get_node_ip_address()
-        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
-            s.bind(("localhost", 0))
-            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-            port = s.getsockname()[1]
-        return ip, port
+    # Get IPs and ports for all actors
+    ip_ports = run_fn_on_actors(actors=actors, fn=get_ip_port)
 
+    # Rank 0 is the master addr/port
+    master_addr, master_port = ip_ports[0]
+
+    if use_gpu:
+        # Extract IPs
+        actor_ips = [ipp[0] for ipp in ip_ports]
 
-    [(master_addr, master_port)] = run_fn_on_actors(actors=[actors[0]], fn=get_ip_port)
+        # Get allocated GPU IDs for all actors
+        gpu_ids = run_fn_on_actors(actors=actors, fn=get_gpu_ids)
+
+        # Build a map of IP to all allocated GPUs on that machine
+        ip_to_gpu_map = map_ips_to_gpus(ips=actor_ips, gpus=gpu_ids)
+
+        # Set the environment variables on the workers
+        set_cuda_visible_devices(
+            actors=actors, actor_ips=actor_ips, ip_to_gpus=ip_to_gpu_map
+        )
+
+        use_gpu_ids = [gi[0] for gi in gpu_ids]
+    else:
+        use_gpu_ids = [0] * num_workers
 
     cmds = [
         [
@@ -318,6 +343,7 @@ def get_ip_port():
             str(master_port),
         ]
         + (["--use-gpu"] if use_gpu else [])
+        + (["--gpu-id", str(use_gpu_ids[rank])] if use_gpu else [])
         for rank in range(num_workers)
     ]
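
For a concrete (hypothetical) example of what the comprehension above appends per rank: with use_gpu=True and use_gpu_ids=[0, 1, 2, 3], rank 1's worker command gains the two GPU flags.

# Illustrative per-rank values.
use_gpu = True
use_gpu_ids = [0, 1, 2, 3]
rank = 1

extra_args = (["--use-gpu"] if use_gpu else []) + (
    ["--gpu-id", str(use_gpu_ids[rank])] if use_gpu else []
)
print(extra_args)  # -> ['--use-gpu', '--gpu-id', '1']
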

@@ -477,13 +503,15 @@ def run(
@click.option("--master-addr", type=str, default="")
@click.option("--master-port", type=int, default=0)
@click.option("--use-gpu", is_flag=True, default=False)
@click.option("--gpu-id", type=int, default=0)
def worker(
num_epochs: int = 4,
num_workers: int = 4,
rank: int = 0,
master_addr: str = "",
master_port: int = 0,
use_gpu: bool = False,
gpu_id: int = 0,
):
config = CONFIG.copy()
config["epochs"] = num_epochs
@@ -496,6 +524,7 @@
         master_addr=master_addr,
         master_port=master_port,
         use_gpu=use_gpu,
+        gpu_id=gpu_id,
     )

