Background

In 2018, the company's distributed model training was broadly migrating to TensorFlow on YARN. On the company's Hadoop cluster, when running distributed TensorFlow training that read data through the Dataset API, the job would hang on the last batch of each epoch and never finish. All cluster nodes ran CentOS with Linux kernel 3.10.0. Reading data with the older queue-based API did not trigger the problem, and the problem was not deterministic: it was only likely to occur in distributed training with a relatively large number of nodes.

Reproduction Conditions

At first I found that if I set both inter_op_parallelism_threads and intra_op_parallelism_threads to 1 and used exactly one PS node, the problem never occurred.

server_config = tf.ConfigProto(inter_op_parallelism_threads=1,
                               intra_op_parallelism_threads=1)
self.server = tf.train.Server(self.cluster, job_name=self.job_name,
                              task_index=self.task_index, config=server_config)

After many attempts, I found that the hang only occurs, and only intermittently, when inter_op_parallelism_threads is greater than 1 AND there is more than one PS node; under any other combination it never occurs, and the setting of intra_op_parallelism_threads is irrelevant.
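The observed reproduction matrix can be restated as a simple predicate. This is plain Python summarizing the observations above, not TensorFlow code, and the function name is purely illustrative:

```python
def hang_possible(inter_op_parallelism_threads, num_ps_nodes):
    """Summarize the observed reproduction conditions: the hang was
    only ever seen with inter_op_parallelism_threads > 1 AND more
    than one PS node; intra_op_parallelism_threads was irrelevant."""
    return inter_op_parallelism_threads > 1 and num_ps_nodes > 1

# Single-threaded inter-op scheduling or a single PS never hung.
print(hang_possible(1, 4))   # False
print(hang_possible(8, 1))   # False
print(hang_possible(8, 4))   # True: the hang may occur, though it is
                             # intermittent, not guaranteed
```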

The code that triggers the problem is the most common usage pattern:

def data_iter(batch_size=1000):
    def _parse_function(examples):
        features = {}
        features['label'] = tf.FixedLenFeature([1], tf.float32)
        features['user_id'] = tf.FixedLenFeature([1], tf.int64)
        features['item_id'] = tf.FixedLenFeature([1], tf.int64)
        instance = tf.parse_example(examples, features)
        return instance['user_id'], instance['item_id'], instance['label']

    with tf.name_scope('input'):
        files = tf.data.Dataset.list_files('./inputs/part-*')
        dataset = files.apply(tf.contrib.data.parallel_interleave(
            lambda file: tf.data.TFRecordDataset(file),
            cycle_length=4, sloppy=True))
        dataset = dataset.prefetch(buffer_size=batch_size * 2)
        dataset = dataset.batch(batch_size)
        dataset = dataset.map(_parse_function, num_parallel_calls=4)
        iterator = dataset.make_initializable_iterator()
    return iterator

def model(user_id, item_id):
    ...
    # the embedding tables user_embeddings / item_embeddings are
    # defined in the elided part above
    user_embedding = tf.nn.embedding_lookup(user_embeddings, user_id)
    item_embedding = tf.nn.embedding_lookup(item_embeddings, item_id)
    return tf.reduce_sum(user_embedding * item_embedding, 1, keep_dims=True)

# Graph construction
train_iterator = data_iter(FLAGS.batch_size)
train_user_id, train_item_id, train_label = train_iterator.get_next()
train_score = model(train_user_id, train_item_id)
train_loss = some_loss_function(train_score, train_label)
opt = tf.train.AdamOptimizer(learning_rate=0.001)
train_op = opt.minimize(train_loss)

# Execution
with tf.Session(...) as sess:
    ...
    while True:
        try:
            sess.run(train_op)
        except tf.errors.OutOfRangeError:
            break

Solution

On the surface this looks like a problem in the Dataset API's multi-threaded reading. I spent some time reading that part of the TensorFlow source but did not find a fix. From a usage perspective, however, what the Dataset API mainly provides is decoupling of data reading from model execution: for most models the input pipeline is the bottleneck, so running the two parts in parallel after decoupling can greatly improve training throughput. In addition, when the session directly runs the final train_op, the data-reading ops that train_op depends on are executed automatically as part of the same step:

sess.run(train_op)
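The throughput benefit of that decoupling can be seen in a plain-Python producer/consumer sketch. This is only an analogy with made-up per-batch costs, not TensorFlow internals; the function name and cost parameters are illustrative:

```python
import queue
import threading
import time

def run_decoupled(num_batches, read_cost=0.01, train_cost=0.01):
    """Overlap batch reading with training via a background producer
    thread -- the same idea behind tf.data's read/compute pipelining."""
    q = queue.Queue(maxsize=4)

    def producer():
        for i in range(num_batches):
            time.sleep(read_cost)   # simulate reading one batch
            q.put(i)
        q.put(None)                 # end-of-data sentinel

    threading.Thread(target=producer, daemon=True).start()
    start, trained = time.time(), 0
    while True:
        if q.get() is None:
            break
        time.sleep(train_cost)      # simulate one training step
        trained += 1
    return trained, time.time() - start

# Overlapped wall time is roughly num_batches * max(read, train),
# versus num_batches * (read + train) when the phases run serially.
trained, elapsed = run_decoupled(20)
print(trained)
```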

So my intuition was that the model part and the data-reading part were running into trouble coordinating their threads. After forcibly splitting data reading and model execution into two separate sess.run calls, the problem did indeed disappear:

feature_value = sess.run([feature_dataset_next_iter])
sess.run(train_op, feed_dict={feature_placeholder: feature_value})

The complete code is as follows.

def data_iter(batch_size=1000):
    def _parse_function(examples):
        features = {}
        features['label'] = tf.FixedLenFeature([1], tf.float32)
        features['user_id'] = tf.FixedLenFeature([1], tf.int64)
        features['item_id'] = tf.FixedLenFeature([1], tf.int64)
        instance = tf.parse_example(examples, features)
        return instance['user_id'], instance['item_id'], instance['label']

    with tf.name_scope('input'):
        files = tf.data.Dataset.list_files('./inputs/part-*')
        dataset = files.apply(tf.contrib.data.parallel_interleave(
            lambda file: tf.data.TFRecordDataset(file),
            cycle_length=4, sloppy=True))
        dataset = dataset.prefetch(buffer_size=batch_size * 2)
        dataset = dataset.batch(batch_size)
        dataset = dataset.map(_parse_function, num_parallel_calls=4)
        iterator = dataset.make_initializable_iterator()
    return iterator

def model(user_id, item_id):
    ...
    # the embedding tables user_embeddings / item_embeddings are
    # defined in the elided part above
    user_embedding = tf.nn.embedding_lookup(user_embeddings, user_id)
    item_embedding = tf.nn.embedding_lookup(item_embeddings, item_id)
    return tf.reduce_sum(user_embedding * item_embedding, 1, keep_dims=True)

# Graph construction: the model consumes placeholders rather than the
# Dataset iterator's outputs directly
train_iterator = data_iter(FLAGS.batch_size)
train_user_id, train_item_id, train_label = train_iterator.get_next()
train_user_id_placeholder = tf.placeholder(tf.int64, [None, 1], name="train_user_id_placeholder")
train_item_id_placeholder = tf.placeholder(tf.int64, [None, 1], name="train_item_id_placeholder")
train_label_placeholder = tf.placeholder(tf.float32, [None, 1], name="train_label_placeholder")
train_score = model(train_user_id_placeholder, train_item_id_placeholder)
train_loss = some_loss_function(train_score, train_label_placeholder)
opt = tf.train.AdamOptimizer(learning_rate=0.001)
train_op = opt.minimize(train_loss)

# Execution
with tf.Session(...) as sess:
    ...
    while True:
        try:
            # split data reading and training into two separate
            # sess.run calls
            train_user_id_val, train_item_id_val, train_label_val = sess.run(
                [train_user_id, train_item_id, train_label])
            sess.run(train_op,
                     feed_dict={
                         train_user_id_placeholder: train_user_id_val,
                         train_item_id_placeholder: train_item_id_val,
                         train_label_placeholder: train_label_val
                     })
        except tf.errors.OutOfRangeError:
            break

This problem exists across TensorFlow 1.x; I have not checked whether 2.x is affected. The next step is to fix it at the core level.