Commit 5370f6e

woshiyyya authored and justinvyu committed
[2.7 CI][New Persistent Mode][4/n] 📺 🚂 Train GPU tests & 🚂 Datasets Train Integration GPU Tests and Examples (ray-project#38910)
Signed-off-by: woshiyyya <xiaoyunxuan1998@gmail.com>
Signed-off-by: Justin Yu <justinvyu@anyscale.com>
Co-authored-by: Justin Yu <justinvyu@anyscale.com>
Signed-off-by: Victor <vctr.y.m@example.com>
1 parent ce7d937 commit 5370f6e

File tree

8 files changed: +122 -91 lines changed

.buildkite/pipeline.gpu_large.yml

Lines changed: 0 additions & 2 deletions
@@ -9,7 +9,6 @@
       - ./ci/env/install-horovod.sh
       - ./ci/env/env_info.sh
       - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=gpu,gpu_only,-ray_air
-        --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
         python/ray/train/...

   - label: ":tv: :database: :steam_locomotive: Datasets Train Integration GPU Tests and Examples (Python 3.7)"
@@ -20,7 +19,6 @@
       - pip install -Ur ./python/requirements/ml/dl-gpu-requirements.txt
       - ./ci/env/env_info.sh
       - bazel test --config=ci $(./ci/run/bazel_export_options) --build_tests_only --test_tag_filters=datasets_train,-doctest
-        --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0
         doc/...

   - label: ":tv: :brain: RLlib: Multi-GPU Tests"

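Note: dropping --test_env=RAY_AIR_NEW_PERSISTENCE_MODE=0 means these GPU suites now run with the new persistence mode enabled by default. For reference, a minimal sketch of forcing the old behavior in a local run; only the variable name comes from the removed flag, and how Ray interprets it is internal to the library:

# Hypothetical local reproduction of the old CI environment.
# The only assumption taken from the diff is the variable name; its effect
# is implemented inside Ray and not shown here.
import os
import subprocess

env = dict(os.environ, RAY_AIR_NEW_PERSISTENCE_MODE="0")  # legacy persistence mode
subprocess.run(
    ["pytest", "python/ray/train/tests/test_gpu.py", "-x"],
    env=env,
    check=False,  # inspect the exit status manually if needed
)
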
doc/source/ray-core/_examples/datasets_train/datasets_train.py

Lines changed: 18 additions & 14 deletions
@@ -14,6 +14,7 @@
 import sys
 import time
 from typing import Tuple
+from tempfile import TemporaryDirectory

 import boto3
 import mlflow
@@ -464,19 +465,20 @@ def to_torch_dataset(torch_batch_iterator):
         )

         # Checkpoint model.
-        checkpoint = Checkpoint.from_dict(dict(model=net.state_dict()))
-
-        # Record and log stats.
-        print(f"train report on {train.get_context().get_world_rank()}")
-        train.report(
-            dict(
-                train_acc=train_acc,
-                train_loss=train_running_loss,
-                test_acc=test_acc,
-                test_loss=test_running_loss,
-            ),
-            checkpoint=checkpoint,
-        )
+        with TemporaryDirectory() as tmpdir:
+            torch.save(net.module.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
+
+            # Record and log stats.
+            print(f"train report on {train.get_context().get_world_rank()}")
+            train.report(
+                dict(
+                    train_acc=train_acc,
+                    train_loss=train_running_loss,
+                    test_acc=test_acc,
+                    test_loss=test_running_loss,
+                ),
+                checkpoint=Checkpoint.from_directory(tmpdir),
+            )


 if __name__ == "__main__":
@@ -640,7 +642,9 @@ def to_torch_dataset(torch_batch_iterator):
         dataset_config=DataConfig(datasets_to_split=["train", "test"]),
     )
     results = trainer.fit()
-    state_dict = results.checkpoint.to_dict()["model"]
+
+    with results.checkpoint.as_directory() as tmpdir:
+        state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt"))

     def load_model_func():
         num_layers = config["num_layers"]

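Note: this is the core migration in the commit — dict-based checkpoints (Checkpoint.from_dict) become directory-based ones. A standalone sketch of the same save/restore round trip, with a plain nn.Linear standing in for the example's model; the checkpoint.pt file name follows the diff, everything else is illustrative:

import os
from tempfile import TemporaryDirectory

import torch
import torch.nn as nn

from ray.train import Checkpoint

net = nn.Linear(4, 2)  # stand-in for the example's model

# Save: serialize the state dict into a temporary directory and wrap it.
with TemporaryDirectory() as tmpdir:
    torch.save(net.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
    checkpoint = Checkpoint.from_directory(tmpdir)

    # Restore: materialize the checkpoint as a local directory and load.
    with checkpoint.as_directory() as ckpt_dir:
        state_dict = torch.load(os.path.join(ckpt_dir, "checkpoint.pt"))

restored = nn.Linear(4, 2)
restored.load_state_dict(state_dict)
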
python/ray/train/examples/pytorch/torch_fashion_mnist_example.py

Lines changed: 17 additions & 14 deletions
@@ -1,3 +1,5 @@
+import os
+from filelock import FileLock
 from typing import Dict

 import torch
@@ -16,21 +18,22 @@ def get_dataloaders(batch_size):
     # Transform to normalize the input images
     transform = transforms.Compose([ToTensor(), Normalize((0.5,), (0.5,))])

-    # Download training data from open datasets.
-    training_data = datasets.FashionMNIST(
-        root="~/data",
-        train=True,
-        download=True,
-        transform=transform,
-    )
+    with FileLock(os.path.expanduser("~/data.lock")):
+        # Download training data from open datasets.
+        training_data = datasets.FashionMNIST(
+            root="~/data",
+            train=True,
+            download=True,
+            transform=transform,
+        )

-    # Download test data from open datasets.
-    test_data = datasets.FashionMNIST(
-        root="~/data",
-        train=False,
-        download=True,
-        transform=transform,
-    )
+        # Download test data from open datasets.
+        test_data = datasets.FashionMNIST(
+            root="~/data",
+            train=False,
+            download=True,
+            transform=transform,
+        )

     # Create data loaders.
     train_dataloader = DataLoader(training_data, batch_size=batch_size)

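Note: the FileLock guard keeps multiple Train workers on the same node from racing to download FashionMNIST into the shared ~/data directory. A small hedged sketch of the same idea as a reusable helper (the helper name is hypothetical; the lock path mirrors the diff):

import os

from filelock import FileLock
from torchvision import datasets
from torchvision.transforms import ToTensor


def download_fashion_mnist(root: str = "~/data", train: bool = True):
    """Download FashionMNIST once per node, even with many concurrent workers."""
    # Only the process holding the lock performs the download; the rest wait
    # and then reuse the files already on disk.
    with FileLock(os.path.expanduser("~/data.lock")):
        return datasets.FashionMNIST(
            root=root, train=train, download=True, transform=ToTensor()
        )
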
python/ray/train/tests/test_accelerate_trainer_gpu.py

Lines changed: 10 additions & 5 deletions
@@ -1,12 +1,14 @@
+import os
 import pytest
 import torch
+from tempfile import TemporaryDirectory

 import ray
 import torch.nn as nn
 from ray.train.examples.pytorch.torch_linear_example import LinearDataset
 from ray.train import ScalingConfig
 import ray.train as train
-from ray.train.torch.torch_checkpoint import LegacyTorchCheckpoint
+from ray.train import Checkpoint
 from ray.train.huggingface import AccelerateTrainer
 from accelerate import Accelerator

@@ -239,9 +241,10 @@ def linear_train_func(accelerator: Accelerator, config):

         result = dict(loss=loss)
         results.append(result)
-        train.report(
-            result, checkpoint=LegacyTorchCheckpoint.from_state_dict(state_dict)
-        )
+
+        with TemporaryDirectory() as tmpdir:
+            torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt"))
+            train.report(result, checkpoint=Checkpoint.from_directory(tmpdir))

     return results

@@ -307,7 +310,9 @@ def train_func():
         assert accelerator.process_index == train.get_context().get_world_rank()
         model = torch.nn.Linear(3, 1)
         model = accelerator.prepare(model)
-        train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
+        with TemporaryDirectory() as tmpdir:
+            torch.save(model, os.path.join(tmpdir, "checkpoint.pt"))
+            train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

     scaling_config = ScalingConfig(num_workers=num_workers)
     trainer = AccelerateTrainer(

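Note: the Accelerate tests report plain directory checkpoints the same way. When adapting this pattern to a real Accelerate training loop, the prepared model is usually unwrapped before its state dict is saved; a hedged sketch (the helper is hypothetical, accelerator.unwrap_model is standard Hugging Face Accelerate API, and the file name follows the diff):

import os
from tempfile import TemporaryDirectory

import torch
import torch.nn as nn
from accelerate import Accelerator

from ray import train
from ray.train import Checkpoint


def report_accelerate_checkpoint(accelerator: Accelerator, model: nn.Module, metrics: dict):
    # Strip the wrapper that accelerator.prepare() may have added so the
    # saved state dict loads into an unprepared module later.
    unwrapped = accelerator.unwrap_model(model)
    with TemporaryDirectory() as tmpdir:
        torch.save(unwrapped.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
        train.report(metrics, checkpoint=Checkpoint.from_directory(tmpdir))
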
python/ray/train/tests/test_gpu.py

Lines changed: 45 additions & 38 deletions
@@ -1,5 +1,8 @@
+import json
 import os
+from pathlib import Path
 import time
+from typing import Union, List, Dict

 from unittest.mock import patch
 import pytest
@@ -11,7 +14,7 @@
 import ray
 import ray.data
 from ray.exceptions import RayTaskError
-from ray import train, tune
+from ray import train

 from ray.train import ScalingConfig
 from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME
@@ -36,19 +39,26 @@ def __getitem__(self, index):
         return {"x": self.x[index, None], "y": 2}


-# Currently in DataParallelTrainers we only report metrics from rank 0.
-# For testing purposes here, we need to be able to report from all
-# workers.
-class TorchTrainerPatchedMultipleReturns(TorchTrainer):
-    def _report(self, training_iterator) -> None:
-        for results in training_iterator:
-            tune.report(results=results)
+def write_rank_data(tmp_path: Path, data: Union[int, List, Dict]):
+    rank = train.get_context().get_world_rank()
+    with open(tmp_path / f"{rank}.json", "w") as f:
+        json.dump(data, f)
+
+
+def get_data_from_all_ranks(tmp_path: Path) -> Dict[int, Union[int, List, Dict]]:
+    rank_data = {}
+    for rank_file in tmp_path.glob("*.json"):
+        rank = int(rank_file.stem)
+        with open(rank_file, "r") as f:
+            data = json.load(f)
+        rank_data[rank] = data
+    return rank_data


 @pytest.mark.parametrize("cuda_visible_devices", ["", "1,2"])
 @pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
 def test_torch_get_device(
-    shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch
+    shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch, tmp_path
 ):
     if cuda_visible_devices:
         # Test if `get_device` is correct even with user specified env var.
@@ -61,27 +71,26 @@ def train_fn():
         if cuda_visible_devices:
             visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
             assert visible_devices == "1,2"
-        if num_gpus_per_worker > 1:
-            train.report(
-                dict(
-                    devices=sorted(
-                        [device.index for device in train.torch.get_device()]
-                    )
-                )
-            )
-        else:
-            train.report(dict(devices=train.torch.get_device().index))
-
-    trainer = TorchTrainerPatchedMultipleReturns(
+
+        devices = (
+            sorted([device.index for device in train.torch.get_device()])
+            if num_gpus_per_worker > 1
+            else train.torch.get_device().index
+        )
+        write_rank_data(tmp_path, devices)
+
+    trainer = TorchTrainer(
         train_fn,
         scaling_config=ScalingConfig(
             num_workers=int(2 / num_gpus_per_worker),
             use_gpu=True,
             resources_per_worker={"GPU": num_gpus_per_worker},
         ),
     )
-    results = trainer.fit()
-    devices = [result["devices"] for result in results.metrics["results"]]
+    trainer.fit()
+
+    rank_data = get_data_from_all_ranks(tmp_path)
+    devices = list(rank_data.values())

     if num_gpus_per_worker == 0.5:
         assert sorted(devices) == [0, 0, 1, 1]
@@ -97,21 +106,17 @@ def train_fn():


 @pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
-def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker):
+def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker, tmp_path):
     @patch("torch.cuda.is_available", lambda: True)
     def train_fn():
-        if num_gpus_per_worker > 1:
-            train.report(
-                dict(
-                    devices=sorted(
-                        [device.index for device in train.torch.get_device()]
-                    )
-                )
-            )
-        else:
-            train.report(dict(devices=train.torch.get_device().index))
-
-    trainer = TorchTrainerPatchedMultipleReturns(
+        devices = (
+            sorted([device.index for device in train.torch.get_device()])
+            if num_gpus_per_worker > 1
+            else train.torch.get_device().index
+        )
+        write_rank_data(tmp_path, devices)
+
+    trainer = TorchTrainer(
         train_fn,
         # use gloo instead of nccl, since nccl is not supported
         # on this virtual gpu ray environment
@@ -122,8 +127,10 @@ def train_fn():
             resources_per_worker={"GPU": num_gpus_per_worker},
         ),
     )
-    results = trainer.fit()
-    devices = [result["devices"] for result in results.metrics["results"]]
+    trainer.fit()
+
+    rank_data = get_data_from_all_ranks(tmp_path)
+    devices = list(rank_data.values())

     # cluster setups: 2 nodes, 2 gpus per node
     # `CUDA_VISIBLE_DEVICES` is set to "0,1" on node 1 and node 2

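Note: because train.report only surfaces metrics from rank 0 in data-parallel trainers, the TorchTrainerPatchedMultipleReturns subclass is replaced by per-rank JSON files written to pytest's shared tmp_path. A condensed, hedged sketch of that pattern outside the test (helper names mirror the diff; the __main__ driver and output path are illustrative):

import json
from pathlib import Path

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def write_rank_data(tmp_path: Path, data) -> None:
    # Each worker writes its own JSON file, keyed by its world rank.
    rank = train.get_context().get_world_rank()
    with open(tmp_path / f"{rank}.json", "w") as f:
        json.dump(data, f)


def get_data_from_all_ranks(tmp_path: Path) -> dict:
    # The driver reads every per-rank file back after trainer.fit().
    return {int(p.stem): json.loads(p.read_text()) for p in tmp_path.glob("*.json")}


if __name__ == "__main__":
    # Stand-in for pytest's tmp_path fixture; assumes driver and workers
    # share a filesystem, as in the single-node test setup.
    out_dir = Path("/tmp/rank_data")
    out_dir.mkdir(exist_ok=True)

    def train_fn():
        write_rank_data(out_dir, train.get_context().get_world_rank())

    TorchTrainer(train_fn, scaling_config=ScalingConfig(num_workers=2)).fit()
    assert sorted(get_data_from_all_ranks(out_dir)) == [0, 1]
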
python/ray/train/tests/test_gpu_amp.py

Lines changed: 6 additions & 4 deletions
@@ -1,11 +1,12 @@
 from timeit import default_timer as timer

-from ray.train.torch.torch_checkpoint import LegacyTorchCheckpoint
+import os
 import torch
 import torchvision
+from tempfile import TemporaryDirectory

 import ray.train as train
-from ray.train import ScalingConfig
+from ray.train import Checkpoint, ScalingConfig
 from ray.train.torch.torch_trainer import TorchTrainer


@@ -63,14 +64,15 @@ def train_func():
         model = torchvision.models.resnet101()
         model = train.torch.prepare_model(model)

-        train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
+        with TemporaryDirectory() as tmpdir:
+            torch.save(model, os.path.join(tmpdir, "checkpoint.pt"))
+            train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

     trainer = TorchTrainer(
         train_func, scaling_config=ScalingConfig(num_workers=2, use_gpu=True)
     )
     results = trainer.fit()
     assert results.checkpoint
-    assert results.checkpoint.get_model()


 if __name__ == "__main__":

python/ray/train/tests/test_gpu_auto_transfer.py

Lines changed: 21 additions & 12 deletions
@@ -1,14 +1,14 @@
 import os
 from unittest.mock import patch
+from tempfile import TemporaryDirectory
 import pytest

 import torch

 import ray
 from ray import train
-from ray.air.constants import MODEL_KEY
-from ray.train import ScalingConfig
-from ray.train.torch import TorchTrainer, LegacyTorchCheckpoint
+from ray.train import ScalingConfig, Checkpoint
+from ray.train.torch import TorchTrainer
 import ray.train.torch.train_loop_utils


@@ -105,15 +105,24 @@ def train_func():

         assert next(model.parameters()).is_cuda

-        train.report({}, checkpoint=LegacyTorchCheckpoint.from_model(model))
+        with TemporaryDirectory() as tmpdir:
+            state_dict = {
+                k.replace("module.", ""): v for k, v in model.state_dict().items()
+            }
+            torch.save(state_dict, os.path.join(tmpdir, "checkpoint.pt"))
+            train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

     trainer = TorchTrainer(
         train_func, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=True)
     )
     results = trainer.fit()

-    model_checkpoint = results.checkpoint.get_model()
-    assert not next(model_checkpoint.parameters()).is_cuda
+    with results.checkpoint.as_directory() as tmpdir:
+        state_dict = torch.load(os.path.join(tmpdir, "checkpoint.pt"))
+        checkpoint_model = torch.nn.Linear(1, 1)
+        checkpoint_model.load_state_dict(state_dict)
+
+    assert not next(checkpoint_model.parameters()).is_cuda

     # Test the same thing for state dict.

@@ -130,20 +139,20 @@ def train_func():
         for tensor in state_dict.values():
             assert tensor.is_cuda

-        train.report(
-            {},
-            checkpoint=LegacyTorchCheckpoint.from_state_dict(state_dict),
-        )
+        with TemporaryDirectory() as tmpdir:
+            torch.save(model.state_dict(), os.path.join(tmpdir, "checkpoint.pt"))
+            train.report({}, checkpoint=Checkpoint.from_directory(tmpdir))

     trainer = TorchTrainer(
         train_func, scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=True)
     )
     results = trainer.fit()

-    state_dict_checkpoint = results.checkpoint.to_dict()[MODEL_KEY]
+    with results.checkpoint.as_directory() as tmpdir:
+        state_dict_checkpoint = torch.load(os.path.join(tmpdir, "checkpoint.pt"))

     for tensor in state_dict_checkpoint.values():
-        assert not tensor.is_cuda
+        assert tensor.is_cuda


 if __name__ == "__main__":

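Note: one reason the first hunk strips the "module." prefix before saving is that prepare_model can wrap the network in DistributedDataParallel, whose state-dict keys gain a "module." prefix and would not load into a bare nn.Linear. A minimal sketch of that key normalization, independent of Ray (DDP is simulated with a plain container module so the snippet runs on CPU):

import torch
import torch.nn as nn


class Wrapper(nn.Module):
    """Stand-in for DistributedDataParallel: exposes the model as `self.module`."""

    def __init__(self, module: nn.Module):
        super().__init__()
        self.module = module


wrapped = Wrapper(nn.Linear(1, 1))

# Keys look like "module.weight" / "module.bias" after wrapping.
raw_state_dict = wrapped.state_dict()

# Strip the prefix so the weights load into an unwrapped nn.Linear.
state_dict = {k.replace("module.", ""): v for k, v in raw_state_dict.items()}

restored = nn.Linear(1, 1)
restored.load_state_dict(state_dict)
assert torch.equal(restored.weight, wrapped.module.weight)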