This repository was archived by the owner on Mar 31, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
This repository was archived by the owner on Mar 31, 2025. It is now read-only.
Implement Distributed Pytorch training #155
Copy link
Copy link
Open
Description
Modify train/_main.py to implement distributed training such as in the example code below. Also will need to request more than one cpu per task in configs/pipeline.yaml.
# The following has been inspired by the tutorial below:
# https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html
import torch ### This module is needed for training
import torch.nn as nn ### This module is needed for training
import torch.optim as optim ### This module is needed for training
import torch.multiprocessing as mp ### This module is needed for DISTRIBUTED training
from torch.nn.parallel import DistributedDataParallel as DDP ### This module is needed for DISTRIBUTED training
from torch.utils.data.distributed import DistributedSampler ### This module is needed for DISTRIBUTED training
from torch.distributed import init_process_group, destroy_process_group ### This module is needed for DISTRIBUTED training
import torch.backends.cudnn as cudnn ### This module is needed for DISTRIBUTED training
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import os
############################### NeuralNetwork is a class that is defined as the prediction model.
class NeuralNetwork(nn.Module):
    """Toy two-layer MLP: 20 inputs -> 5 hidden units (ReLU) -> 1 output."""

    def __init__(self):
        super().__init__()
        # Same attribute names and registration order as the original stack,
        # so state_dict keys and parameter ordering stay stable.
        self.layer1 = nn.Linear(20, 5)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(5, 1)

    def forward(self, input):
        """Forward pass: linear -> ReLU -> linear."""
        hidden = self.relu(self.layer1(input))
        return self.layer2(hidden)
################################
############################### Random data is created for training purpose.
############################### In a real-world training task the following lines need to be replaced.
class MyTrainDataset(Dataset):
    """Synthetic dataset of random (feature, target) pairs.

    Placeholder data for the distributed-training demo; in a real-world
    task the random tensors would be replaced by actual samples.
    """

    def __init__(self, size):
        self.size = size
        # Pre-generate `size` pairs: a 20-dim feature vector and a scalar target.
        samples = []
        for _ in range(size):
            samples.append((torch.rand(20), torch.rand(1)))
        self.data = samples

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]
##############################
##############################
############################## Constructing the process group for distributed training
# Rank is the id of each process. In this case, for each GPU. Indexing starts by zero.
# World_size is the total number of processes. In this case, the number of GPUs.
def ddp_setup(rank: int, world_size: int):
    '''
    Set up distributed data training.

    Configures the rendezvous address, joins the NCCL process group,
    binds this process to its own GPU, and seeds RNG so every rank
    starts from identical weights.

    Args:
        rank (int): Unique identifier for each gpu
        world_size (int): Total number of processes
    '''
    # Rendezvous point: every worker connects to the master at this address.
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355' # This is just a random port
    init_process_group(backend="nccl", rank=rank, world_size=world_size) # nccl for CUDA, gloo for CPU
    torch.cuda.set_device(rank)  # pin this process to GPU number `rank`
    cudnn.benchmark = True  # let cuDNN auto-select the fastest kernels
    torch.manual_seed(123)  # same seed on all ranks -> identical initial weights
##############################
############################# main() function for distributed training
def main(rank: int, world_size: int):
    """Run the training loop on one distributed worker.

    Spawned once per GPU by ``mp.spawn``; joins the process group, shards
    the data with ``DistributedSampler``, wraps the model in DDP, trains
    for 5 epochs, then tears the process group down.

    Args:
        rank (int): Unique identifier of this process / GPU (0-indexed).
        world_size (int): Total number of participating processes.
    """
    ddp_setup(rank, world_size)  # <--- This function is called to set up the distributed training environment
    print('RANK:', rank) # DEBUG
    data = MyTrainDataset(20480)
    data_loader = DataLoader(
        data,
        batch_size=8,
        pin_memory=True,   # speeds up host->GPU copies
        shuffle=False,     # shuffling is delegated to the DistributedSampler
        sampler=DistributedSampler(data)  # <--- DistributedSampler required for distributed training on multiple GPUs
    )
    model = NeuralNetwork()
    model = model.to(rank)
    model = DDP(model, device_ids=[rank])  # <--- DDP required for distributed training on multiple GPUs
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    for epoch in range(5):
        print('EPOCH:', epoch) # DEBUG
        data_loader.sampler.set_epoch(epoch)  # <--- This is required for DistributedSampler to shuffle the data
        print('NUM BATCHES', len(data_loader)) # DEBUG
        for i, (source, targets) in enumerate(data_loader):
            source = source.to(rank)
            targets = targets.to(rank)
            optimizer.zero_grad()
            output = model(source)
            # BUG FIX: the original used F.cross_entropy here. With a single
            # output logit, log_softmax is identically zero, so the loss and
            # all gradients were constant 0 and the model never learned.
            # The targets are continuous (torch.rand(1)), i.e. a regression
            # task, so mean-squared error is the appropriate loss.
            loss = F.mse_loss(output, targets)
            loss.backward()
            optimizer.step()
            if i % 10000 == 0:
                print("Batch: ", i)
    destroy_process_group()
#############################
############################# Running Distributed on multiple GPU Training
# Launch one training process per available GPU.
if __name__ == "__main__":
    world_size = torch.cuda.device_count()  # total number of GPUs = number of workers
    print("world_size: ", world_size)
    # mp.spawn supplies the rank (0..nprocs-1) as the first argument to `main`,
    # so only world_size needs to be passed explicitly.
    mp.spawn(main, args=(world_size, ), nprocs=world_size)
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels