Python: How to run TensorFlow on multiple cores and threads
Original URL: http://stackoverflow.com/questions/39415263/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): StackOverFlow
How to run TensorFlow on multiple cores and threads
Asked by bored_to_death
I should start saying that I am completely new to any kind of parallelism/multithreading/multiprocessing programming.
Now I have the chance to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I've spent a lot of time trying to understand how I should modify my code (if at all) in order to exploit all of that computational power. Unfortunately, I didn't get anywhere. I hoped that TF could do this automatically, but when I launch my model and check the CPU usage with top, I see 100% CPU usage most of the time, with a few peaks at 200%.
If all the cores were used, I would expect to see a 100*64=6400% usage (correct?). How can I accomplish this?
Should I do something similar to what is explained here?
If that is the case, do I understand correctly that all the multithreading is only applied to computations that involve queues?
Is this really all that can be done to use all the available computational power (since it appears to me that queues are only used when reading and batching training samples)?
This is what my code looks like, if needed (main.py):
# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import time
from six.moves import xrange # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *
import argparse
import cnn
import freader_2
training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
testset_size = 1000
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300
def placeholder_inputs(batch_size):
    images_placeholder = tf.placeholder(tf.float32, shape=(testset_size, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(testset_size, 15))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):
    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])
    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }
    return feed_dict
tot_training_loss = []
tot_test_loss = []
tot_grad = []
print('Starting TensorFlow session...')
with tf.Graph().as_default():
    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32)
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()
    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)
    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)
    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):
        _, grad, loss_value = sess.run([train_op, grad_norm, loss], feed_dict={keep_prob: 0.5})
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)
        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)
        if step % 1 == 0:
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f' % grad)
            # summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
            # summary_writer.add_summary(summary_str, step)
            # summary_writer.flush()
        if (step+1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step=global_step)
        if test_loss_val < 0.01:  # or grad < 0.01:
            print("Stopping condition reached.")
            break
    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')
cnn.py:
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
import tensorflow as tf
NUM_OUTPUT = 15
IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT
def inference(images, num_samples, keep_prob, reuse=None):
    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
        biases = tf.Variable(tf.constant(0.0, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')
        pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')
        # output dim: 9x17
    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        # output dim: 8x16
        biases = tf.Variable(tf.constant(0.1, name='biases', shape=[5]))
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')
        pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
        # output dim: 4x8
    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)
    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([20], name='biases'))
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')
    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20, NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        biases = tf.Variable(tf.zeros([NUM_OUTPUT], name='biases'))
        output = tf.matmul(fully_connected, weights) + biases
    return output

def loss(outputs, labels):
    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')
    return rmse_tot

def training(loss, starter_learning_rate, global_step):
    tf.scalar_summary(loss.op.name, loss)
    # optimizer = tf.train.AdamOptimizer()
    learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
    optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
    grads_and_vars = optimizer.compute_gradients(loss)
    grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]
    grad_norm = tf.add_n(grad_norms)
    train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
    # train_op = optimizer.minimize(loss, global_step=global_step)
    return train_op, grad_norm
freader_2.py:
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import collections
import numpy as np
from six.moves import xrange
import tensorflow as tf
class XICSDataSet:
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):
        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)
        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]]*self.height*self.width
        lb_record_defaults = [[.0]]*self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim=' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim=' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)
        depth_major = tf.reshape(features, [self.height, self.width, self.depth])
        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity,
                                                            min_after_dequeue=min_after_dequeue)
        return example_batch, label_batch
Answered by Marco D.G.
According to the TensorFlow documentation:
The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.

intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
inter_op_parallelism_threads: All ready nodes are scheduled in this pool.

These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. For both configuration options, if they are unset or set to 0, they will default to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores.

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
tf.Session(config=config)
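Applied to the code in the question, a minimal sketch might look like the following (the thread counts here are only an assumption based on the 32 physical cores mentioned above; tune them for your machine):

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 32  # threads available to a single op (e.g. one conv2d)
config.inter_op_parallelism_threads = 32  # threads used to run independent ops concurrently
sess = tf.Session(config=config)          # instead of the plain tf.Session() in main.py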
In versions of TensorFlow before 1.2, multi-threaded, queue-based input pipelines were the recommended approach for performance. Beginning with TensorFlow 1.4, however, the tf.data module is recommended instead.
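As a rough, untested sketch only, a tf.data version of the queue pipeline in freader_2.py could look roughly like this (the file names come from main.py, the shapes from XICSDataSet's defaults, and the parallelism value is an arbitrary assumption):

height, width, noutput, batch_size = 20, 195, 15, 1000

def parse_lines(im_line, lb_line):
    # Decode one whitespace-delimited line of pixels and one line of labels,
    # mirroring the TextLineReader/decode_csv logic above.
    im = tf.decode_csv(im_line, record_defaults=[[0.0]] * (height * width), field_delim=' ')
    lb = tf.decode_csv(lb_line, record_defaults=[[0.0]] * noutput, field_delim=' ')
    return tf.reshape(tf.stack(im), [height, width, 1]), tf.stack(lb)

dataset = tf.data.Dataset.zip((tf.data.TextLineDataset(training_feats_file),
                               tf.data.TextLineDataset(training_lbls_file)))
dataset = dataset.map(parse_lines, num_parallel_calls=4)  # parse lines on several threads
dataset = dataset.shuffle(buffer_size=10000).batch(batch_size)
images, labels = dataset.make_one_shot_iterator().get_next()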
Yes, in Linux you can check your CPU usage with top and press 1 to show the usage per CPU. Note: the percentages depend on the Irix/Solaris mode.
Answered by TheLoneDeranger
This is a comment, but I'm posting it as an answer because I don't have enough rep to post comments yet. Marco D.G.'s answer is correct; I just wanted to add the fun fact that a with tf.device('/cpu:0') block automatically tries to use all available cores. Happy flowing!
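For illustration, a minimal self-contained sketch of that pattern (the ops here are arbitrary placeholders, not from the question's model):

import tensorflow as tf

with tf.device('/cpu:0'):
    # Ops created under this device scope are pinned to the CPU, which is backed
    # by the intra-/inter-op thread pools and can therefore use several cores.
    a = tf.random_normal([1000, 1000])
    b = tf.matmul(a, a)

with tf.Session() as sess:
    sess.run(b)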
Answered by burglarhobbit
For me, it worked this way:
from multiprocessing.dummy import Pool as ThreadPool
....
pool = ThreadPool()
outputs = pool.starmap(run_on_sess,[(tf_vars,data1),(tf_vars,data2),])
pool.close()
pool.join()
You should initialize the session and make the session-related variables globally available as part of tf_vars. Create a run_on_sess function that performs the sess.run step and any other posterior computations for a single batch, and run it on the batches named data1 and data2 in a Pythonic multithreaded environment, as sketched below.
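The answer does not show run_on_sess itself, so here is a minimal, hypothetical sketch of what such a function could look like (the contents of tf_vars and the tensor names are assumptions, not part of the original answer):

def run_on_sess(tf_vars, data):
    # tf_vars is assumed to be a tuple holding the shared session, an input
    # placeholder and the output tensor to evaluate; data is one batch of inputs.
    # sess.run releases the GIL, so several of these calls can execute
    # concurrently in the thread pool above.
    sess, inputs_pl, outputs = tf_vars
    return sess.run(outputs, feed_dict={inputs_pl: data})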