在写这篇文章之前，压根就没使用过 tensorflow 分布式学习，以前日常工作的数据量也不需要这么大的计算资源，现在需要面临处理数亿级别的数据，所以为了提升模型训练的效率，必须要使用分布式训练，所以还是需要去学习分布式模型训练，并且还要修改源码支持分布式训练。
The primary motivation for this project is to make it easy to take a single-GPU TensorFlow program and successfully train it on many GPUs faster. This has two aspects:
1、How much modification does one have to make to a program to make it distributed, and how easy is it to run it.
2、How much faster would it run in distributed mode?
Internally at Uber we found the MPI model to be much more straightforward and require far less code changes than the Distributed TensorFlow with parameter servers. See the Usage section for more details.
In addition to being easy to use, Horovod is fast. Below is a chart representing the benchmark that was done on 128 servers with 4 Pascal GPUs each connected by RoCE-capable 25 Gbit/s network:
Horovod achieves 90% scaling efficiency for both Inception V3 and ResNet-101, and 68% scaling efficiency for VGG-16. See the Benchmarks page to find out how to reproduce these numbers.
While installing MPI and NCCL itself may seem like an extra hassle, it only needs to be done once by the team dealing with infrastructure, while everyone else in the company who builds the models can enjoy the simplicity of training them at scale.
To install Horovod:
- Install Open MPI or another MPI implementation.
Steps to install Open MPI are listed here.
Note: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0.
- Install the
$ pip install horovod
This basic installation is good for laptops and for getting to know Horovod. If you’re installing Horovod on a server with GPUs, read the Horovod on GPU page. If you want to use Docker, read the Horovod in Docker page.
To use Horovod, make the following additions to your program:
config.gpu_options.visible_device_list主要是配置分布式训练所需要的gpu 设备，因为后面训练的时候对应的GPU 对应的相应的进程程序。每一个 gpu 设备都会有一个编号，这个会通过 local rank 来确定。默认都是从 0开始的。
- Scale the learning rate by number of workers. Effective batch size in synchronous distributed training is scaled by the number of workers. An increase in learning rate compensates for the increased batch size.根据 worker 数量来扩大学习率。 同步分布式训练中的 batchsize 按 worker 数量进行调整。 学习率的增加补偿了 bastchsize 的增加。
- Wrap optimizer in
hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients. 这个
hvd.BroadcastGlobalVariablesHook(0)to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using
MonitoredTrainingSession, you can simply execute the
hvd.broadcast_global_variablesop after global variables have been initialized. 哈哈，
BroadcastGlobalVariablesHook让我想到了以前在使用 spark 撰写相关代码的时候使用广播的方式来实现数据的在各个worker上的拷贝，这样就不需要反复向master请求获取数据，避免了通信消耗的代价，这里也是同样的意思，就是事先在每个 worker 都同步一份数据过去即可。想要这样做也很简单，使用
- Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing
hvd.rank() != 0. 分布式训练的时候，保存 checkpoint 也要进行相应的修改了，这里是想在worker 0 上保存相应的模型。
Example (see the examples directory for full training examples):
import tensorflow as tf import horovod.tensorflow as hvd # Initialize Horovod hvd.init() # Pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.visible_device_list = str(hvd.local_rank()) # Build model... loss = ... opt = tf.train.AdagradOptimizer(0.01 * hvd.size()) # Add Horovod Distributed Optimizer opt = hvd.DistributedOptimizer(opt) # Add hook to broadcast variables from rank 0 to all other processes during # initialization. hooks = [hvd.BroadcastGlobalVariablesHook(0)] # Make training operation train_op = opt.minimize(loss) # Save checkpoints only on worker 0 to prevent other workers from corrupting them. checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None # The MonitoredTrainingSession takes care of session initialization, # restoring from a checkpoint, saving to a checkpoint, and closing when done # or an error occurs. with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir, config=config, hooks=hooks) as mon_sess: while not mon_sess.should_stop(): # Perform synchronous training. mon_sess.run(train_op)
The example commands below show how to run distributed training. See the Running Horovod page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs.
- To run on a machine with 4 GPUs:
$ horovodrun -np 4 -H localhost:4 python train.py
- To run on 4 machines with 4 GPUs each:
$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
- To run in Docker, see the Horovod in Docker page.
- To run in Kubernetes, see Kubeflow, MPI Operator, Helm Chart, and FfDL.
- To run in Spark, see the Spark page.