TensorFlow Distributed Training: Horovod Concepts



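Before writing this article I had never used distributed training in TensorFlow. The data volumes in my day-to-day work never called for that much compute. Now I have to process datasets with hundreds of millions of records, so to train models efficiently I need distributed training. That means learning how it works and modifying existing source code to support it.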

Why not traditional Distributed TensorFlow?

The primary motivation for this project is to make it easy to take a single-GPU TensorFlow program and successfully train it on many GPUs faster. This has two aspects:

1. How much modification does one have to make to a program to make it distributed, and how easy is it to run it?

2. How much faster would it run in distributed mode?

Internally at Uber we found the MPI model to be much more straightforward and to require far fewer code changes than Distributed TensorFlow with parameter servers. See the Usage section for more details.

In addition to being easy to use, Horovod is fast. Below is a chart representing the benchmark that was done on 128 servers, each with 4 Pascal GPUs, connected by a RoCE-capable 25 Gbit/s network:


Horovod achieves 90% scaling efficiency for both Inception V3 and ResNet-101, and 68% scaling efficiency for VGG-16. See the Benchmarks page to find out how to reproduce these numbers.
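
To make these percentages concrete, here is a minimal sketch of how a scaling-efficiency figure translates into speedup. It assumes the common definition of scaling efficiency (measured throughput on N GPUs divided by N times the single-GPU throughput). The function names are illustrative and are not part of Horovod.

```python
def scaling_efficiency(throughput_n, throughput_1, num_gpus):
    """Measured throughput on num_gpus GPUs relative to ideal linear scaling."""
    return throughput_n / (num_gpus * throughput_1)


def effective_speedup(efficiency, num_gpus):
    """Speedup over a single GPU implied by a given scaling efficiency."""
    return efficiency * num_gpus


# 128 servers x 4 GPUs each, as in the benchmark setup described above.
num_gpus = 128 * 4
for model, eff in [("Inception V3", 0.90), ("ResNet-101", 0.90), ("VGG-16", 0.68)]:
    print(f"{model}: about {effective_speedup(eff, num_gpus):.0f}x over one GPU")
```

Under that definition, 90% efficiency on 512 GPUs corresponds to roughly 461 times the single-GPU throughput, and 68% to roughly 348 times.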

While installing MPI and NCCL may seem like an extra hassle, it only needs to be done once by the team dealing with infrastructure, while everyone else in the company who builds the models can enjoy the simplicity of training them at scale.


To install Horovod:

  1. Install Open MPI or another MPI implementation.

Steps to install Open MPI are listed here.

Note: Open MPI 3.1.3 has an issue that may cause hangs. It is recommended to downgrade to Open MPI 3.1.2 or upgrade to Open MPI 4.0.0.

  2. Install the horovod pip package.
$ pip install horovod

This basic installation is good for laptops and for getting to know Horovod. If you’re installing Horovod on a server with GPUs, read the Horovod on GPU page. If you want to use Docker, read the Horovod in Docker page.


Horovod core principles are based on MPI concepts such as size, rank, local rank, allreduce, allgather, and broadcast. See here for more details.
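
The following is a small pure-Python simulation of these concepts. It does not use MPI or Horovod, and the helper names (`layout`, `allreduce_sum`, `allgather`, `broadcast`) are illustrative only. It assumes one process per GPU and that ranks are assigned host by host, in order.

```python
def layout(hosts):
    """Assign (rank, local_rank, host) to each process, given {host: slots}.

    Assumes ranks are handed out host by host, in order.
    """
    procs = []
    rank = 0
    for host, slots in hosts.items():
        for local_rank in range(slots):
            procs.append({"rank": rank, "local_rank": local_rank, "host": host})
            rank += 1
    return procs


def allreduce_sum(values):
    """Every process ends up with the sum of all processes' values."""
    total = sum(values)
    return [total for _ in values]


def allgather(values):
    """Every process ends up with the list of all processes' values."""
    return [list(values) for _ in values]


def broadcast(values, root_rank=0):
    """Every process ends up with the root process's value."""
    return [values[root_rank] for _ in values]


procs = layout({"server1": 2, "server2": 2})
size = len(procs)  # total number of processes
per_process = [10 * (p["rank"] + 1) for p in procs]  # one value per process

print(size)                        # 4
print(procs[3])                    # rank 3 is local rank 1 on server2
print(allreduce_sum(per_process))  # [100, 100, 100, 100]
print(allgather(per_process)[0])   # [10, 20, 30, 40]
print(broadcast(per_process))      # [10, 10, 10, 10]
```

Here size is the total number of processes, rank identifies a process globally, and local rank identifies it within its own machine, which is why local rank is the natural choice for picking a GPU.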


To use Horovod, make the following additions to your program:

  1. Run hvd.init() to initialize Horovod.
  2. Set config.gpu_options.visible_device_list to choose the GPU this process will use. During training each process is paired with one GPU. Every GPU on a machine has an index, and the index to use is taken from the process's local rank, which starts at 0.
  3. Scale the learning rate by the number of workers. The effective batch size in synchronous distributed training is scaled by the number of workers, and an increase in learning rate compensates for the increased batch size.
  4. Wrap the optimizer in hvd.DistributedOptimizer. The distributed optimizer delegates gradient computation to the original optimizer, averages gradients using allreduce or allgather, and then applies those averaged gradients. This is where the distributed optimization strategy comes in. In single-machine training all the data sits in one machine's memory, so no communication is needed. In distributed training every machine updates the model, so the machines have to communicate to keep their gradient updates in sync. This is the core of distributed training.
  5. Add hvd.BroadcastGlobalVariablesHook(0) to broadcast initial variable states from rank 0 to all other processes. This is necessary to ensure consistent initialization of all workers when training is started with random weights or restored from a checkpoint. Alternatively, if you’re not using MonitoredTrainingSession, you can simply execute the hvd.broadcast_global_variables op after global variables have been initialized. BroadcastGlobalVariablesHook reminds me of broadcast variables in Spark, which copy data to every worker so that workers do not have to keep requesting it from the master, avoiding the communication cost. The idea here is the same: push a copy of the state to every worker up front. Doing so is simple: pass the hook to MonitoredTrainingSession (an API that also comes up when saving and restoring models), or run the hvd.broadcast_global_variables op yourself.
  6. Modify your code to save checkpoints only on worker 0 to prevent other workers from corrupting them. This can be accomplished by passing checkpoint_dir=None to tf.train.MonitoredTrainingSession if hvd.rank() != 0. In other words, checkpoint saving also has to change for distributed training: only worker 0 writes the model.
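
To see why averaging gradients across workers (step 4) goes hand in hand with a larger effective batch size (step 3), here is a minimal pure-Python sketch. It does not use Horovod or TensorFlow. The toy model y = w * x, the data, and the helper name `mse_gradient` are all hypothetical, and the averaging allreduce is simulated with a plain mean.

```python
def mse_gradient(w, batch):
    """Gradient of mean squared error for the model y = w * x over one batch."""
    return sum(2.0 * (w * x - y) * x for x, y in batch) / len(batch)


# Hypothetical data: 4 workers, each holding a local batch of 2 (x, y) pairs.
shards = [
    [(1.0, 2.0), (2.0, 4.5)],
    [(3.0, 5.5), (4.0, 8.0)],
    [(5.0, 10.5), (6.0, 11.5)],
    [(7.0, 14.0), (8.0, 16.5)],
]
w = 1.0
size = len(shards)

# Each worker computes a gradient on its own batch...
local_grads = [mse_gradient(w, shard) for shard in shards]
# ...and an averaging allreduce leaves every worker with the same mean gradient.
averaged = sum(local_grads) / size

# With equal-sized batches, this matches one gradient over the combined batch.
combined = [pair for shard in shards for pair in shard]
assert abs(averaged - mse_gradient(w, combined)) < 1e-9

effective_batch_size = len(shards[0]) * size   # 2 per worker * 4 workers = 8
base_lr = 0.01
scaled_lr = base_lr * size                     # linear scaling, as in step 3
print(effective_batch_size, scaled_lr)
```

Because every worker applies the same averaged gradient, one synchronous step behaves like a single-machine step on a batch that is size times larger, which is the reason the learning rate is scaled up.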

Example (see the examples directory for full training examples):

import tensorflow as tf
import horovod.tensorflow as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used to process local rank (one GPU per process)
config = tf.ConfigProto()
config.gpu_options.visible_device_list = str(hvd.local_rank())

# Build model...
loss = ...
opt = tf.train.AdagradOptimizer(0.01 * hvd.size())

# Add Horovod Distributed Optimizer
opt = hvd.DistributedOptimizer(opt)

# Add hook to broadcast variables from rank 0 to all other processes during
# initialization.
hooks = [hvd.BroadcastGlobalVariablesHook(0)]

# Make training operation
train_op = opt.minimize(loss)

# Save checkpoints only on worker 0 to prevent other workers from corrupting them.
checkpoint_dir = '/tmp/train_logs' if hvd.rank() == 0 else None

# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(checkpoint_dir=checkpoint_dir,
                                       config=config,
                                       hooks=hooks) as mon_sess:
  while not mon_sess.should_stop():
    # Perform synchronous training.
    mon_sess.run(train_op)

Running Horovod

The example commands below show how to run distributed training. See the Running Horovod page for more instructions, including RoCE/InfiniBand tweaks and tips for dealing with hangs.

  1. To run on a machine with 4 GPUs:
$ horovodrun -np 4 -H localhost:4 python train.py
  2. To run on 4 machines with 4 GPUs each:
$ horovodrun -np 16 -H server1:4,server2:4,server3:4,server4:4 python train.py
  3. To run in Docker, see the Horovod in Docker page.
  4. To run in Kubernetes, see Kubeflow, MPI Operator, Helm Chart, and FfDL.
  5. To run in Spark, see the Spark page.
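
The two horovodrun commands above follow one pattern: -np is the total number of processes (one per GPU) and -H lists each host with its GPU count. As a rough sketch, a hypothetical helper (`build_horovodrun_command` is not part of Horovod) could assemble the command from a host-to-GPU mapping, using only the two flags shown above.

```python
def build_horovodrun_command(hosts, script="train.py"):
    """Build a horovodrun argument list from a {hostname: gpu_count} mapping.

    Hypothetical helper: it only assembles the -np and -H arguments shown
    in the commands above, with one process per GPU.
    """
    num_processes = sum(hosts.values())
    host_spec = ",".join(f"{host}:{slots}" for host, slots in hosts.items())
    return ["horovodrun", "-np", str(num_processes), "-H", host_spec, "python", script]


single = build_horovodrun_command({"localhost": 4})
multi = build_horovodrun_command(
    {"server1": 4, "server2": 4, "server3": 4, "server4": 4}
)
print(" ".join(single))
print(" ".join(multi))
```

Returning a list rather than a string means the result can be passed directly to subprocess.run without shell quoting concerns.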