2 changes: 0 additions & 2 deletions .buildkite/pipeline.gpu_large.yml
@@ -9,7 +9,6 @@
- ./ci/env/install-horovod.sh
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=gpu,gpu_only,-ray_air
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
python/ray/train/...

- label: ":tv: :database: :steam_locomotive: Datasets Train Integration GPU Tests and Examples (Python 3.7)"
@@ -20,7 +19,6 @@
- pip install -Ur ./python/requirements/ml/dl-gpu-requirements.txt
- ./ci/env/env_info.sh
- bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=datasets_train,-doctest
--test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
doc/...

- label: ":tv: :brain: RLlib: Multi-GPU Tests"
32 changes: 18 additions & 14 deletions doc/source/ray-core/_examples/datasets_train/datasets_train.py
@@ -14,6 +14,7 @@
import sys
import time
from typing import Tuple
from tempfile import TemporaryDirectory

import boto3
import mlflow
@@ -464,19 +465,20 @@ def to_torch_dataset(torch_batch_iterator):
)

# Checkpoint model.
checkpoint = Checkpoint.from_dict(dict(model=net.state_dict()))

# Record and log stats.
print(f"train report on {train.get_context().get_world_rank()}")
train.report(
dict(
train_acc=train_acc,
train_loss=train_running_loss,
test_acc=test_acc,
test_loss=test_running_loss,
),
checkpoint=checkpoint,
)
with TemporaryDirectory() as tmpdir:
torch.save(net.module.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))

# Record and log stats.
print(f"train report on {train.get_context().get_world_rank()}")
train.report(
dict(
train_acc=train_acc,
train_loss=train_running_loss,
test_acc=test_acc,
test_loss=test_running_loss,
),
checkpoint=Checkpoint.from_directory(tmpdir),
)


if __name__ == "__main__":
@@ -640,7 +642,9 @@ def to_torch_dataset(torch_batch_iterator):
dataset_config=DataConfig(datasets_to_split=["train", "test"]),
)
results = trainer.fit()
state_dict = results.checkpoint.to_dict()["model"]

with results.checkpoint.as_directory() as tmpdir:
state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt"))

def load_model_func():
num_layers = config["num_layers"]
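For reference, a minimal sketch of the directory-based checkpoint flow this diff switches to (replacing `Checkpoint.from_dict` / `to_dict`). It assumes Ray Train 2.7+, that `report_checkpoint` is called from inside the training loop, and that the file name `checkpoint.pt` is just this example's convention:

```python
import os
from tempfile import TemporaryDirectory

import torch
from ray import train
from ray.train import Checkpoint


def report_checkpoint(model: torch.nn.Module, metrics: dict) -> None:
    # Write the state dict into a scratch directory and hand it to Ray Train,
    # which persists the directory contents as the reported checkpoint.
    with TemporaryDirectory() as tmpdir:
        torch.save(model.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
        train.report(metrics, checkpoint=Checkpoint.from_directory(tmpdir))


def load_state_dict(checkpoint: Checkpoint) -> dict:
    # Materialize the checkpoint locally (downloading it if remote) and load
    # the saved state dict back.
    with checkpoint.as_directory() as ckpt_dir:
        return torch.load(os.path.join(ckpt_dir, "checkpoint.pt"))
```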
31 changes: 17 additions & 14 deletions python/ray/train/examples/pytorch/torch_fashion_mnist_example.py
@@ -1,3 +1,5 @@
import os
from filelock import FileLock
from typing import Dict

import torch
@@ -16,21 +18,22 @@ def get_dataloaders(batch_size):
# Transform to normalize the input images
transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

# Download training data from open datasets.
training_data = datasets.FashionMNIST(
root="~/data",
train=True,
download=True,
transform=transform,
)
with FileLock(os.path.expanduser("~/data.lock")):
# Download training data from open datasets.
training_data = datasets.FashionMNIST(
root="~/data",
train=True,
download=True,
transform=transform,
)

# Download test data from open datasets.
test_data = datasets.FashionMNIST(
root="~/data",
train=False,
download=True,
transform=transform,
)
# Download test data from open datasets.
test_data = datasets.FashionMNIST(
root="~/data",
train=False,
download=True,
transform=transform,
)

# Create data loaders.
train_dataloader = DataLoader(training_data, batch_size=batch_size)
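The `FileLock` added above keeps multiple Train workers on the same node from downloading FashionMNIST into `~/data` concurrently. A standalone sketch of the same pattern (the lock file path is just this example's convention):

```python
import os

from filelock import FileLock
from torchvision import datasets
from torchvision.transforms import ToTensor


def download_fashion_mnist(root: str = "~/data"):
    # Only one process per node downloads the dataset; the others block on
    # the lock and then reuse the files already on disk.
    with FileLock(os.path.expanduser("~/data.lock")):
        training_data = datasets.FashionMNIST(
            root=root, train=True, download=True, transform=ToTensor()
        )
        test_data = datasets.FashionMNIST(
            root=root, train=False, download=True, transform=ToTensor()
        )
    return training_data, test_data
```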
15 changes: 10 additions & 5 deletions python/ray/train/tests/test_accelerate_trainer_gpu.py
@@ -1,12 +1,14 @@
import os
import pytest
import torch
from tempfile import TemporaryDirectory

import ray
import torch.nn as nn
from ray.train.examples.pytorch.torch_linear_example import LinearDataset
from ray.train import ScalingConfig
import ray.train as train
from ray.train.torch.torch_checkpoint import LegacyTorchCheckpoint
from ray.train import Checkpoint
from ray.train.huggingface import AccelerateTrainer
from accelerate import Accelerator

@@ -239,9 +241,10 @@ def linear_train_func(accelerator: Accelerator, config):

result = dict(loss=loss)
results.append(result)
train.report(
result, checkpoint=LegacyTorchCheckpoint.from_state_dict(state_dict)
)

with TemporaryDirectory() as tmpdir:
torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt"))
train.report(result, checkpoint=Checkpoint.from_directory(tmpdir))

return results

@@ -307,7 +310,9 @@ def train_func():
assert accelerator.process_index == train.get_context().get_world_rank()
model = torch.nn.Linear(3, 1)
model = accelerator.prepare(model)
train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
with TemporaryDirectory() as tmpdir:
torch.save(model, os.path.join(tmpdir, "checkpoint.pt"))
train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

scaling_config = ScalingConfig(num_workers=num_workers)
trainer = AccelerateTrainer(
83 changes: 45 additions & 38 deletions python/ray/train/tests/test_gpu.py
@@ -1,5 +1,8 @@
import json
import os
from pathlib import Path
import time
from typing import Union, List, Dict

from unittest.mock import patch
import pytest
@@ -11,7 +14,7 @@
import ray
import ray.data
from ray.exceptions import RayTaskError
from ray import train, tune
from ray import train

from ray.train import ScalingConfig
from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME
@@ -36,19 +39,26 @@ def __getitem__(self, index):
return {"x": self.x[index, None], "y": 2}


# Currently in DataParallelTrainers we only report metrics from rank 0.
# For testing purposes here, we need to be able to report from all
# workers.
class TorchTrainerPatchedMultipleReturns(TorchTrainer):
def _report(self, training_iterator) -> None:
for results in training_iterator:
tune.report(results=results)
def write_rank_data(tmp_path: Path, data: Union[int, List, Dict]):
rank = train.get_context().get_world_rank()
with open(tmp_path / f"{rank}.json", "w") as f:
json.dump(data, f)


def get_data_from_all_ranks(tmp_path: Path) -> Dict[int, Union[int, List, Dict]]:
rank_data = {}
for rank_file in tmp_path.glob("*.json"):
rank = int(rank_file.stem)
with open(rank_file, "r") as f:
data = json.load(f)
rank_data[rank] = data
return rank_data


@pytest.mark.parametrize("cuda_visible_devices", ["", "1,2"])
@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
def test_torch_get_device(
shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch
shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch, tmp_path
):
if cuda_visible_devices:
# Test if `get_device` is correct even with user specified env var.
@@ -61,27 +71,26 @@ def train_fn():
if cuda_visible_devices:
visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
assert visible_devices == "1,2"
if num_gpus_per_worker > 1:
train.report(
dict(
devices=sorted(
[device.index for device in train.torch.get_device()]
)
)
)
else:
train.report(dict(devices=train.torch.get_device().index))

trainer = TorchTrainerPatchedMultipleReturns(

devices = (
sorted([device.index for device in train.torch.get_device()])
if num_gpus_per_worker > 1
else train.torch.get_device().index
)
write_rank_data(tmp_path, devices)

trainer = TorchTrainer(
train_fn,
scaling_config=ScalingConfig(
num_workers=int(2 / num_gpus_per_worker),
use_gpu=True,
resources_per_worker={"GPU": num_gpus_per_worker},
),
)
results = trainer.fit()
devices = [result["devices"] for result in results.metrics["results"]]
trainer.fit()

rank_data = get_data_from_all_ranks(tmp_path)
devices = list(rank_data.values())

if num_gpus_per_worker == 0.5:
assert sorted(devices) == [0, 0, 1, 1]
@@ -97,21 +106,17 @@ def train_fn():


@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker):
def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker, tmp_path):
@patch("torch.cuda.is_available", lambda: True)
def train_fn():
if num_gpus_per_worker > 1:
train.report(
dict(
devices=sorted(
[device.index for device in train.torch.get_device()]
)
)
)
else:
train.report(dict(devices=train.torch.get_device().index))

trainer = TorchTrainerPatchedMultipleReturns(
devices = (
sorted([device.index for device in train.torch.get_device()])
if num_gpus_per_worker > 1
else train.torch.get_device().index
)
write_rank_data(tmp_path, devices)

trainer = TorchTrainer(
train_fn,
# use gloo instead of nccl, since nccl is not supported
# on this virtual gpu ray environment
@@ -122,8 +127,10 @@ def train_fn():
resources_per_worker={"GPU": num_gpus_per_worker},
),
)
results = trainer.fit()
devices = [result["devices"] for result in results.metrics["results"]]
trainer.fit()

rank_data = get_data_from_all_ranks(tmp_path)
devices = list(rank_data.values())

# cluster setups: 2 nodes, 2 gpus per node
# `CUDA_VISIBLE_DEVICES` is set to "0,1" on node 1 and node 2
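These tests drop the `TorchTrainerPatchedMultipleReturns` hack (which relied on `tune.report` to surface results from every worker) and instead have each worker write its value to a shared `tmp_path`, which the driver reads back after `trainer.fit()`. A condensed sketch of that helper pair, assuming `tmp_path` is visible to all workers (single-node test, e.g. pytest's `tmp_path` fixture):

```python
import json
from pathlib import Path
from typing import Any, Dict

from ray import train


def write_rank_data(tmp_path: Path, data: Any) -> None:
    # Called inside the training function: each worker writes its own file,
    # named after its world rank.
    rank = train.get_context().get_world_rank()
    (tmp_path / f"{rank}.json").write_text(json.dumps(data))


def read_rank_data(tmp_path: Path) -> Dict[int, Any]:
    # Called on the driver after trainer.fit(): map rank -> reported value.
    return {int(p.stem): json.loads(p.read_text()) for p in tmp_path.glob("*.json")}
```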
10 changes: 6 additions & 4 deletions python/ray/train/tests/test_gpu_amp.py
@@ -1,11 +1,12 @@
from timeit import default_timer as timer

from ray.train.torch.torch_checkpoint import LegacyTorchCheckpoint
import os
import torch
import torchvision
from tempfile import TemporaryDirectory

import ray.train as train
from ray.train import ScalingConfig
from ray.train import Checkpoint, ScalingConfig
from ray.train.torch.torch_trainer import TorchTrainer


@@ -63,14 +64,15 @@ def train_func():
model = torchvision.models.resnet101()
model = train.torch.prepare_model(model)

train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
with TemporaryDirectory() as tmpdir:
torch.save(model, os.path.join(tmpdir, "checkpoint.pt"))
train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

trainer = TorchTrainer(
train_func, scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
)
results = trainer.fit()
assert results.checkpoint
assert results.checkpoint.get_model()


if __name__ == "__main__":
33 changes: 21 additions & 12 deletions python/ray/train/tests/test_gpu_auto_transfer.py
@@ -1,14 +1,14 @@
import os
from unittest.mock import patch
from tempfile import TemporaryDirectory
import pytest

import torch

import ray
from ray import train
from ray.air.constants import MODEL_KEY
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer, LegacyTorchCheckpoint
from ray.train import ScalingConfig, Checkpoint
from ray.train.torch import TorchTrainer
import ray.train.torch.train_loop_utils


@@ -105,15 +105,24 @@ def train_func():

assert next(model.parameters()).is_cuda

train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
with TemporaryDirectory() as tmpdir:
state_dict = {
k.replace("module.", ""): v for k, v in model.state_dict().items()
}
torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt"))
train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

trainer = TorchTrainer(
train_func, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=True)
)
results = trainer.fit()

model_checkpoint = results.checkpoint.get_model()
assert not next(model_checkpoint.parameters()).is_cuda
with results.checkpoint.as_directory() as tmpdir:
state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt"))
checkpoint_model = torch.nn.Linear(1, 1)
checkpoint_model.load_state_dict(state_dict)

assert not next(checkpoint_model.parameters()).is_cuda

# Test the same thing for state dict.

@@ -130,20 +139,20 @@ def train_func():
for tensor in state_dict.values():
assert tensor.is_cuda

train.report(
{},
checkpoint=LegacyTorchCheckpoint.from_state_dict(state_dict),
)
with TemporaryDirectory() as tmpdir:
torch.save(model.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

trainer = TorchTrainer(
train_func, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=True)
)
results = trainer.fit()

state_dict_checkpoint = results.checkpoint.to_dict()[MODEL_KEY]
with results.checkpoint.as_directory() as tmpdir:
state_dict_checkpoint = torch.load(os.path.join(tmpdir, "checkpoint.pt"))

for tensor in state_dict_checkpoint.values():
assert not tensor.is_cuda
assert tensor.is_cuda


if __name__ == "__main__":
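In the first test above, the training function strips the `module.` prefix that `prepare_model()`'s DistributedDataParallel wrapper adds to state-dict keys, so an unwrapped `torch.nn.Linear(1, 1)` can load the checkpointed state dict on the driver. A small illustrative helper (`strip_ddp_prefix` is hypothetical, not part of the diff or the Ray API):

```python
from typing import Dict

import torch


def strip_ddp_prefix(state_dict: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
    # DistributedDataParallel prefixes every parameter key with "module.";
    # removing it lets a plain, unwrapped module load the checkpoint later.
    return {k.replace("module.", "", 1): v for k, v in state_dict.items()}


# Example use on the driver after restoring the state dict:
#   model = torch.nn.Linear(1, 1)
#   model.load_state_dict(strip_ddp_prefix(restored_state_dict))
```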