本文共 12967 字,大约阅读时间需要 43 分钟。
2017年2月16日,Google正式对外发布Google TensorFlow 1.0版本,并保证本次的发布版本API接口完全满足生产环境稳定性要求。这是TensorFlow的一个重要里程碑,标志着它可以正式在生产环境放心使用。在国内,从InfoQ的判断来看,TensorFlow仍处于创新传播曲线的创新者使用阶段,大部分人对于TensorFlow还缺乏了解,社区也缺少帮助落地和使用的中文资料。InfoQ期望通过深入浅出TensorFlow系列文章能够推动Tensorflow在国内的发展。欢迎加入QQ群(群号:183248479)深入讨论和交流。下面为本系列的前六篇文章:
\\ \\ \\ \\ \\ \\ \\在前面的文章中介绍了使用TensorFlow实现各种深度学习的算法。然而要将深度学习应用到实际问题中,一个非常大的问题在于训练深度学习模型需要的计算量太大。比如要将Inception-v3模型在单机单卡上训练到78%的正确率需要将近半年的时间,这样的训练速度是完全无法应用到实际生产中的。为了加速训练过程,本文将介绍如何通过TensorFlow利用GPU或/和分布式计算进行模型训练。
\\TensorFlow程序可以通过tf.device函数来指定运行每一个操作的设备,这个设备可以是本地的CPU或者GPU,也可以是某一台远程的服务器。TensorFlow会给每一个可用的设备一个名称,tf.device函数可以通过设备的名称来指定执行运算的设备。比如CPU在TensorFlow中的名称为/cpu:0。
\\在默认情况下,即使机器有多个CPU,TensorFlow也不会区分它们,所有的CPU都使用/cpu:0作为名称。而一台机器上不同GPU的名称是不同的,第n个GPU在TensorFlow中的名称为/gpu:n。比如第一个GPU的名称为/gpu:0,第二个GPU名称为/gpu:1,以此类推。
\\TensorFlow提供了一个快捷的方式来查看运行每一个运算的设备。在生成会话时,可以通过设置log_device_placement参数来打印运行每一个运算的设备。下面的程序展示了如何使用log_device_placement这个参数。
\\\import tensorflow as tf\\a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')\b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')\c = a + b\# 通过log_device_placement参数来输出运行每一个运算的设备。\sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))\print sess.run(c)\\'''\ 在没有GPU的机器上运行以上代码可以得到以下输出:\Device mapping: no known devices.\\add: /job:localhost/replica:0/task:0/cpu:0\b: /job:localhost/replica:0/task:0/cpu:0\a: /job:localhost/replica:0/task:0/cpu:0\[ 2. 4. 6.]\'''\\
在以上代码中,TensorFlow程序生成会话时加入了参数log_device_placement=True,所以程序会将运行每一个操作的设备输出到屏幕。于是除了可以看到最后的计算结果之外,还可以看到类似“add:/job:localhost/replica:0/task:0/cpu:0”这样的输出。这些输出显示了执行每一个运算的设备。比如加法操作add是通过CPU来运行的,因为它的设备名称中包含了/cpu:0。
\\在配置好GPU环境的TensorFlow中,如果操作没有明确地指定运行设备,那么TensorFlow会优先选择GPU。比如将以上代码在亚马逊(Amazon Web Services, AWS)的 g2.8xlarge实例上运行时,会得到以下运行结果。
\\\Device mapping:\/job:localhost/replica:0/task:0/gpu:0 -\u0026gt; device: 0, name: GRID K520, pci bus id: 0000:00:03.0\/job:localhost/replica:0/task:0/gpu:1 -\u0026gt; device: 1, name: GRID K520, pci bus id: 0000:00:04.0\/job:localhost/replica:0/task:0/gpu:2 -\u0026gt; device: 2, name: GRID K520, pci bus id: 0000:00:05.0\/job:localhost/replica:0/task:0/gpu:3 -\u0026gt; device: 3, name: GRID K520, pci bus id: 0000:00:06.0\\add: /job:localhost/replica:0/task:0/gpu:0\b: /job:localhost/replica:0/task:0/gpu:0\a: /job:localhost/replica:0/task:0/gpu:0\[ 2. 4. 6.]\\
从上面的输出可以看到在配置好GPU环境的TensorFlow中,TensorFlow会自动优先将运算放置在GPU上。不过,尽管g2.8xlarge实例有4个GPU,在默认情况下,TensorFlow只会将运算优先放到/gpu:0上。于是可以看见在上面的程序中,所有的运算都被放在了/gpu:0上。如果需要将某些运算放到不同的GPU或者CPU上,就需要通过tf.device来手工指定。下面的程序给出了一个通过tf.device手工指定运行设备的样例。
\\\import tensorflow as tf\\# 通过tf.device将运算指定到特定的设备上。\with tf.device('/cpu:0'):\ a = tf.constant([1.0, 2.0, 3.0], shape=[3], name='a')\ b = tf.constant([1.0, 2.0, 3.0], shape=[3], name='b')\with tf.device('/gpu:1'):\ c = a + b\\sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))\print sess.run(c)\\'''\在AWS g2.8xlarge实例上运行上述代码可以得到一下结果:\Device mapping:\/job:localhost/replica:0/task:0/gpu:0 -\u0026gt; device: 0, name: GRID K520, pci bus id: 0000:00:03.0\/job:localhost/replica:0/task:0/gpu:1 -\u0026gt; device: 1, name: GRID K520, pci bus id: 0000:00:04.0\/job:localhost/replica:0/task:0/gpu:2 -\u0026gt; device: 2, name: GRID K520, pci bus id: 0000:00:05.0\/job:localhost/replica:0/task:0/gpu:3 -\u0026gt; device: 3, name: GRID K520, pci bus id: 0000:00:06.0\\add: /job:localhost/replica:0/task:0/gpu:1\b: /job:localhost/replica:0/task:0/cpu:0\a: /job:localhost/replica:0/task:0/cpu:0\[ 2. 4. 6.]\'''\\
在以上代码中可以看到生成常量a和b的操作被加载到了CPU上,而加法操作被放到了第二个GPU“/gpu:1”上。在TensorFlow中,不是所有的操作都可以被放在GPU上,如果强行将无法放在GPU上的操作指定到GPU上,那么程序将会报错。
\\下面将给出具体的TensorFlow代码在一台机器的多个GPU上并行训练深度学习模型。因为一般来说一台机器上的多个GPU性能相似,所以在这种设置下会更多地采用同步模式训练深度学习模型。下面将给出具体的代码,在多GPU上训练深度学习模型解决MNIST问题。(该代码可以在TensorFlow 0.9.0下运行,对于更新TensorFlow的版本,请参考Github代码库:)
\\\# -*- coding: utf-8 -*-\\from datetime import datetime\import os\import time\\import tensorflow as tf\\# 定义训练神经网络时需要用到的配置。\BATCH_SIZE = 100 \LEARNING_RATE_BASE = 0.001\LEARNING_RATE_DECAY = 0.99\REGULARAZTION_RATE = 0.0001\TRAINING_STEPS = 1000\MOVING_AVERAGE_DECAY = 0.99 \N_GPU = 4\\# 定义日志和模型输出的路径。\MODEL_SAVE_PATH = \"/path/to/logs_and_models/\"\MODEL_NAME = \"model.ckpt\"\\# 定义数据存储的路径。因为需要为不同的GPU提供不同的训练数据,所以通过placerholder\# 的方式就需要手动准备多份数据。为了方便训练数据的获取过程,可以采用输入队列的方式从\# TFRecord中读取数据。于是在这里提供的数据文件路径为将MNIST训练数据转化为\# TFRecords格式之后的路径。\DATA_PATH = \"/path/to/data.tfrecords\" \\# 定义输入队列得到训练数据,具体细节可以参考《TensorFlow:实战Google深度学习框架》\# 第七章。\def get_input():\ filename_queue = tf.train.string_input_producer([DATA_PATH]) \ reader = tf.TFRecordReader()\_, serialized_example = reader.read(filename_queue)\# 定义数据解析格式。\ features = tf.parse_single_example(\ serialized_example,\ features={\ 'image_raw': tf.FixedLenFeature([], tf.string),\ 'pixels': tf.FixedLenFeature([], tf.int64),\ 'label': tf.FixedLenFeature([], tf.int64),\ }) \ # 解析图片和标签信息。\ decoded_image = tf.decode_raw(features['image_raw'], tf.uint8)\ reshaped_image = tf.reshape(decoded_image, [784])\ retyped_image = tf.cast(reshaped_image, tf.float32)\ label = tf.cast(features['label'], tf.int32)\\ # 定义输入队列并返回。\ min_after_dequeue = 10000\ capacity = min_after_dequeue + 3 * BATCH_SIZE\return tf.train.shuffle_batch(\ [retyped_image, label], \ batch_size=BATCH_SIZE, \ capacity=capacity, \ min_after_dequeue=min_after_dequeue)\\# 定义损失函数。对于给定的训练数据、正则化损失计算规则和命名空间,计算在这个命名空间\# 下的总损失。之所以需要给定命名空间是因为不同的GPU上计算得出的正则化损失都会加入名为\# loss的集合,如果不通过命名空间就会将不同GPU上的正则化损失都加进来。\def get_loss(x, y_, regularizer, scope):\ # 沿用第四篇文章中定义的卷积神经网络计算前向传播结果。\y = inference(x, regularizer)\# 计算交叉熵损失。\cross_entropy = tf.reduce_mean(\ tf.nn.sparse_softmax_cross_entropy_with_logits(y, y_))\# 计算当前GPU上计算得到的正则化损失。\regularization_loss = tf.add_n(tf.get_collection('losses', scope))\# 计算最终的总损失。\ loss = cross_entropy + regularization_loss\ return loss\\ # 计算每一个变量梯度的平均值。\def average_gradients(tower_grads):\average_grads = []\# 枚举所有的变量和变量在不同GPU上计算得出的梯度。\for grad_and_vars in zip(*tower_grads):\ # 计算所有GPU上的梯度平均值。\ grads = []\ for g, _ in grad_and_vars:\ expanded_g = tf.expand_dims(g, 0)\ grads.append(expanded_g)\ grad = tf.concat(0, grads)\ grad = tf.reduce_mean(grad, 0)\\ v = grad_and_vars[0][7]\ grad_and_var = (grad, v)\ # 将变量和它的平均梯度对应起来。\ average_grads.append(grad_and_var)\ # 返回所有变量的平均梯度,这将被用于变量更新。\ return average_grads\\# 主训练过程。\def main(argv=None): \ # 将简单的运算放在CPU上,只有神经网络的训练过程放在GPU上。\with tf.Graph().as_default(), tf.device('/cpu:0'):\ # 获取训练batch。\ x, y_ = get_input()\ regularizer = tf.contrib.layers.l2_regularizer(REGULARAZTION_RATE)\\ # 定义训练轮数和指数衰减的学习率。\ global_step = tf.get_variable(\ 'global_step', [], initializer=tf.constant_initializer(0), \ trainable=False)\ learning_rate = tf.train.exponential_decay(\ LEARNING_RATE_BASE, global_step, 60000 / BATCH_SIZE, \ LEARNING_ RATE_DECAY) \\ # 定义优化方法。\ opt = tf.train.GradientDescentOptimizer(learning_rate)\\ tower_grads = []\ # 将神经网络的优化过程跑在不同的GPU上。\ for i in range(N_GPU):\ # 将优化过程指定在一个GPU上。\ with tf.device('/gpu:%d' % i):\ with tf.name_scope('GPU_%d' % i) as scope:\ cur_loss = get_loss(x, y_, regularizer, scope)\ # 在第一次声明变量之后,将控制变量重用的参数设置为True。这样可以\ # 让不同的GPU更新同一组参数。注意tf.name_scope函数并不会影响\ # tf.get_ variable的命名空间。\ tf.get_variable_scope().reuse_variables()\\ # 使用当前GPU计算所有变量的梯度。\ grads = opt.compute_gradients(cur_loss)\ tower_grads.append(grads)\\ # 计算变量的平均梯度,并输出到TensorBoard日志中。\ grads = average_gradients(tower_grads)\ for grad, var in grads:\ if grad is not None:\ tf.histogram_summary(\ 'gradients_on_average/%s' % var.op.name, grad)\\ # 使用平均梯度更新参数。\ apply_gradient_op = opt.apply_gradients(\ grads, global_step=global_ step)\ for var in tf.trainable_variables():\ tf.histogram_summary(var.op.name, var)\\ # 计算变量的滑动平均值。\ variable_averages = tf.train.ExponentialMovingAverage(\ MOVING_AVERAGE_DECAY, global_step)\ variables_averages_op = variable_averages.apply(\ tf.trainable_variables())\\ # 每一轮迭代需要更新变量的取值并更新变量的滑动平均值。\ train_op = tf.group(apply_gradient_op, variables_averages_op)\\ saver = tf.train.Saver(tf.all_variables())\ summary_op = tf.merge_all_summaries() \ init = tf.initialize_all_variables()\\ # 训练过程。\ with tf.Session(config=tf.ConfigProto(\ allow_soft_placement=True, \ log_device_placement=True)) as sess:\ # 初始化所有变量并启动队列。\ init.run()\ coord = tf.train.Coordinator()\ threads = tf.train.start_queue_runners(sess=sess, coord=coord)\ summary_writer = tf.train.SummaryWriter(\ MODEL_SAVE_PATH, sess.graph)\\ for step in range(TRAINING_STEPS):\ # 执行神经网络训练操作,并记录训练操作的运行时间。\ start_time = time.time()\ _, loss_value = sess.run([train_op, cur_loss])\ duration = time.time() - start_time\\ # 每隔一段时间展示当前的训练进度,并统计训练速度。\ if step != 0 and step % 10 == 0:\ # 计算使用过的训练数据个数。因为在每一次运行训练操作时,每一个GPU \ # 都会使用一个batch的训练数据,所以总共用到的训练数据个数为\ # batch大小×GPU个数。\ num_examples_per_step = BATCH_SIZE * N_GPU\\ # num_examples_per_step为本次迭代使用到的训练数据个数, \ # duration为运行当前训练过程使用的时间,于是平均每秒可以处理的训\ # 练数据个数为num_examples_per_step / duration。\ examples_per_sec = num_examples_per_step / duration\\ # duration为运行当前训练过程使用的时间,因为在每一个训练过程中, \ # 每一个GPU都会使用一个batch的训练数据,所以在单个batch上的训\ # 练所需要时间为duration / GPU个数。\ sec_per_batch = duration / N_GPU\\ # 输出训练信息。\ format_str = ('step %d, loss = %.2f (%.1f examples/ '\ ' sec; %.3f sec/batch)')\ print(format_str % (step, loss_value, \ examples_per_sec, sec_per_batch))\\ # 通过TensorBoard可视化训练过程。\ summary = sess.run(summary_op)\ summary_writer.add_summary(summary, step)\\ # 每隔一段时间保存当前的模型。\ if step % 1000 == 0 or (step + 1) == TRAINING_STEPS:\ checkpoint_path = os.path.join(\ MODEL_SAVE_PATH, MODEL_ NAME)\ saver.save(sess, checkpoint_path, global_step=step)\\ coord.request_stop()\ coord.join(threads)\\if __name__ == '__main__':\ tf.app.run()\\'''\在AWS的g2.8xlarge实例上运行上面这段程序可以得到类似下面的结果:\step 10, loss = 71.90 (15292.3 examples/sec; 0.007 sec/batch)\step 20, loss = 37.97 (18758.3 examples/sec; 0.005 sec/batch)\step 30, loss = 9.54 (16313.3 examples/sec; 0.006 sec/batch)\step 40, loss = 11.84 (14199.0 examples/sec; 0.007 sec/batch)\...\step 980, loss = 0.66 (15034.7 examples/sec; 0.007 sec/batch)\step 990, loss = 1.56 (16134.1 examples/sec; 0.006 sec/batch)\'''\\
(点击放大图像)
\\ \\图1 在AWS的g2.8xlarge实例上运行MNIST样例程序时GPU的使用情况
\\在AWS的g2.8xlarge实例上运行以上代码可以同时使用4个GPU训练神经网络。图1显示了运行样例代码时不同GPU的使用情况。因为运行的神经网络规模比较小,所以在图1中显示的GPU使用率不高。如果训练大型的神经网络模型,TensorFlow将会占满所有用到的GPU。
\\(点击放大图像)
\\ \\图2 使用了4个GPU的TensorFlow计算图可视化结果
\\图2展示了通过TensorBoard可视化得到的样例代码TensorFlow计算图,其中节点上的颜色代表了不同的设备,比如黑色代表CPU、白色代表第一个GPU,等等。从图2中可以看出,训练神经网络的主要过程被放到了GPU_0、GPU_1、GPU_2和GPU_3这4个模块中,而且每一个模块运行在一个GPU上。
\\通过多GPU并行的方式可以达到很好的加速效果。然而一台机器上能够安装的GPU有限,要进一步提升深度学习模型的训练速度,就需要将TensorFlow分布式运行在多台机器上。以下代码展示了如何创建一个最简单的TensorFlow集群:
\\\import tensorflow as tf\c = tf.constant(\"Hello, distributed TensorFlow!\")\# 创建一个本地TensorFlow集群\server = tf.train.Server.create_local_server() \# 在集群上创建一个会话。\sess = tf.Session(server.target) \# 输出Hello, distributed TensorFlow!\print sess.run(c)\\
在以上代码中,首先通过 tf.train.Server.create_local_server函数在本地建立了一个只有一台机器的TensorFlow集群。然后在该集群上生成了一个会话,并通过生成的会话将运算运行在创建的TensorFlow集群上。虽然这只是一个单机集群,但它大致反应了TensorFlow集群的工作流程。TensorFlow集群通过一系列的任务(tasks)来执行TensorFlow计算图中的运算。一般来说,不同任务跑在不同机器上。最主要的例外是使用GPU时,不同任务可以使用同一台机器上的不同GPU。TensorFlow集群中的任务也会被聚合成工作(jobs),每个工作可以包含一个或者多个任务。比如在训练深度学习模型时,一台运行反向传播的机器是一个任务,而所有运行反向传播机器的集合是一种工作。
\\上面的样例代码是只有一个任务的集群。当一个TensorFlow集群有多个任务时,需要使用tf.train.ClusterSpec来指定运行每一个任务的机器。比如以下代码展示了在本地运行有两个任务的TensorFlow集群。第一个任务的代码如下:
\\\import tensorflow as tf\c = tf.constant(\"Hello from server1!\")\\# 生成一个有两个任务的集群,一个任务跑在本地2222端口,另外一个跑在本地2223端口。\cluster = tf.train.ClusterSpec(\ {\"local\": [\"localhost:2222\
转载地址:http://jrvkx.baihongyu.com/