Background

In the early days of click-through-rate (CTR) model training, model structures were simple: sparse embeddings made up the vast majority of the parameters while dense parameters were few, so the update conflicts and staleness introduced by asynchronous training were not a serious problem, and asynchronous training was the standard choice. As complex structures such as Attention entered the dense part of the network, dense parameters became more influential, and the update problems caused by asynchrony grew severe enough to limit model quality; meanwhile, GPUs have eased the performance penalty of synchronous training, so synchronous training has gradually become mainstream.

There are two flavors of synchronous training: Parameter-Server-based synchronization and AllReduce-based training. Taking TensorFlow, which is still heavily used in recommender systems, as an example, the former typically uses TensorFlow's SyncReplicasOptimizer and the latter typically uses Horovod with TensorFlow. Both, however, share one simple problem that has gone unfixed for years, which is rather baffling for frameworks with such a large user base.

Problem Description

The logic of synchronous training is as follows:

epoch = 0
while epoch < max_epoch:
    try read batch_data from data_iterator:
        1) forward pass;
        2) backward pass to compute gradients;
        3) synchronize gradients across all workers;
        4) update parameters;
    except OutOfRange:
        save_checkpoint
        epoch++
Step 3 collects gradients from all workers, and this is where the problem lies: data parallelism does not guarantee that every worker receives exactly the same amount of data, so each worker's batch_num = all_data_num / worker_num / batch_size can differ slightly. While gradients are being collected, some workers may have already exhausted their data, so worker_num gradients can never be gathered and the training job hangs. The problem exists in both PS mode and AllReduce.
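To make the hang concrete, here is a minimal TF 1.x sketch of the epoch-driven loop above (build_model, sync_optimizer, saver, max_epoch, and the file names are placeholders, not code from any particular framework). Whatever implements step 3, the synchronous train_op blocks until every worker contributes a gradient, so a worker that has already hit OutOfRangeError leaves the others waiting forever:

import tensorflow as tf  # TF 1.x style API

dataset = tf.data.TFRecordDataset(["part-00000.tfrecord"]).batch(1024)
iterator = dataset.make_initializable_iterator()
batch = iterator.get_next()

loss = build_model(batch)                  # hypothetical model-building function
train_op = sync_optimizer.minimize(loss)   # optimizer wrapped for synchronous training

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    for epoch in range(max_epoch):
        sess.run(iterator.initializer)
        while True:
            try:
                # forward + backward + gradient sync + parameter update in one step
                sess.run(train_op)
            except tf.errors.OutOfRangeError:
                # this worker's shard is exhausted for the current epoch
                saver.save(sess, "ckpt/model", global_step=epoch)
                break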

An Imperfect Solution

Since each worker's batch_num is different, simply stop relying on it and use a global max_step as the stopping condition instead. Concretely:

step = 0
while step < max_step:
    1) forward pass;
    2) backward pass to compute gradients;
    3) synchronize gradients across all workers;
    4) update parameters;
    5) step++;
    6) save a checkpoint every n_step steps;
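In TF 1.x terms this commonly maps to MonitoredTrainingSession plus StopAtStepHook; a hedged sketch, assuming build_model, base_optimizer, batch, server, is_chief, max_step and n_step are defined elsewhere:

import tensorflow as tf  # TF 1.x style API

global_step = tf.train.get_or_create_global_step()
loss = build_model(batch)          # hypothetical; batch comes from an infinitely repeated dataset
train_op = base_optimizer.minimize(loss, global_step=global_step)

hooks = [tf.train.StopAtStepHook(last_step=max_step)]   # global stopping condition

with tf.train.MonitoredTrainingSession(master=server.target,
                                       is_chief=is_chief,
                                       checkpoint_dir="ckpt",
                                       save_checkpoint_steps=n_step,   # checkpoint every n_step
                                       hooks=hooks) as mon_sess:
    while not mon_sess.should_stop():
        # forward, backward, gradient sync, update; global_step is the shared step counter
        mon_sess.run(train_op)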
This is a fairly common approach in industry, but it still has several problems:

  • Every worker runs the same number of steps while their data volumes differ, so a small amount of data ends up being trained a few more or a few fewer times than intended.
  • max_step has to be computed in advance, which can waste considerable time on large datasets; guessing a value by hand is inaccurate.
  • For frequency-based admission of sparse features, counts only need to be accumulated and checked during the first epoch; from the second epoch on they must not be accumulated again, otherwise the model overfits badly. This is why training by epoch is preferable (see the sketch after this list).
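Regarding the last bullet, a toy pure-Python sketch of the epoch-dependent admission logic (the counter and threshold are made up for illustration):

from collections import Counter

feature_counts = Counter()   # hypothetical global frequency counter
ADMIT_THRESHOLD = 10         # hypothetical admission threshold

def admitted_features(batch_feature_ids, epoch):
    """Return the sparse feature ids in this batch that pass frequency admission."""
    if epoch == 0:
        # Accumulate counts only during the first epoch; later epochs reuse them,
        # otherwise every id eventually crosses the threshold and the model overfits.
        feature_counts.update(batch_feature_ids)
    return [fid for fid in batch_feature_ids if feature_counts[fid] >= ADMIT_THRESHOLD]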

A few more points worth noting:

1) The quirky synchronization design of SyncReplicasOptimizer:

https://github.com/tensorflow/tensorflow/issues/11753
https://stackoverflow.com/questions/36762872/distributed-tensorflow-tf-train-syncreplicasoptimizer-seems-not-synchronized

SyncReplicaOptimizer does not really care if the replicas_to_aggregate gradients come from the different workers or not. Even if other workers are waiting or not initialized, the chief starts training immediately. And if you print the global_steps you will see same global_steps for replicas_to_aggregate times. This means that the chief pushes enough gradients for tf.train.SyncReplicaOptimizer to average and apply the gradients. So, start the chief worker process only after starting all other workers.

The reason is:

https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/train/SyncReplicasOptimizer

Only after all variables have been updated, increment the global step. Only after step 4, pushes global_step in the token_queue, once for each worker replica. The workers can now fetch the global step, use it to update its local_step variable and start the next batch. Please note that some workers can consume multiple minibatches, while some may not consume even one. This is because each worker fetches minibatches as long as a token exists. If one worker is stuck for some reason and does not consume a token, another worker can use it.

To work around this, do not use:

hook = optimizer.make_session_run_hook(is_chief)
Use this instead:
hook = optimizer.make_session_run_hook(is_chief, num_tokens=0)
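For context, a hedged sketch of where this hook comes from (the base optimizer, worker_num, loss and global_step are placeholders):

import tensorflow as tf  # TF 1.x style API

base_opt = tf.train.AdagradOptimizer(0.01)              # hypothetical base optimizer
optimizer = tf.train.SyncReplicasOptimizer(base_opt,
                                           replicas_to_aggregate=worker_num,
                                           total_num_replicas=worker_num)
train_op = optimizer.minimize(loss, global_step=global_step)

# num_tokens=0 keeps the chief from pre-filling the sync token queue at startup,
# so it cannot race ahead of workers that have not started yet.
hook = optimizer.make_session_run_hook(is_chief, num_tokens=0)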

2)Exception in thread QueueRunnerThread-dummy_queue-sync_token_q_EnqueueMany

https://github.com/tensorflow/tensorflow/issues/20833

Because the amount of data differs across workers, at the last steps some workers have already finished, but the gradient aggregator is still waiting for gradients to be enqueued; when they never arrive it times out and raises an exception. To work around this, do not repeat the dataset for only N epochs; make it repeat indefinitely and stop via max_step.

Do not use:

tf.data.SomeDataset(...).repeat(count=N).batch(batch_size)
Use this instead:
tf.data.SomeDataset(...).repeat(count=-1).batch(batch_size)
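The dataset then becomes unbounded and the stopping condition moves entirely to max_step; a minimal sketch of such an input pipeline (file names and batch size are placeholders):

import tensorflow as tf  # TF 1.x style API

def input_fn():
    # Repeat indefinitely so no worker raises OutOfRangeError while the
    # gradient aggregator is still waiting on it; the job is stopped by
    # max_step (e.g. tf.train.StopAtStepHook) instead of by data exhaustion.
    dataset = tf.data.TFRecordDataset(["part-00000.tfrecord"])
    dataset = dataset.repeat(count=-1)   # infinite; equivalent to repeat()
    return dataset.batch(1024)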

3) The job exits after running fewer steps than the max_step computed from the sample count.

The reason is that some stale gradients get discarded.

https://www.tensorflow.org/versions/r1.15/api_docs/python/tf/train/SyncReplicasOptimizer

In a typical asynchronous training environment, it's common to have some stale gradients. For example, with a N-replica asynchronous training, gradients will be applied to the variables N times independently. Depending on each replica's training speed, some gradients might be calculated from copies of the variable from several steps back (N-1 steps on average). This optimizer avoids stale gradients by collecting gradients from all replicas, averaging them, then applying them to the variables in one shot, after which replicas can fetch the new variables and continue.

Fix: same as 2), and leave some extra buffer in the data.

The Perfect Solution

Before the TensorFlow/DNN era, to train large-scale sparse LR and FFM models I built an in-house PS-based distributed training framework supporting the BSP, ASP, and SSP synchronization modes, and its BSP mode ran into exactly the same problem. My solution: once a worker has finished reading one epoch of data, it sends a worker_done signal to the PS node; the PS accumulates this signal from 0 each epoch as worker_done_num, and whenever it collects gradients it only waits for worker_num - worker_done_num of them before updating the parameters. This perfectly guarantees that every sample is trained exactly max_epoch times and the job never hangs.
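The following is a condensed, illustrative Python sketch of that PS-side bookkeeping, not the original framework's code; names such as GradientAggregator are made up:

import threading

class GradientAggregator:
    """PS-side aggregator: each sync round only waits for gradients from
    workers that have not yet finished the current epoch."""

    def __init__(self, worker_num):
        self.worker_num = worker_num
        self.worker_done_num = 0      # workers that hit end-of-data this epoch
        self.pending_grads = []
        self.cond = threading.Condition()

    def next_epoch(self):
        # Reset the done counter at the start of every epoch.
        with self.cond:
            self.worker_done_num = 0

    def worker_done(self):
        # A worker reports that it has exhausted its shard for this epoch.
        with self.cond:
            self.worker_done_num += 1
            self.cond.notify_all()

    def push_gradient(self, grad):
        # A worker contributes one gradient to the current sync round.
        with self.cond:
            self.pending_grads.append(grad)
            self.cond.notify_all()

    def collect(self):
        # Block until worker_num - worker_done_num gradients have arrived,
        # so workers that are already done can never stall the round.
        with self.cond:
            self.cond.wait_for(
                lambda: len(self.pending_grads) >= self.worker_num - self.worker_done_num)
            grads, self.pending_grads = self.pending_grads, []
            return grads

With this bookkeeping, the final rounds of each epoch simply shrink as workers finish, which is exactly what keeps every sample trained max_epoch times without the job hanging.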