From 1995b6bfa43655993ab22d61f5ce93421bebbdda Mon Sep 17 00:00:00 2001
From: Rahul Huilgol
Date: Tue, 10 Apr 2018 12:40:40 -0700
Subject: [PATCH] [MXNET-37] tutorial for distributed training (#9152)

* initial draft
* WIP tutorial
* WIP tutorial, with mnist script changes
  Signed-off-by: Rahul
* WIP tutorial, with mnist script changes
  Signed-off-by: Rahul
* use logger
  Signed-off-by: Rahul
* remove from old page
  Signed-off-by: Rahul
* first draft of tutorial and removing pythonpath inserts for get_data, by moving them to test_utils
  Signed-off-by: Rahul
* fix typos
  Signed-off-by: Rahul
* rename functions
  Signed-off-by: Rahul
* small change in section heading
  Signed-off-by: Rahul
* fix reimport
  Signed-off-by: Rahul
* Update distributed_training.md
* Update distributed_training.md
  Punctuation and minor changes
* fix gluon iterators and address some review comments
  Signed-off-by: Rahul
* Update multi_devices.md
* Update distributed_training.md
  indentation change
* Update distributed_training.md
  cmake instruction
* retain only doc changes
* comments addressed
* fix link of gradient compression page
* clarifying launch.py usage
* update env var info
* update broken links
* update comment on splitting data
---
 docs/faq/distributed_training.md | 288 +++++++++++++++++++++++++++++++
 docs/faq/index.md                |   6 +-
 docs/faq/multi_devices.md        | 132 +-------------
 3 files changed, 295 insertions(+), 131 deletions(-)
 create mode 100644 docs/faq/distributed_training.md

diff --git a/docs/faq/distributed_training.md b/docs/faq/distributed_training.md
new file mode 100644
index 000000000000..70078ba60957
--- /dev/null
+++ b/docs/faq/distributed_training.md
@@ -0,0 +1,288 @@

# Distributed Training in MXNet
MXNet supports distributed training, enabling us to leverage multiple machines for faster training.
In this document, we describe how distributed training works, how to launch a distributed training job, and the environment variables that provide finer control.

## Types of Parallelism
There are two ways in which we can distribute the workload of training a neural network across multiple devices (each device being either a GPU or a CPU).
The first way is *data parallelism*, where each device stores a complete copy of the model.
Each device works with a different part of the dataset, and the devices collectively update a shared model.
These devices can be located on a single machine or spread across multiple machines.
In this document, we describe how to train a model in a data parallel way, with devices distributed across machines.

When a model is so large that it doesn't fit into device memory, a second approach called *model parallelism* is useful.
Here, different devices are assigned the task of learning different parts of the model.
Currently, MXNet supports model parallelism on a single machine only. Refer to [Training with multiple GPUs using model parallelism](https://mxnet.incubator.apache.org/versions/master/faq/model_parallel_lstm.html) for more on this.

## How Does Distributed Training Work?
The following concepts are key to understanding distributed training in MXNet:

### Types of Processes
MXNet has three types of processes which communicate with each other to accomplish training of a model.
- Worker: A worker node performs training on a batch of training samples.
Before processing each batch, the workers pull weights from the servers.
The workers also send gradients to the servers after each batch.
Depending on the workload, it might not be a good idea to run more than one worker process on the same machine.
- Server: There can be multiple servers, which store the model's parameters and communicate with workers.
A server may or may not be co-located with the worker processes.
- Scheduler: There is only one scheduler. Its role is to set up the cluster. This includes waiting for messages that each node has come up and which port the node is listening on.
The scheduler then lets all processes know about every other node in the cluster, so that they can communicate with each other.

### KV Store
MXNet provides a key-value store, which is a critical component used for multi-device training. The communication of parameters across devices on a single machine, as well as across multiple machines, is relayed through one or more servers with a key-value store for the parameters. Each entry in this store is a key and a value, where each parameter array in the network is assigned a key, and the value refers to the weights of that parameter array. Workers `push` gradients after processing a batch, and `pull` updated weights before processing a new batch.
We can also pass in an optimizer for the KVStore to use while updating each weight. An optimizer such as Stochastic Gradient Descent defines an update rule,
essentially a mathematical formula to compute the new weight based on the old weight, the gradient, and some hyperparameters.

If you are using a Gluon Trainer object or the Module API,
it uses a kvstore object internally to aggregate gradients from multiple devices on the same machine as well as across different machines.

Although the API remains the same whether or not multiple machines are being used,
the notion of a kvstore server exists only during distributed training.
In this case, each `push` and `pull` involves communication with the kvstore servers. When there are multiple devices on a single machine, gradients from these devices are first aggregated on the machine and then sent to the servers.
Note that we need to compile MXNet with the build flag `USE_DIST_KVSTORE=1` to use distributed training.

The distributed mode of KVStore is enabled by calling the `mxnet.kvstore.create` function
with a string argument which contains the word `dist`, as follows:
> kv = mxnet.kvstore.create('dist_sync')

Refer to the [KVStore API](https://mxnet.incubator.apache.org/versions/master/api/python/kvstore/kvstore.html) for more information about KVStore.
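To make the push/pull workflow above concrete, here is a minimal sketch of how a worker process might interact with a distributed kvstore. It assumes the process was started through a launcher such as `launch.py`, so that the scheduler and server addresses are available in the environment; the key name `'weight'`, the array shape, and the learning rate are arbitrary values chosen for illustration.

```python
import mxnet as mx

# 'dist_sync' creates a kvstore that communicates with the parameter servers.
kv = mx.kvstore.create('dist_sync')

# Each parameter array is identified by a key; initialize it once on the servers.
shape = (2, 3)
kv.init('weight', mx.nd.zeros(shape))

# Register an optimizer so the servers apply an update rule to pushed gradients.
kv.set_optimizer(mx.optimizer.SGD(learning_rate=0.1))

# After computing gradients for a batch, the worker pushes them to the servers...
grad = mx.nd.ones(shape)
kv.push('weight', grad)

# ...and pulls the updated weights before starting the next batch.
weight = mx.nd.zeros(shape)
kv.pull('weight', out=weight)
print('worker %d of %d' % (kv.rank, kv.num_workers), weight.asnumpy())
```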
### Distribution of Keys
Each server doesn't necessarily store all the keys or parameter arrays.
Parameters are distributed across different servers. The decision of which server stores a particular key is made at random.
This distribution of keys across servers is handled transparently by the KVStore.
It ensures that when a key is pulled, the request is sent to the server which holds the corresponding value.
If the value for some key is very large, it may be sharded across different servers, meaning that different servers hold different parts of the value.
Again, this is handled transparently, so the worker does not have to do anything different.
The threshold for this sharding can be controlled with the environment variable `MXNET_KVSTORE_BIGARRAY_BOUND`.
See [environment variables](#environment-variables) for more details.

### Split training data
When running distributed training in data parallel mode, we want each machine to work on a different part of the dataset.

For data parallel training on a single worker,
we can use `mxnet.gluon.utils.split_and_load` to split a batch of samples provided by the data iterator, and then load each part of the batch on the device which will process it.

In the case of distributed training, we additionally need to divide the dataset into `n` parts at the beginning, so that each worker gets a different part. Each worker can then use `split_and_load` to again divide its part of the dataset across the devices on its machine.

Typically, this split of data for each worker happens through the data iterator,
by passing the total number of parts and the index of the part to iterate over.
Some iterators in MXNet that support this feature are [mxnet.io.MNISTIter](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.MNISTIter) and [mxnet.io.ImageRecordIter](https://mxnet.incubator.apache.org/versions/master/api/python/io/io.html#mxnet.io.ImageRecordIter).
If you are using a different iterator, you can look at how the above iterators implement this.
We can use the kvstore object to get the number of workers (`kv.num_workers`) and the rank of the current worker (`kv.rank`).
These can be passed as arguments to the iterator.
You can look at [example/gluon/image_classification.py](https://github.com/apache/incubator-mxnet/blob/master/example/gluon/image_classification.py)
to see an example usage.
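As a rough sketch of this splitting, the snippet below partitions MNIST across workers with `num_parts`/`part_index`, and then splits each batch across the local devices with `split_and_load`. The file paths and the two-GPU context list are assumptions made for the example; adjust them to your setup.

```python
import mxnet as mx
from mxnet import gluon

kv = mx.kvstore.create('dist_sync')

# Each worker reads only its own 1/num_workers portion of the dataset.
train_iter = mx.io.MNISTIter(
    image='data/train-images-idx3-ubyte',   # assumed local paths to the MNIST files
    label='data/train-labels-idx1-ubyte',
    batch_size=64,
    num_parts=kv.num_workers,               # total number of partitions
    part_index=kv.rank)                     # which partition this worker reads

ctx = [mx.gpu(0), mx.gpu(1)]                # assumes two GPUs on this worker
for batch in train_iter:
    # Within the worker, split the batch across the local devices.
    data = gluon.utils.split_and_load(batch.data[0], ctx_list=ctx, batch_axis=0)
    label = gluon.utils.split_and_load(batch.label[0], ctx_list=ctx, batch_axis=0)
    # The forward and backward passes for each device slice would go here.
    break
```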
### Different Modes of Distributed Training
Distributed training itself is enabled when the kvstore creation string contains the word `dist`.

Different modes of distributed training can be enabled by using different types of kvstore.

- `dist_sync`: In synchronous distributed training, all workers use the same synchronized set of model parameters at the start of every batch.
This means that after each batch, the server waits to receive the gradients from every worker before it updates the model parameters.
This synchronization comes at a cost, because a worker pulling parameters has to wait until the server finishes this process.
In this mode, if a worker crashes, it halts the progress of all workers.

- `dist_async`: In asynchronous distributed training, the server receives gradients from one worker and immediately updates its store, which it uses to respond to any future pulls.
This means that a worker which finishes processing a batch can pull the current parameters from the server and start the next batch,
even if other workers haven't finished processing the earlier batch.
This is faster than `dist_sync` but can take more epochs to converge.
In `async` mode, it is required to pass an optimizer, because in the absence of an optimizer the kvstore would simply replace the stored weights with the received weights, which doesn't make sense for training in asynchronous mode.
The update of weights is atomic, meaning no two updates happen on the same weight at the same time. However, the order of updates is not guaranteed.

- `dist_sync_device`: Same as `dist_sync`, except that when there are multiple GPUs being used on each node,
this mode aggregates gradients and updates weights on the GPU, while `dist_sync` does so in CPU memory.
This is faster than `dist_sync` because it reduces expensive communication between GPU and CPU, but it increases memory usage on the GPU.

- `dist_async_device`: The analogue of `dist_sync_device`, but in asynchronous mode.

### Gradient Compression
When communication is expensive and the ratio of computation time to communication time is low, communication can become a bottleneck.
In such cases, gradient compression can be used to reduce the cost of communication, thereby speeding up training.
Refer to [Gradient compression](https://mxnet.incubator.apache.org/versions/master/faq/gradient_compression.html) for more details.

Note: For small models, when the cost of computation is much lower than the cost of communication,
distributed training might actually be slower than training on a single machine because of the overhead of communication and synchronization.
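As a minimal sketch of how compression is switched on for a distributed kvstore (the threshold of 0.5 here is only an illustrative value):

```python
import mxnet as mx

kv = mx.kvstore.create('dist_sync')
# 2-bit compression: gradient entries beyond +/- threshold are sent as the threshold
# value, the rest as zero; the quantization error is kept locally and added to
# later pushes.
kv.set_gradient_compression({'type': '2bit', 'threshold': 0.5})
```

When training through a Gluon `Trainer`, the same dictionary can also be passed through its `compression_params` argument.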
## How to Start Distributed Training?
MXNet provides a script, tools/launch.py, to make it easy to launch a distributed training job. It supports various cluster environments and resource managers, such as `ssh`, `mpirun`, `yarn` and `sge`.
If you already have one of these clusters set up, you can skip the next section on setting up a cluster.
If you want to use a type of cluster not mentioned above, skip ahead to the section on manually launching jobs.

### Setting up the Cluster
An easy way to set up a cluster of EC2 instances for distributed deep learning is to use the [AWS CloudFormation template](https://github.com/awslabs/deeplearning-cfn).
If you cannot use the above, this section will help you manually set up a cluster of instances
so that you can use `ssh` to launch a distributed training job.
Let us denote one machine as the `master` of the cluster, through which we will launch and monitor the distributed training on all machines.

If the machines in your cluster are part of a cloud computing platform like AWS EC2, then your instances should already be using key-based authentication.
Ensure that you create all instances using the same key, say `mxnet-key`, and in the same security group.
Next, we need to ensure that the master has access to all other machines in the cluster through `ssh`, by
adding this key to [ssh-agent](https://en.wikipedia.org/wiki/Ssh-agent) and forwarding it to the master when we log in. This will make `mxnet-key` the default key on the master.

```
ssh-add .ssh/mxnet-key
ssh -A user@MASTER_IP_ADDRESS
```

If your machines use passwords for authentication, see [here](https://help.ubuntu.com/community/SSH/OpenSSH/Keys) for instructions on setting up password-less authentication between machines.

It is easier if all these machines have a shared file system so that they can access the training script. One way is to use Amazon Elastic File System to create your network file system.
The options in the following command are the recommended options when mounting an AWS Elastic File System.

```
sudo mkdir efs && sudo mount -t nfs4 -o nfsvers=4.1,rsize=1048576,wsize=1048576,hard,timeo=600,retrans=2 NETWORK_FILE_SYSTEM_IP:/ efs
```

Tip: You might find it helpful to store large datasets on S3 for easy access from all machines in the cluster. Refer to [Using data from S3 for training](https://mxnet.incubator.apache.org/versions/master/faq/s3_integration.html) for more information.

### Using launch.py
MXNet provides the script [tools/launch.py](https://github.com/apache/incubator-mxnet/blob/master/tools/launch.py) to make it easy to launch distributed training on a cluster with `ssh`, `mpi`, `sge` or `yarn`.
You can fetch this script by cloning the mxnet repository.

```
git clone --recursive https://github.com/apache/incubator-mxnet
```

#### Example
Let us consider training a VGG11 model on the CIFAR10 dataset using [example/gluon/image_classification.py](https://github.com/apache/incubator-mxnet/blob/master/example/gluon/image_classification.py).
```
cd example/gluon/
```
On a single machine, we can run this script as follows:
```
python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1
```

For distributed training of this example, we would do the following:

If the mxnet directory which contains the script `image_classification.py` is accessible to all machines in the cluster (for example, if they are on a network file system), we can run:
```
../../tools/launch.py -n 3 -H hosts --launcher ssh python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 --kvstore dist_sync
```

If the directory with the script is not accessible from the other machines in the cluster, then we can synchronize the current directory to all machines.
```
../../tools/launch.py -n 3 -H hosts --launcher ssh --sync-dst-dir /tmp/mxnet_job/ python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 --kvstore dist_sync
```

> Tip: If you don't have a cluster ready and still want to try this out, pass the option `--launcher local` instead of `ssh`.
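The training script launched this way only needs to create the kvstore named on the command line and hand it to the Gluon `Trainer`; everything else is ordinary single-machine code. The sketch below is not the actual `image_classification.py`, just a minimal illustration of that pattern with a toy network and random data.

```python
import argparse
import mxnet as mx
from mxnet import autograd, gluon

# The kvstore type arrives as a command-line argument (e.g. --kvstore dist_sync
# when launched with launch.py) and is passed on to the Trainer.
parser = argparse.ArgumentParser()
parser.add_argument('--kvstore', type=str, default='device')
args = parser.parse_args()

kv = mx.kvstore.create(args.kvstore)

net = gluon.nn.Dense(10)                      # stand-in for a real model such as VGG11
net.initialize(mx.init.Xavier())
trainer = gluon.Trainer(net.collect_params(), 'sgd',
                        {'learning_rate': 0.1}, kvstore=kv)

# Dummy batch; a real script would read a partition of the dataset based on kv.rank.
data = mx.nd.random.uniform(shape=(64, 20))
label = mx.nd.random.uniform(shape=(64, 10))

with autograd.record():
    loss = gluon.loss.L2Loss()(net(data), label)
loss.backward()
# With a dist_* kvstore, step() pushes gradients to the servers and pulls back
# the updated parameters.
trainer.step(batch_size=64)
```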
#### Options
Here, launch.py is used to submit the distributed training job. It takes the following options:
- `-n` denotes the number of worker nodes to be launched.
- `-s` denotes the number of server nodes to be launched.
If it is not specified, it is taken to be equal to the number of worker nodes.
The script tries to cycle through the hosts file to launch the servers and workers.
For example, if you have 5 hosts in the hosts file and you pass `n` as 3 (and nothing for `s`),
the script will launch a total of 3 server processes, one each for the first three hosts,
and a total of 3 worker processes, one each for the fourth, fifth and first host.
If the hosts file has exactly `n` hosts, it will launch a server process and a worker process on each of the `n` hosts.
- `--launcher` denotes the mode of communication. The options are:
  - `ssh` if machines can communicate through ssh without passwords. This is the default launcher mode.
  - `mpi` if Open MPI is available
  - `sge` for Sun Grid Engine
  - `yarn` for Apache Yarn
  - `local` for launching all processes on the same local machine. This can be used for debugging purposes.
- `-H` requires the path of the hosts file.
  This file contains the IPs of the machines in the cluster. These machines should be able to communicate with each other without using passwords.
  This file is only applicable and required when the launcher mode is `ssh` or `mpi`.
  An example of the contents of the hosts file would be:
  ```
  172.30.0.172
  172.31.0.173
  172.30.1.174
  ```
- `--sync-dst-dir` takes the path of a directory on all hosts to which the current working directory will be synchronized. This is only supported in `ssh` launcher mode.
It is necessary when the working directory is not accessible to all machines in the cluster. Setting this option synchronizes the current directory to the hosts using rsync before the job is launched.
If you have not installed MXNet system-wide,
then you have to copy the folder `python/mxnet` and the file `lib/libmxnet.so` into the current directory before running `launch.py`.
For example, if you are in `example/gluon`, you can do this with `cp -r ../../python/mxnet ../../lib/libmxnet.so .`. This works if your `lib` folder contains `libmxnet.so`, as is the case when you build with make. If you build with CMake, this file will be in your `build` directory.
- `python image_classification.py --dataset cifar10 --model vgg11 --num-epochs 1 --kvstore dist_sync`
is the command for the training job on each machine. Note the use of `dist_sync` for the kvstore used in the script.
#### Terminating Jobs
If the training job crashes due to an error, or if we try to terminate the launch script while training is running,
jobs on all machines might not have terminated. In such a case, we would need to terminate them manually.
If we are using the `ssh` launcher, this can be done by running the following command, where `hosts` is the path of the hosts file.
```
while read -u 10 host; do ssh -o "StrictHostKeyChecking no" $host "pkill -f python" ; done 10<hosts
```

This patch also removes most of `docs/faq/multi_devices.md` (which continues to recommend the `device` kvstore when there are >= 4 GPUs): the sections below are deleted from that page and replaced with a pointer to the new page, shown at the end.

## Distributed Training with Multiple Machines

`KVStore` also supports a number of options for running on multiple machines.

- `dist_sync` behaves similarly to `local` but exhibits one major difference.
  With `dist_sync`, `batch-size` now means the batch size used on each machine.
  So if there are *n* machines and we use batch size *b*,
  then `dist_sync` behaves like `local` with batch size *n\*b*.
- `dist_device_sync` is similar to `dist_sync`. The difference between them is that
  `dist_device_sync` aggregates gradients and updates weights on GPUs,
  while `dist_sync` does so in CPU memory.
- `dist_async` performs asynchronous updates.
  The weight is updated whenever gradients are received from any machine.
  The update is atomic, i.e., no two updates happen on the same weight at the same time.
  However, the order is not guaranteed.

### How to Launch a Job

> To use distributed training, we need to compile with `USE_DIST_KVSTORE=1`
> (see the [MXNet installation guide](http://mxnet.io/install/index.html) for more options).

Launching a distributed job is a bit different from running on a single machine. MXNet provides
[tools/launch.py](https://github.com/dmlc/mxnet/blob/master/tools/launch.py) to
start a job by using `ssh`, `mpi`, `sge`, or `yarn`.

An easy way to set up a cluster of EC2 instances for distributed deep learning
is using an [AWS CloudFormation template](https://github.com/awslabs/deeplearning-cfn).
If you do not have a cluster, you can check the repository before you continue.

Assume we are at the directory `mxnet/example/image-classification`
and want to train LeNet to classify MNIST images, as demonstrated here:
[train_mnist.py](https://github.com/dmlc/mxnet/blob/master/example/image-classification/train_mnist.py).

On a single machine, we can run:

```bash
python train_mnist.py --network lenet
```

Now, say we are given two ssh-able machines and _MXNet_ is installed on both machines.
We want to train LeNet on these two machines.
First, we save the IPs (or hostnames) of these two machines in a file `hosts`, e.g.

```bash
$ cat hosts
172.30.0.172
172.30.0.171
```
Next, if the mxnet folder is accessible from both machines, e.g. on a
[network filesystem](https://help.ubuntu.com/lts/serverguide/network-file-system.html),
then we can run:

```bash
python ../../tools/launch.py -n 2 --launcher ssh -H hosts python train_mnist.py --network lenet --kv-store dist_sync
```

Note that here we:

- use `launch.py` to submit the job.
- provide the launcher: `ssh` if all machines are ssh-able, `mpi` if `mpirun` is available, `sge` for Sun Grid Engine, and `yarn` for Apache Yarn.
- `-n` is the number of worker nodes to run on.
- `-H` is the host file, which is required by `ssh` and `mpi`.
- `--kv-store` takes either `dist_sync` or `dist_async`.
- `-s` is the number of server nodes to run on.
- If the `-s` argument is not passed, the number of servers is kept the same as the number of workers.
- The launch.py script tries to cycle through the hosts file to launch the servers and workers. For example,
  let's say you have `5` hosts in the hosts file and you passed n as `3` (and nothing for s).
  The script will launch a total of `3` server processes, one each for the first three hosts, and
  a total of `3` worker processes, one each for the fourth, fifth and first host.
- If the hosts file has exactly `n` worker nodes, as passed with `-n`, it will launch
  a server process and a worker process on each of the `n` hosts.

### Synchronize Directory

Now consider the case where the mxnet folder is not accessible from the other machine.
We can first copy the `MXNet` library to the current folder by

```bash
cp -r ../../python/mxnet .
cp -r ../../lib/libmxnet.so mxnet
```

then ask `launch.py` to synchronize the current directory to all machines'
`/tmp/mxnet` directory with `--sync-dst-dir`

```bash
python ../../tools/launch.py -n 2 -H hosts --sync-dst-dir /tmp/mxnet \
   python train_mnist.py --network lenet --kv-store dist_sync
```

### Gradient compression

If your model has fully connected components or recurrent neural networks, you may achieve increased training speed using gradient compression, with a potentially slight loss of accuracy. Please see [Gradient Compression](https://mxnet.incubator.apache.org/versions/master/faq/gradient_compression.html) for more details on when and how to use it. For the above example, gradient compression can be enabled by running the following:

```bash
python ../../tools/launch.py -n 2 --launcher ssh -H hosts python train_mnist.py --network lenet \
    --kv-store dist_sync --gc-type 2bit
```

In this example, `gc-type` has been set to `2bit` to enable two-bit gradient compression.

### Use a Particular Network Interface

_MXNet_ often chooses the first available network interface.
But for machines that have multiple interfaces,
we can specify which network interface to use for data
communication with the environment variable `DMLC_INTERFACE`.
For example, to use the interface `eth0`, we can run

```
export DMLC_INTERFACE=eth0; python ../../tools/launch.py ...
```

### Debug Connection

Set `PS_VERBOSE=1` to see the debug logging, e.g.
```
export PS_VERBOSE=1; python ../../tools/launch.py ...
```

### More

- See more launch options with `python ../../tools/launch.py -h`
- See more options of [ps-lite](https://ps-lite.readthedocs.io/en/latest)

## Distributed training with multiple devices across machines
Refer to [Distributed training](https://mxnet.incubator.apache.org/versions/master/how_to/distributed_training.html)
for information on how distributed training works and how to use it.