Skip to content
This repository was archived by the owner on Mar 31, 2025. It is now read-only.
This repository was archived by the owner on Mar 31, 2025. It is now read-only.

Implement Distributed Pytorch training #155

@qsimeon

Description

@qsimeon

Modify train/_main.py to implement distributed training such as in the example code below. Also will need to request more than one cpu per task in configs/pipeline.yaml.


# The following has been inspired by this tutorial:
# https://pytorch.org/tutorials/beginner/ddp_series_multigpu.html

import torch                    ### This module is needed for training
import torch.nn as nn           ### This module is needed for training
import torch.optim as optim     ### This module is needed for training
import torch.multiprocessing as mp ### This module is needed for DISTRIBUTED training
from torch.nn.parallel import DistributedDataParallel as DDP ### This module is needed for DISTRIBUTED training
from torch.utils.data.distributed import DistributedSampler ### This module is needed for DISTRIBUTED training
from torch.distributed import init_process_group, destroy_process_group ### This module is needed for DISTRIBUTED training
import torch.backends.cudnn as cudnn ### This module is needed for DISTRIBUTED training
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F
import os

############################### NeuralNetwork is the class defined as the prediction model.

class NeuralNetwork(nn.Module):
    """Small feed-forward demo model: Linear(20 -> 5) -> ReLU -> Linear(5 -> 1)."""

    def __init__(self):
        super(NeuralNetwork, self).__init__()
        # Attribute names kept as-is so state_dict keys stay compatible.
        self.layer1 = nn.Linear(20, 5)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(5, 1)

    def forward(self, input):
        """Map a (batch, 20) input to a (batch, 1) output."""
        hidden = self.layer1(input)
        activated = self.relu(hidden)
        return self.layer2(activated)

################################


############################### Random data is created for training purposes.
############################### In a real-world training task the following lines need to be replaced. 

class MyTrainDataset(Dataset):
    """Synthetic in-memory dataset of (feature, target) pairs.

    Each item is a tuple of a random 20-dim feature vector and a random
    1-dim target, generated once at construction time.
    """

    def __init__(self, size):
        self.size = size
        samples = []
        for _ in range(size):
            samples.append((torch.rand(20), torch.rand(1)))
        self.data = samples

    def __len__(self):
        """Number of samples in the dataset."""
        return self.size

    def __getitem__(self, index):
        """Return the (feature, target) pair at position *index*."""
        return self.data[index]

##############################


##############################
############################## Constructing the process group for distributed training

# Rank is the id of each process. In this case, for each GPU. Indexing starts by zero.
# World_size is the total number of processes. In this case, the number of GPUs.

def ddp_setup(rank, world_size, master_addr='localhost', master_port='12355', backend='nccl'):
    '''
    Set up the process group for distributed data-parallel training.

    Args:
        rank (int): Unique identifier for this process (one per GPU), 0-indexed.
        world_size (int): Total number of processes participating.
        master_addr (str): Hostname of the rank-0 rendezvous process.
            Defaults to 'localhost' for single-node training.
        master_port (str): Free port used for rendezvous; any unused port works.
        backend (str): Communication backend — "nccl" for CUDA GPUs,
            "gloo" for CPU-only training.
    '''

    os.environ['MASTER_ADDR'] = master_addr
    os.environ['MASTER_PORT'] = master_port

    init_process_group(backend=backend, rank=rank, world_size=world_size)
    if backend == "nccl":
        # Pin this process to its own GPU; only meaningful for the CUDA backend
        # (the original unconditional call crashed on CPU-only machines).
        torch.cuda.set_device(rank)
    cudnn.benchmark = True  # let cuDNN auto-tune kernel choices for fixed shapes
    torch.manual_seed(123)  # same seed everywhere -> identical initial weights per rank

##############################


############################# main() function for distributed training

def main(rank: int, world_size: int):
    """
    Per-process training entry point, executed once in each spawned worker.

    Args:
        rank (int): Process index supplied automatically by mp.spawn (one per GPU).
        world_size (int): Total number of processes.
    """
    ddp_setup(rank, world_size)  # join the process group before touching the model

    print('RANK:', rank)  # DEBUG
    data = MyTrainDataset(20480)

    data_loader = DataLoader(
        data,
        batch_size=8,
        pin_memory=True,
        shuffle=False,  # shuffling is delegated to the DistributedSampler
        sampler=DistributedSampler(data)  # <--- partitions the data across ranks
    )

    model = NeuralNetwork()
    model = model.to(rank)
    model = DDP(model, device_ids=[rank])  # <--- wraps the model for gradient sync

    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)

    for epoch in range(5):

        print('EPOCH:', epoch)  # DEBUG
        data_loader.sampler.set_epoch(epoch)  # <--- required so the sampler reshuffles each epoch

        print('NUM BATCHES', len(data_loader))  # DEBUG
        for i, (source, targets) in enumerate(data_loader):

            source = source.to(rank)
            targets = targets.to(rank)

            optimizer.zero_grad()

            output = model(source)
            # BUGFIX: targets are continuous values in [0, 1) with shape
            # (batch, 1), not class indices — this is a regression task, and
            # F.cross_entropy misinterprets float targets of this shape.
            # Mean-squared error is the appropriate loss here.
            loss = F.mse_loss(output, targets)
            loss.backward()
            optimizer.step()

            if i % 10000 == 0:
                print("Batch: ", i)

    destroy_process_group()


#############################
############################# Running Distributed on multiple GPU Training

if __name__ == "__main__":
    # Launch one worker process per visible GPU. mp.spawn supplies the rank
    # (0..nprocs-1) as the first argument to main automatically, so args only
    # carries the remaining parameter(s).
    n_gpus = torch.cuda.device_count()
    print("world_size: ", n_gpus)
    mp.spawn(main, args=(n_gpus, ), nprocs=n_gpus)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions