+ import json
import os
+ from pathlib import Path
import time
+ from typing import Union, List, Dict

from unittest.mock import patch
import pytest
@@ -11,7 +14,7 @@
import ray
import ray.data
from ray.exceptions import RayTaskError
- from ray import train, tune
+ from ray import train

from ray.train import ScalingConfig
from ray.train.constants import DEFAULT_NCCL_SOCKET_IFNAME
@@ -36,19 +39,26 @@ def __getitem__(self, index):
        return {"x": self.x[index, None], "y": 2}


- # Currently in DataParallelTrainers we only report metrics from rank 0.
- # For testing purposes here, we need to be able to report from all
- # workers.
- class TorchTrainerPatchedMultipleReturns(TorchTrainer):
-     def _report(self, training_iterator) -> None:
-         for results in training_iterator:
-             tune.report(results=results)
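+ # A DataParallelTrainer only surfaces results reported from rank 0, so for
+ # these tests each worker writes the data it wants to expose into a per-rank
+ # JSON file under tmp_path, which the test reads back after fit().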
+ def write_rank_data(tmp_path: Path, data: Union[int, List, Dict]):
+     rank = train.get_context().get_world_rank()
+     with open(tmp_path / f"{rank}.json", "w") as f:
+         json.dump(data, f)
+
+
+ def get_data_from_all_ranks(tmp_path: Path) -> Dict[int, Union[int, List, Dict]]:
+     rank_data = {}
+     for rank_file in tmp_path.glob("*.json"):
+         rank = int(rank_file.stem)
+         with open(rank_file, "r") as f:
+             data = json.load(f)
+         rank_data[rank] = data
+     return rank_data


@pytest.mark.parametrize("cuda_visible_devices", ["", "1,2"])
@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
def test_torch_get_device(
-     shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch
+     shutdown_only, num_gpus_per_worker, cuda_visible_devices, monkeypatch, tmp_path
):
    if cuda_visible_devices:
        # Test if `get_device` is correct even with user specified env var.
@@ -61,27 +71,26 @@ def train_fn():
        if cuda_visible_devices:
            visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
            assert visible_devices == "1,2"
-         if num_gpus_per_worker > 1:
-             train.report(
-                 dict(
-                     devices=sorted(
-                         [device.index for device in train.torch.get_device()]
-                     )
-                 )
-             )
-         else:
-             train.report(dict(devices=train.torch.get_device().index))
-
-     trainer = TorchTrainerPatchedMultipleReturns(
+
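+         # Record this worker's device index (or sorted indices when the
+         # worker owns more than one GPU).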
+         devices = (
+             sorted([device.index for device in train.torch.get_device()])
+             if num_gpus_per_worker > 1
+             else train.torch.get_device().index
+         )
+         write_rank_data(tmp_path, devices)
+
+     trainer = TorchTrainer(
        train_fn,
        scaling_config=ScalingConfig(
            num_workers=int(2 / num_gpus_per_worker),
            use_gpu=True,
            resources_per_worker={"GPU": num_gpus_per_worker},
        ),
    )
-     results = trainer.fit()
-     devices = [result["devices"] for result in results.metrics["results"]]
+     trainer.fit()
+
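+     # Collect the device indices written by every worker rank.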
+     rank_data = get_data_from_all_ranks(tmp_path)
+     devices = list(rank_data.values())

    if num_gpus_per_worker == 0.5:
        assert sorted(devices) == [0, 0, 1, 1]
@@ -97,21 +106,17 @@ def train_fn():


@pytest.mark.parametrize("num_gpus_per_worker", [0.5, 1, 2])
- def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker):
+ def test_torch_get_device_dist(ray_2_node_2_gpu, num_gpus_per_worker, tmp_path):
    @patch("torch.cuda.is_available", lambda: True)
    def train_fn():
-         if num_gpus_per_worker > 1:
-             train.report(
-                 dict(
-                     devices=sorted(
-                         [device.index for device in train.torch.get_device()]
-                     )
-                 )
-             )
-         else:
-             train.report(dict(devices=train.torch.get_device().index))
-
-     trainer = TorchTrainerPatchedMultipleReturns(
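+         # As in test_torch_get_device: record this worker's device index
+         # (or sorted indices) for the per-rank assertions below.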
+         devices = (
+             sorted([device.index for device in train.torch.get_device()])
+             if num_gpus_per_worker > 1
+             else train.torch.get_device().index
+         )
+         write_rank_data(tmp_path, devices)
+
+     trainer = TorchTrainer(
        train_fn,
        # use gloo instead of nccl, since nccl is not supported
        # on this virtual gpu ray environment
@@ -122,8 +127,10 @@ def train_fn():
            resources_per_worker={"GPU": num_gpus_per_worker},
        ),
    )
-     results = trainer.fit()
-     devices = [result["devices"] for result in results.metrics["results"]]
+     trainer.fit()
+
+     rank_data = get_data_from_all_ranks(tmp_path)
+     devices = list(rank_data.values())

    # cluster setups: 2 nodes, 2 gpus per node
    # `CUDA_VISIBLE_DEVICES` is set to "0,1" on node 1 and node 2