[Prototype] Run tests in parallel #273


Draft: wants to merge 7 commits into main
1 change: 0 additions & 1 deletion fast_llm/config.py
@@ -274,7 +274,6 @@ def __init__(self, **kwargs):

if dynamic_type is not None:
for cls_, name in dynamic_type.items():
print(cls_, name, wrapped)
cls_.register_subclass(name, wrapped)

return wrapped
23 changes: 23 additions & 0 deletions fast_llm/engine/distributed/config.py
@@ -1,4 +1,5 @@
import enum
import functools
import logging
import os
import typing
@@ -8,6 +9,8 @@
from fast_llm.utils import Assert, div, log

if typing.TYPE_CHECKING:
import torch

from fast_llm.core.distributed import ProcessGroup


@@ -257,6 +260,18 @@ class DistributedConfig(Config):
desc="Pointer to the distributed config this one is an identical copy of.",
hint=FieldHint.derived,
)
gpu_ids: list[int] | None = Field(
default=None,
desc="Specify which GPU to use on each local rank."
" We recommend setting `CUDA_VISIBLE_DEVICES` instead when possible.",
hint=FieldHint.expert,
)
gpu_memory_limit_gb: float | None = Field(
default=None,
desc="Limit the memory usage on each GPU."
"Warning: the limit is be set globally and unset at deletion of the config object.",
hint=FieldHint.expert,
)

def _validate(self) -> None:
if self.world_size is None:
@@ -371,6 +386,14 @@ def _validate(self) -> None:
self.compare(self.reference_config, ValueError)
Assert.in_range(self.rank, 0, self.world_size)
Assert.in_range(self.local_rank, 0, self.local_world_size)
if self.gpu_ids is not None:
Assert.eq(len(self.gpu_ids), self.local_world_size)

@functools.cached_property
def device(self) -> "torch.device":
import torch

return torch.device(self.local_rank if self.gpu_ids is None else self.gpu_ids[self.local_rank])

def _add_distributed_dim(self, distributed_dim: DistributedDim) -> None:
if distributed_dim.name in self.distributed_dims:
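The two new fields plus the `device` property are the core of the per-worker GPU assignment. A minimal usage sketch, mirroring the `get_distributed_config` fixture added in tests/common.py (the GPU id and memory budget are made-up values, and a single local rank is assumed):

```python
from fast_llm.engine.distributed.config import DistributedConfig

# Illustrative values only: pin the single local rank to GPU 2 and cap it at 10 GB.
config = DistributedConfig.from_dict(
    {
        # One entry per local rank; _validate asserts len(gpu_ids) == local_world_size.
        "gpu_ids": [2],
        # Soft per-process budget in GB, applied through CUDA's memory-fraction API.
        "gpu_memory_limit_gb": 10.0,
    }
)

# The cached property resolves to the GPU owned by this local rank (torch.device(2) here),
# falling back to torch.device(local_rank) when gpu_ids is None.
device = config.device
```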
10 changes: 9 additions & 1 deletion fast_llm/engine/distributed/distributed.py
@@ -42,8 +42,12 @@ def __init__(self, config: DistributedConfig, use_cpu: bool = False):
else:
Assert.in_range_incl(self._config.local_world_size, 1, torch.cuda.device_count())
torch.cuda.init()
self.device = torch.device(self._config.local_rank)
self.device = self._config.device
torch.cuda.set_device(self.device)
if self._config.gpu_memory_limit_gb is not None:
torch.cuda.set_per_process_memory_fraction(
self._config.gpu_memory_limit_gb * 1e9 / torch.cuda.mem_get_info(0)[1], self.device
)

# We bypass `torch.distributed.init_process_group` which makes things way more complicated for no reason.

@@ -171,3 +175,7 @@ def __del__(self):
for group in self._process_groups.values():
if group is not None and hasattr(group, "_shutdown"):
group._shutdown() # noqa

if self._config.gpu_memory_limit_gb is not None:
# Not ideal, but better than nothing.
torch.cuda.set_per_process_memory_fraction(1.0, self.device)
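The GB limit is translated into a fraction of the device's total memory before being handed to CUDA, and reset to 1.0 when the `Distributed` object is deleted. A standalone sketch of the same arithmetic (the budget and device index are placeholders; the diff queries `mem_get_info(0)` for the total):

```python
import torch

gpu_memory_limit_gb = 10.0   # placeholder budget
device = torch.device(0)     # placeholder device

# mem_get_info returns (free_bytes, total_bytes); the fraction is taken against the total.
total_bytes = torch.cuda.mem_get_info(0)[1]
fraction = gpu_memory_limit_gb * 1e9 / total_bytes
torch.cuda.set_per_process_memory_fraction(fraction, device)

# ... run the test under the capped allocator ...

# Undo the cap, as the __del__ change does, so the limit does not leak
# into later work in the same process.
torch.cuda.set_per_process_memory_fraction(1.0, device)
```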
9 changes: 9 additions & 0 deletions fast_llm/utils.py
@@ -2,6 +2,7 @@
import logging
import math
import signal
import socket
import typing
from typing import Callable

@@ -368,3 +369,11 @@ def enabled(self) -> bool:
@property
def interrupted(self):
return self._interrupted


def get_free_port():
sock = socket.socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
return port
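`get_free_port` is how the fixtures below reserve rendezvous and master ports for `torch.distributed.run`. A small usage sketch (note the inherent race: another process can claim the port between the probe socket closing and torchrun binding it):

```python
from fast_llm.utils import get_free_port

# Reserve two ports, as the test fixtures do for --rdzv-endpoint and --master-port.
rdzv_port = get_free_port()
master_port = get_free_port()
torchrun_args = [
    f"--rdzv-endpoint=localhost:{rdzv_port}",
    f"--master-port={master_port}",
]
```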
1 change: 1 addition & 0 deletions setup.cfg
@@ -51,6 +51,7 @@ DEV =
# Required for testing
pytest>=8.3.2
pytest-depends>=1.0.1
pytest-xdist>=3.6.1
# Somehow needed for Megatron to work with base image 24.11
setuptools>=75.6.0

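pytest-xdist is what actually fans the suite out across workers; once installed, the tests can be launched with something like `pytest -n 4 tests/` (the worker count is a placeholder), with each worker then acquiring its own GPUs and ports through the fixtures below.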
194 changes: 143 additions & 51 deletions tests/common.py
@@ -13,6 +13,7 @@

from fast_llm.data.dataset.gpt.memmap import GPTMemmapDataset
from fast_llm.data.dataset.gpt.sampled import GPTSample
from fast_llm.engine.distributed.config import DistributedConfig
from fast_llm.layers.ssm.config import SSMConfig
from fast_llm.layers.transformer.config import TransformerConfig
from fast_llm.models.gpt.config import (
@@ -25,6 +26,7 @@
)
from fast_llm.models.ssm.config import HybridSSMBaseModelConfig, LLambaHuggingfaceCheckpointFormat
from fast_llm.tools.train import CliTrainingConfig
from fast_llm.utils import Assert
from tests.compare_tensor_logs import CompareConfig, compare_tensor_logs

# FIXME: figure out correct import of megatron modules without this hack
@@ -286,7 +288,22 @@
TEST_MODEL_TYPE, CONFIG_FAST_LLM, CONFIG_GPT2, CONFIG_COMMON, HUGGINGFACE_CHECKPOINT_FORMAT = _CONFIGS[TEST_MODEL]


requires_cuda = pytest.mark.skipif(not torch.cuda.is_available(), reason="CUDA is not available")
requires_cuda = pytest.mark.get_test_resources()
requires_multi_gpu = lambda num_gpus: pytest.mark.get_test_resources(num_gpus=num_gpus, ports=2, timeout=60)


@pytest.fixture(scope="session")
def get_distributed_config(get_test_resources):
def _get_distributed_config():
owned_test_resources = get_test_resources()
return DistributedConfig.from_dict(
{
"gpu_ids": list(owned_test_resources["gpus"]),
"gpu_memory_limit_gb": next(iter(owned_test_resources["gpus"].values())),
}
)

return _get_distributed_config


def get_test_dataset(
@@ -350,29 +367,87 @@ def get_test_concatenated_memmap_dataset(
index_file.open("w").writelines([str(path / f"dataset_{i}") + "\n" for i in range(num_files)])


def run_test_script(
name: str,
script: list[str],
num_gpus: int = 1,
*,
model_type: str = TEST_MODEL_TYPE,
is_megatron: bool = False,
compare: str | None = None,
config: CompareConfig | None = None,
prepare_fn=None,
compare_fn=None,
do_compare: bool = True,
):
if torch.cuda.device_count() < num_gpus:
pytest.skip(f"Not enough GPUs to run test ({torch.cuda.device_count()}<{num_gpus})")
env = os.environ.copy()
if is_megatron:
@pytest.fixture(scope="session")
def run_fast_llm_train(get_test_resources):
def do_run_fast_llm_train(
model_type: str,
cli_args: list[str],
*,
force_separate_process: bool = False,
num_gpus: int = 1,
):
owned_test_resources = get_test_resources()
gpu_memory_gb = next(iter(owned_test_resources["gpus"].values()))
cli_args = [
*cli_args,
f"model.distributed.gpu_ids=[{",".join(str(gpu_id) for gpu_id in owned_test_resources["gpus"])}]",
f"model.distributed.gpu_memory_limit_gb={gpu_memory_gb}",
]
Assert.eq(num_gpus, len(owned_test_resources["gpus"]))
if num_gpus > 1 or force_separate_process:
command = [
"python",
"-m",
"torch.distributed.run",
f"--nproc-per-node={num_gpus}",
"--no-python",
f"--rdzv-endpoint=localhost:{owned_test_resources["ports"][0]}",
f"--master-port={owned_test_resources["ports"][1]}",
"fast-llm",
"train",
model_type,
*cli_args,
]
print(" ".join(command))
completed_proc = subprocess.run(command, timeout=60)
if completed_proc.returncode:
raise RuntimeError(f"Process failed with return code {completed_proc.returncode}")
else:
print(" ".join(["fast-llm", "train", model_type, *cli_args]))
CliTrainingConfig.parse_and_run([model_type, *cli_args])

return do_run_fast_llm_train


@pytest.fixture(scope="session")
def run_megatron_train(get_test_resources):
def do_run_megatron_train(
cli_args: list[str],
*,
num_gpus: int = 1,
):
owned_test_resources = get_test_resources()
env = os.environ.copy()
# Prevent Megatron from complaining.
env["CUDA_DEVICE_MAX_CONNECTIONS"] = "1"
env["NVTE_FLASH_ATTN"] = "0"
env["CUDA_VISIBLE_DEVICES"] = ",".join(str(gpu) for gpu in owned_test_resources["gpus"])
Assert.eq(1, len(owned_test_resources["gpus"]), num_gpus)
# Torchrun needed because Megatron really wants to initialize distributed.
command = [
"python",
"-m",
"torch.distributed.run",
f"--rdzv-endpoint=localhost:{owned_test_resources["ports"][0]}",
f"--master-port={owned_test_resources["ports"][1]}",
"Megatron-LM/pretrain_gpt.py",
*cli_args,
]
print(" ".join(command))
completed_proc = subprocess.run(command, env=env, timeout=60)
if completed_proc.returncode:
raise RuntimeError(f"Process failed with return code {completed_proc.returncode}")

return do_run_megatron_train


def _get_run_test_path(name: str, is_megatron: bool) -> tuple[pathlib.Path, bool]:
# Prepare experiment directory.
path = TEST_RESULTS_PATH.resolve() / name
skip = False
artifact_path = path / ARTIFACT_PATH
# Check if we can reuse an existing result.
# TODO: No longer important?
if path.exists():
assert path.is_dir()
# TODO: Better way to check if the previous attempt succeeded.
@@ -385,43 +460,60 @@ def run_test_script(
elif FORCE_REUSE_RESULTS:
raise RuntimeError(artifact_path)
else:
# Make sure the directory is empty.
shutil.rmtree(path)
elif FORCE_REUSE_RESULTS:
raise RuntimeError(path)
if prepare_fn is not None:
skip = prepare_fn(TEST_RESULTS_PATH / name, None if compare is None else TEST_RESULTS_PATH / compare, skip)
if is_megatron:
script = [*script, f"--structured-logs-dir={path}", f"--data-cache-path={path}"]
else:
script = [model_type, *script, f"run.experiment_dir={path}"]
header = ["Megatron-LM/pretrain_gpt.py"] if is_megatron else ["--no-python", "fast-llm", "train"]
command = [
"python",
"-m",
"torch.distributed.run",
f"--nproc-per-node={num_gpus}",
*header,
*script,
]
print(" ".join(command))
if skip:
print("Reusing existing run.")
else:
get_test_dataset()
if num_gpus == 1 and not is_megatron:
CliTrainingConfig.parse_and_run(script)
return path, skip


@pytest.fixture(scope="session")
def run_test_script(run_fast_llm_train, run_megatron_train):
def do_run_test_script(
name: str,
cli_args: list[str],
*,
model_type: str = TEST_MODEL_TYPE,
num_gpus: int = 1,
force_separate_process: bool = False,
is_megatron: bool = False,
compare: str | None = None,
config: CompareConfig | None = None,
prepare_fn=None,
compare_fn=None,
do_compare: bool = True,
):
# Prepare experiment directory.
path, skip = _get_run_test_path(name, is_megatron)
if prepare_fn is not None:
skip = prepare_fn(TEST_RESULTS_PATH / name, None if compare is None else TEST_RESULTS_PATH / compare, skip)
if skip:
print("Reusing existing run.")
else:
completed_proc = subprocess.run(command, env=env, timeout=60)
if completed_proc.returncode:
raise RuntimeError(f"Process failed with return code {completed_proc.returncode}")
if compare and do_compare:
if compare_fn is not None:
compare_fn(TEST_RESULTS_PATH / name, TEST_RESULTS_PATH / compare)
compare_tensor_logs(
TEST_RESULTS_PATH / compare / ARTIFACT_PATH,
TEST_RESULTS_PATH / name / ARTIFACT_PATH,
config,
)
get_test_dataset()
if is_megatron:
assert model_type == "gpt"
run_megatron_train(
[*cli_args, f"--structured-logs-dir={path}", f"--data-cache-path={path}"],
num_gpus=num_gpus,
)
else:
run_fast_llm_train(
model_type,
[*cli_args, f"run.experiment_dir={path}"],
force_separate_process=force_separate_process,
num_gpus=num_gpus,
)
if compare and do_compare:
if compare_fn is not None:
compare_fn(TEST_RESULTS_PATH / name, TEST_RESULTS_PATH / compare)
compare_tensor_logs(
TEST_RESULTS_PATH / compare / ARTIFACT_PATH,
TEST_RESULTS_PATH / name / ARTIFACT_PATH,
config,
)

return do_run_test_script


def materialize_meta_tensors(model, tensor_space):
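The `get_test_resources` fixture that everything above depends on is not part of this file, so it presumably lives in a conftest.py elsewhere in the prototype. As a rough sketch of the contract the new fixtures assume (a session-scoped factory returning the GPUs, their memory budgets, and the free ports granted to the current worker), with every name and value below hypothetical:

```python
# Hypothetical conftest.py sketch; NOT part of this diff. It only illustrates
# the shape the fixtures in tests/common.py rely on: a callable returning
# {"gpus": {gpu_id: memory_budget_gb, ...}, "ports": [port, ...]}.
import pytest

from fast_llm.utils import get_free_port


@pytest.fixture(scope="session")
def get_test_resources():
    def _get_test_resources():
        # Defaults for a single-GPU test. A real implementation would read the
        # pytest.mark.get_test_resources(num_gpus=..., ports=..., timeout=...)
        # marker and arbitrate GPUs and memory across pytest-xdist workers.
        return {
            "gpus": {0: 10.0},  # gpu_id -> memory budget in GB (made up)
            "ports": [get_free_port() for _ in range(2)],
        }

    return _get_test_resources
```

A test would then request resources through the `requires_multi_gpu(n)` marker and call `run_test_script(...)` via the fixture rather than the old module-level function.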