[docs] add pages about examples on training language models with fairseq (ray-project#5755)

* add pages about examples on training language models with fairseq and ray autoscaler

* better format

* update ray_train.sh

* Move EFS to the autoscaler file

* nits

* add comments to the code & use a new way to implement checkpoint hook

* small bug fix

* polish the doc

* fix formatting

* yaml

* update docs

* fix the bugs and add preprocess.sh

* fix lint

* Reduce batch size & fix lint

* shorttitle
zhuohan123 authored and richardliaw committed Oct 21, 2019
1 parent 6b36ef1 commit f286356
Showing 8 changed files with 715 additions and 4 deletions.
130 changes: 130 additions & 0 deletions doc/examples/lm/lm-cluster.yaml
@@ -0,0 +1,130 @@
# A unique identifier for the head node and workers of this cluster.
cluster_name: lm-cluster

# The minimum number of worker nodes to launch in addition to the head
# node. This number should be >= 0.
min_workers: 1

# The maximum number of worker nodes to launch in addition to the head
# node. This takes precedence over min_workers.
max_workers: 2

# The initial number of worker nodes to launch in addition to the head
# node. When the cluster is first brought up (or when it is refreshed with a
# subsequent `ray up`) this number of nodes will be started.
initial_workers: 1

# Whether or not to autoscale aggressively. If this is enabled, then whenever
# the autoscaler would add more workers, it adds at least enough to bring the
# cluster up to initial_workers.
autoscaling_mode: default


# The autoscaler will scale up the cluster to this target fraction of resource
# usage. For example, if a cluster of 10 nodes is 100% busy and
# target_utilization is 0.8, it would resize the cluster to 13. This fraction
# can be decreased to increase the aggressiveness of upscaling.
# This value must be less than 1.0 for scaling to happen.
target_utilization_fraction: 0.48

# If a node is idle for this many minutes, it will be removed.
idle_timeout_minutes: 5

# Cloud-provider specific configuration.
provider:
    type: aws
    region: us-west-2
    # Availability zone(s), comma-separated, that nodes may be launched in.
    # Nodes are currently spread between zones by a round-robin approach,
    # however this implementation detail should not be relied upon.
    availability_zone: us-west-2a,us-west-2b

# How Ray will authenticate with newly launched nodes.
auth:
    ssh_user: ubuntu
    # By default Ray creates a new private keypair, but you can also use your own.
    # If you do so, make sure to also set "KeyName" in the head and worker node
    # configurations below.
    # ssh_private_key: /path/to/your/key.pem

# Provider-specific config for the head node, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
head_node:
    InstanceType: m5.xlarge
    ImageId: ami-0b294f219d14e6a82 # Deep Learning AMI (Ubuntu) Version 21.0
    SecurityGroupIds:
        - "{{SecurityGroupId}}"
    # You can provision additional disk space with a conf as follows
    BlockDeviceMappings:
        - DeviceName: /dev/sda1
          Ebs:
              VolumeSize: 100

    # Additional options in the boto docs.

# Provider-specific config for worker nodes, e.g. instance type. By default
# Ray will auto-configure unspecified fields such as SubnetId and KeyName.
# For more documentation on available fields, see:
# http://boto3.readthedocs.io/en/latest/reference/services/ec2.html#EC2.ServiceResource.create_instances
worker_nodes:
    InstanceType: p3.2xlarge
    ImageId: ami-0b294f219d14e6a82 # Deep Learning AMI (Ubuntu) Version 21.0
    SecurityGroupIds:
        - "{{SecurityGroupId}}"
    # Run workers on spot by default. Comment this out to use on-demand.
    InstanceMarketOptions:
        MarketType: spot
        # Additional options can be found in the boto docs, e.g.
        #   SpotOptions:
        #       MaxPrice: MAX_HOURLY_PRICE

    # Additional options in the boto docs.

# List of shell commands to run to set up nodes.
setup_commands:
    # Note: if you're developing Ray, you probably want to create an AMI that
    # has your Ray repo pre-cloned. Then, you can replace the pip installs
    # below with a git checkout <your_sha> (and possibly a recompile).
    - echo 'export PATH="$HOME/anaconda3/envs/pytorch_p36/bin:$PATH"' >> ~/.bashrc;
      source ~/.bashrc;
      pip install -U ray;
      pip install -U fairseq==0.8.0;
    - sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1`;
      sudo pkill -9 apt-get;
      sudo pkill -9 dpkg;
      sudo dpkg --configure -a;
      sudo apt-get -y install binutils;
      cd $HOME;
      git clone https://github.com/aws/efs-utils;
      cd $HOME/efs-utils;
      ./build-deb.sh;
      sudo apt-get -y install ./build/amazon-efs-utils*deb;
      cd $HOME;
      mkdir efs;
      sudo mount -t efs {{FileSystemId}}:/ efs;
      sudo chmod 777 efs;

# Custom commands that will be run on the head node after common setup.
head_setup_commands:
    - pip install boto3==1.4.8  # 1.4.8 adds InstanceMarketOptions

# Custom commands that will be run on worker nodes after common setup.
worker_setup_commands: []

# Command to start ray on the head node. You don't need to change this.
head_start_ray_commands:
    - ray stop
    - ulimit -n 65536;
      ray start --head --redis-port=6379
      --object-manager-port=8076
      --autoscaling-config=~/ray_bootstrap_config.yaml

# Command to start ray on worker nodes. You don't need to change this.
worker_start_ray_commands:
    - ray stop
    - ulimit -n 65536;
      ray start
      --redis-address=$RAY_HEAD_IP:6379
      --object-manager-port=8076
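
The cluster above is launched with the Ray autoscaler (e.g. `ray up lm-cluster.yaml`, as the comment on initial_workers notes). As an illustrative aside that is not part of the commit: once the cluster is up, a driver on the head node can connect to it and wait for autoscaled GPU workers to join, which mirrors the resource polling that set_num_resources() in ray_train.py (later in this commit) performs before choosing a world size. A minimal sketch, assuming it runs on the head node of this cluster:

import time

import ray

# Connect to the running cluster; "auto" is the same default address
# that ray_train.py passes to ray.init().
ray.init(address="auto")

# Wait until the autoscaler has brought up at least one GPU worker,
# mirroring set_num_resources() in ray_train.py.
while int(ray.cluster_resources().get("GPU", 0)) == 0:
    print("No GPUs in the cluster yet, waiting 10 seconds")
    time.sleep(10)

print("Cluster resources:", ray.cluster_resources())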
29 changes: 29 additions & 0 deletions doc/examples/lm/preprocess.sh
@@ -0,0 +1,29 @@
cd ~/efs/lm

# download the dataset
wget https://s3.amazonaws.com/research.metamind.io/wikitext/wikitext-103-raw-v1.zip
unzip wikitext-103-raw-v1.zip
# encode it with the GPT-2 BPE
mkdir -p gpt2_bpe
wget -O gpt2_bpe/encoder.json https://dl.fbaipublicfiles.com/fairseq/gpt2_bpe/encoder.json
wget -O gpt2_bpe/vocab.bpe https://dl.fbaipublicfiles.com/fairseq/gpt2_bpe/vocab.bpe
wget https://raw.githubusercontent.com/pytorch/fairseq/master/examples/roberta/multiprocessing_bpe_encoder.py
for SPLIT in train valid test; do \
    python multiprocessing_bpe_encoder.py \
        --encoder-json gpt2_bpe/encoder.json \
        --vocab-bpe gpt2_bpe/vocab.bpe \
        --inputs wikitext-103-raw/wiki.${SPLIT}.raw \
        --outputs wikitext-103-raw/wiki.${SPLIT}.bpe \
        --keep-empty \
        --workers 60; \
done
# preprocess/binarize the data using the GPT-2 fairseq dictionary
wget -O gpt2_bpe/dict.txt https://dl.fbaipublicfiles.com/fairseq/gpt2_bpe/dict.txt
fairseq-preprocess \
    --only-source \
    --srcdict gpt2_bpe/dict.txt \
    --trainpref wikitext-103-raw/wiki.train.bpe \
    --validpref wikitext-103-raw/wiki.valid.bpe \
    --testpref wikitext-103-raw/wiki.test.bpe \
    --destdir data-bin/wikitext-103 \
    --workers 60
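
As an illustrative aside (not part of the commit): after preprocess.sh finishes, the binarized dataset should sit on EFS at the path that ray_train.sh later points DATA_DIR to. A small sanity-check sketch; the exact output file names are an assumption about what fairseq-preprocess writes to --destdir:

from pathlib import Path

# Location ray_train.sh expects: $HOME/efs/lm/data-bin/wikitext-103/
data_dir = Path.home() / "efs" / "lm" / "data-bin" / "wikitext-103"

# Assumed fairseq-preprocess outputs: a dictionary plus one .bin/.idx
# pair per split.
expected = ["dict.txt"] + [
    "%s.%s" % (split, ext)
    for split in ("train", "valid", "test")
    for ext in ("bin", "idx")
]
missing = [name for name in expected if not (data_dir / name).exists()]
print("Missing files:", missing if missing else "none")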
181 changes: 181 additions & 0 deletions doc/examples/lm/ray_train.py
@@ -0,0 +1,181 @@
#!/usr/bin/env python3 -u

import math
import copy
import socket
import time

import ray

import fairseq
from fairseq import options
from fairseq_cli.train import main
from contextlib import closing

_original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint


class RayDistributedActor:
    """Actor to perform distributed training."""

    def run(self, url, world_rank, args):
        """Runs the fairseq training.

        We set args for different ray actors for communication,
        add a checkpoint hook, and call the main function of fairseq.
        """

        # Set the init_method and rank of the process for distributed training.
        print("Ray worker at {url} rank {rank}".format(
            url=url, rank=world_rank))
        self.url = url
        self.world_rank = world_rank
        args.distributed_rank = world_rank
        args.distributed_init_method = url

        # Add a checkpoint hook to make use of new resources.
        self.add_checkpoint_hook(args)

        # Call the original main function of fairseq.
        main(args, init_distributed=(args.distributed_world_size > 1))

    def add_checkpoint_hook(self, args):
        """Add a hook to the original save_checkpoint function.

        This checks if there are new computational resources available.
        If so, raise an exception to restart the training process and
        make use of the new resources.
        """

        if args.cpu:
            original_n_cpus = args.distributed_world_size

            def _new_save_checkpoint(*args, **kwargs):
                _original_save_checkpoint(*args, **kwargs)
                n_cpus = int(ray.cluster_resources()["CPU"])
                if n_cpus > original_n_cpus:
                    raise Exception(
                        "New CPUs found (original %d CPUs, now %d CPUs)" %
                        (original_n_cpus, n_cpus))
        else:
            original_n_gpus = args.distributed_world_size

            def _new_save_checkpoint(*args, **kwargs):
                _original_save_checkpoint(*args, **kwargs)
                n_gpus = int(ray.cluster_resources().get("GPU", 0))
                if n_gpus > original_n_gpus:
                    raise Exception(
                        "New GPUs found (original %d GPUs, now %d GPUs)" %
                        (original_n_gpus, n_gpus))

        fairseq.checkpoint_utils.save_checkpoint = _new_save_checkpoint

    def get_node_ip(self):
        """Returns the IP address of the current node."""
        return ray.services.get_node_ip_address()

    def find_free_port(self):
        """Finds a free port on the current node."""
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
            s.bind(("", 0))
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            return s.getsockname()[1]


def run_fault_tolerant_loop():
    """Entry point to the fairseq library, providing fault tolerance."""

    # Parse the command line arguments.
    parser = options.get_training_parser()
    add_ray_args(parser)
    args = options.parse_args_and_arch(parser)
    original_args = copy.deepcopy(args)

    # Main loop for fault-tolerant training.
    retry = True
    while retry:
        args = copy.deepcopy(original_args)

        # Initialize Ray.
        ray.init(address=args.ray_address)

        set_num_resources(args)
        set_batch_size(args)

        # Set up Ray distributed actors.
        Actor = ray.remote(
            num_cpus=1, num_gpus=int(not args.cpu))(RayDistributedActor)
        workers = [Actor.remote() for i in range(args.distributed_world_size)]

        # Get the IP address and a free port of actor 0, which is used for
        # fairseq distributed training.
        ip = ray.get(workers[0].get_node_ip.remote())
        port = ray.get(workers[0].find_free_port.remote())
        address = "tcp://{ip}:{port}".format(ip=ip, port=port)

        # Start the remote processes, and check whether any of them fails.
        # If so, restart all of the processes.
        unfinished = [
            worker.run.remote(address, i, args)
            for i, worker in enumerate(workers)
        ]
        try:
            while len(unfinished) > 0:
                finished, unfinished = ray.wait(unfinished)
                finished = ray.get(finished)
            retry = False
        except Exception as inst:
            print("Ray will restart because the following error occurred:")
            print(inst)
            retry = True
        ray.shutdown()


def add_ray_args(parser):
    """Add Ray and fault-tolerance related arguments to the parser."""
    group = parser.add_argument_group("Ray related arguments")
    group.add_argument(
        "--ray-address",
        default="auto",
        type=str,
        help="address for ray initialization")
    group.add_argument(
        "--fix-batch-size",
        default=None,
        metavar="B1,B2,...,B_N",
        type=lambda uf: options.eval_str_list(uf, type=int),
        help="fix the actual batch size (max_sentences * update_freq "
        "* n_GPUs) to be the fixed input values by adjusting update_freq "
        "according to actual n_GPUs; the batch size is fixed to B_i for "
        "epoch i; all epochs >N are fixed to B_N")
    return group


def set_num_resources(args):
    """Get the number of resources and set the corresponding fields."""
    if args.cpu:
        args.distributed_world_size = int(ray.cluster_resources()["CPU"])
    else:
        n_gpus = int(ray.cluster_resources().get("GPU", 0))
        while n_gpus == 0:
            print("No GPUs available, waiting 10 seconds")
            time.sleep(10)
            n_gpus = int(ray.cluster_resources().get("GPU", 0))
        args.distributed_world_size = n_gpus


def set_batch_size(args):
    """Fixes the total batch_size to be agnostic to the GPU count."""
    if args.fix_batch_size is not None:
        args.update_freq = [
            math.ceil(batch_size /
                      (args.max_sentences * args.distributed_world_size))
            for batch_size in args.fix_batch_size
        ]
        print("Training on %d GPUs, max_sentences=%d, update_freq=%s" %
              (args.distributed_world_size, args.max_sentences,
               repr(args.update_freq)))


if __name__ == "__main__":
    run_fault_tolerant_loop()
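
The heart of this script is the restart loop in run_fault_tolerant_loop(): the patched save_checkpoint raises once the autoscaler has added resources, the exception surfaces through ray.wait()/ray.get(), and the driver shuts Ray down and relaunches training with a larger world size. A stripped-down sketch of that control flow (illustrative only, not part of the commit), with a dummy remote task standing in for the fairseq workers:

import ray


@ray.remote
def train_shard(rank):
    # Stand-in for RayDistributedActor.run(); in ray_train.py the checkpoint
    # hook raises from inside the worker when new CPUs/GPUs appear.
    return rank


def toy_fault_tolerant_loop():
    retry = True
    while retry:
        ray.init()
        world_size = int(ray.cluster_resources().get("CPU", 1))
        unfinished = [train_shard.remote(i) for i in range(world_size)]
        try:
            while unfinished:
                finished, unfinished = ray.wait(unfinished)
                ray.get(finished)  # re-raises any worker exception here
            retry = False
        except Exception as err:
            print("Restarting after worker error:", err)
            retry = True
        ray.shutdown()


if __name__ == "__main__":
    toy_fault_tolerant_loop()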
26 changes: 26 additions & 0 deletions doc/examples/lm/ray_train.sh
@@ -0,0 +1,26 @@
#!/bin/bash

TOTAL_UPDATES=125000 # Total number of training steps
WARMUP_UPDATES=10000 # Warmup the learning rate over this many updates
PEAK_LR=0.0005 # Peak learning rate, adjust as needed
TOKENS_PER_SAMPLE=512 # Max sequence length
MAX_POSITIONS=512 # Num. positional embeddings (usually same as above)
MAX_SENTENCES=8 # Number of sequences per batch on one GPU (batch size)
FIX_BATCH_SIZE=2048 # Total batch size across all GPUs (max_sentences * update_freq * n_gpus)
SAVE_INTERVAL_UPDATES=1000 # save a checkpoint every N updates

LOG_DIR=$HOME/efs/lm/log/
DATA_DIR=$HOME/efs/lm/data-bin/wikitext-103/
mkdir -p $LOG_DIR

python $HOME/efs/lm/ray_train.py --fp16 $DATA_DIR \
    --task masked_lm --criterion masked_lm \
    --arch roberta_base --sample-break-mode complete --tokens-per-sample $TOKENS_PER_SAMPLE \
    --optimizer adam --adam-betas '(0.9, 0.98)' --adam-eps 1e-6 --clip-norm 0.0 \
    --lr-scheduler polynomial_decay --lr $PEAK_LR --warmup-updates $WARMUP_UPDATES --total-num-update $TOTAL_UPDATES \
    --dropout 0.1 --attention-dropout 0.1 --weight-decay 0.01 \
    --max-sentences $MAX_SENTENCES \
    --fix-batch-size $FIX_BATCH_SIZE \
    --max-update $TOTAL_UPDATES --log-format simple --log-interval 1 \
    --save-interval-updates $SAVE_INTERVAL_UPDATES \
    --save-dir $LOG_DIR --ddp-backend=no_c10d
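
Note how --fix-batch-size and MAX_SENTENCES interact: set_batch_size() in ray_train.py recomputes update_freq as ceil(FIX_BATCH_SIZE / (MAX_SENTENCES * n_gpus)), so the effective batch size stays at, or just above, 2048 sentences no matter how many GPUs the autoscaler currently provides. A short worked example (illustrative, not part of the commit):

import math

MAX_SENTENCES = 8      # per-GPU sentences per step, from ray_train.sh
FIX_BATCH_SIZE = 2048  # target total sentences per update, from ray_train.sh

for n_gpus in (1, 2, 3, 4, 8, 16):
    update_freq = math.ceil(FIX_BATCH_SIZE / (MAX_SENTENCES * n_gpus))
    effective = MAX_SENTENCES * update_freq * n_gpus
    print("%2d GPUs -> update_freq=%3d, effective batch=%d" %
          (n_gpus, update_freq, effective))

With 3 GPUs, for example, update_freq is ceil(2048 / 24) = 86, so the effective batch is 2064, slightly above the target because update_freq is rounded up.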
4 changes: 4 additions & 0 deletions doc/examples/overview.rst
@@ -32,3 +32,7 @@ Examples Overview
.. customgalleryitem::
   :tooltip: Implement a simple streaming application using Ray’s actors.
   :description: :doc:`/auto_examples/plot_streaming`

.. customgalleryitem::
   :tooltip: Distributed Fault-Tolerant BERT training for FAIRSeq using Ray.
   :description: :doc:`/auto_examples/plot_example-lm`