Python: How to run TensorFlow on multiple cores and threads

Notice: this page is based on a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/39415263/

Date: 2020-08-19 22:14:15  Source: igfitidea

How to run TensorFlow on multiple cores and threads

Tags: python, multithreading, neural-network, tensorflow, conv-neural-network

Asked by bored_to_death

I should start by saying that I am completely new to any kind of parallel, multithreaded, or multiprocess programming.

Now, I have the chance to run my TensorFlow CNN on 32 cores (each with 2 hyperthreads). I've spent a lot of time trying to understand how I should modify my code (if I have to) in order to exploit all of that computational power. Unfortunately, I didn't get anywhere. I hoped that TF could do that automatically, but when I launch my model and check the CPU usage with top, I see 100% CPU usage most of the time, with a few 200% peaks.

If all the cores were used, I would expect to see a 100*64=6400% usage (correct?). How can I accomplish this?
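
As a quick sanity check on that arithmetic, here is a minimal sketch using only the Python standard library. The helper name max_top_percent is hypothetical, and the figure of 2 hardware threads per core is taken from the question rather than detected.

```python
import os


def max_top_percent(physical_cores, threads_per_core=2):
    """Upper bound that top could report for one process (default Irix mode)."""
    logical_cpus = physical_cores * threads_per_core
    return 100 * logical_cpus


# The machine from the question: 32 cores with 2 hyperthreads each.
print(max_top_percent(32))  # 6400

# On the machine running this script: os.cpu_count() counts logical CPUs
# and may return None if the count cannot be determined.
logical_here = os.cpu_count() or 1
print(logical_here, 100 * logical_here)
```

So 6400% is the right ceiling for 64 logical CPUs, and the observed 100% to 200% means only one or two of them are busy.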

Should I do something similar to what is explained here?

If that is the case, do I understand correctly that multithreading is applied only to computations that involve a queue?

Is this really all that can be done to use all the computational power available (since it appears to me that queues are only used when reading and batching training samples)?

This is what my code looks like, if needed: (main.py)

# pylint: disable=missing-docstring
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import time

from six.moves import xrange  # pylint: disable=redefined-builtin
import tensorflow as tf
from pylab import *

import argparse
import cnn
import freader_2

training_feats_file = ["file_name"]
training_lbls_file = ["file_name"]
test_feats_file = 'file_name'
test_lbls_file = 'file_name'
learning_rate = 0.1
batch_size = 1000
testset_size = 793
tot_samples = 810901
max_steps = 3300

def placeholder_inputs(num_samples):
    # Placeholders for a fixed-size set of images and their target values.
    images_placeholder = tf.placeholder(tf.float32, shape=(num_samples, cnn.IMAGE_HEIGHT, cnn.IMAGE_WIDTH, 1))
    labels_placeholder = tf.placeholder(tf.float32, shape=(num_samples, cnn.NUM_OUTPUT))
    return images_placeholder, labels_placeholder

def reader(images_file, lbls_file, images_pl, labels_pl, im_height, im_width):

    images = loadtxt(images_file)
    labels_feed = loadtxt(lbls_file)
    images_feed = reshape(images, [images.shape[0], im_height, im_width, 1])

    feed_dict = {
        images_pl: images_feed,
        labels_pl: labels_feed,
    }

    return feed_dict

tot_training_loss = []
tot_test_loss = []
tot_grad = []

print('Starting TensorFlow session...')
with tf.Graph().as_default():

    DS = freader_2.XICSDataSet()
    images, labels = DS.trainingset_files_reader(training_feats_file, training_lbls_file)
    keep_prob = tf.placeholder(tf.float32) 
    logits = cnn.inference(images, batch_size, keep_prob)
    loss = cnn.loss(logits, labels)
    global_step = tf.Variable(0, trainable=False)
    train_op, grad_norm = cnn.training(loss, learning_rate, global_step)
    summary_op = tf.merge_all_summaries()   

    test_images_pl, test_labels_pl = placeholder_inputs(testset_size)
    test_pred = cnn.inference(test_images_pl, testset_size, keep_prob, True)
    test_loss = cnn.loss(test_pred, test_labels_pl)

    saver = tf.train.Saver()
    sess = tf.Session()
    summary_writer = tf.train.SummaryWriter("CNN", sess.graph)

    init = tf.initialize_all_variables()
    sess.run(init)
    tf.train.start_queue_runners(sess=sess)
    test_feed = reader(test_feats_file, test_lbls_file, test_images_pl, test_labels_pl, DS.height, DS.width)
    test_feed[keep_prob] = 1.    

    # Start the training loop.
    print('Starting training loop...')
    start_time = time.time()
    for step in xrange(max_steps):

        _, grad, loss_value= sess.run([train_op, grad_norm, loss], feed_dict = {keep_prob:0.5})  
        tot_training_loss.append(loss_value)
        tot_grad.append(grad)

        _, test_loss_val = sess.run([test_pred, test_loss], feed_dict=test_feed)
        tot_test_loss.append(test_loss_val)

        if step % 1 == 0:        
            duration = time.time() - start_time
            print('Step %d (%.3f sec):\n training loss = %f\n test loss = %f ' % (step, duration, loss_value, test_loss_val))
            print(' gradient = %f'%grad)
#            summary_str = sess.run(summary_op)#, feed_dict=feed_dict)
#            summary_writer.add_summary(summary_str, step)
#            summary_writer.flush()

        if (step+1) % 100 == 0:
            print('Saving checkpoint...')
            saver.save(sess, "chkpts/medias-res", global_step = global_step)

        if test_loss_val < 0.01:# or grad < 0.01:
            print("Stopping condition reached.")
            break

    print('Saving final network...')
    saver.save(sess, "chkpts/final.chkpt")
    print('Total training time: ' + str((time.time() - start_time)/3600) + ' h')

cnn.py:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import math

import tensorflow as tf

NUM_OUTPUT = 15

IMAGE_WIDTH = 195
IMAGE_HEIGHT = 20
IMAGE_PIXELS = IMAGE_WIDTH * IMAGE_HEIGHT

def inference(images, num_samples, keep_prob, reuse=None):

    with tf.variable_scope('conv1', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[3, 30, 1, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))        
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(images, kernel, [1, 1, 5, 1], padding='VALID')
        # output dim: 18x34
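        # get_variable (unlike tf.Variable) honours reuse=True, so the test network shares these biases
        biases = tf.get_variable(name='biases', shape=[5], initializer=tf.constant_initializer(0.0))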
        bias = tf.nn.bias_add(conv, biases)
        conv1 = tf.nn.relu(bias, name='conv1')

    pool1 = tf.nn.max_pool(conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool1')    
    #output dim: 9x17

    with tf.variable_scope('conv2', reuse=reuse):
        kernel = tf.get_variable(name='weights', shape=[2, 2, 5, 5], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(kernel), 0.001, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
        conv = tf.nn.conv2d(pool1, kernel, [1, 1, 1, 1], padding='VALID')
        #output dim: 8x16
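        # get_variable (unlike tf.Variable) honours reuse=True, so the test network shares these biases
        biases = tf.get_variable(name='biases', shape=[5], initializer=tf.constant_initializer(0.1))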
        bias = tf.nn.bias_add(conv, biases)
        conv2 = tf.nn.relu(bias, name='conv2')


    pool2 = tf.nn.max_pool(conv2, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='VALID', name='pool2')
    #output dim: 4x8

    h_fc1_drop = tf.nn.dropout(pool2, keep_prob)

    with tf.variable_scope('fully_connected', reuse=reuse):
        reshape = tf.reshape(h_fc1_drop, [num_samples, -1])
        dim = reshape.get_shape()[1].value
        weights = tf.get_variable(name='weights', shape=[dim, 20], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
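        # get_variable (unlike tf.Variable) honours reuse=True, so the test network shares these biases
        biases = tf.get_variable(name='biases', shape=[20], initializer=tf.constant_initializer(0.0))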
        fully_connected = tf.nn.relu(tf.matmul(reshape, weights) + biases, name='fully_connected')

    with tf.variable_scope('identity', reuse=reuse):
        weights = tf.get_variable(name='weights', shape=[20,NUM_OUTPUT], initializer=tf.contrib.layers.xavier_initializer(uniform=False))
        weight_decay = tf.mul(tf.nn.l2_loss(weights), 0.004, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)
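        # get_variable (unlike tf.Variable) honours reuse=True, so the test network shares these biases
        biases = tf.get_variable(name='biases', shape=[NUM_OUTPUT], initializer=tf.constant_initializer(0.0))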
        output = tf.matmul(fully_connected, weights) + biases

    return output


def loss(outputs, labels):

    rmse = tf.sqrt(tf.reduce_mean(tf.square(tf.sub(labels, outputs))), name="rmse")
    loss_list = tf.get_collection('losses')
    loss_list.append(rmse)
    rmse_tot = tf.add_n(loss_list, name='total_loss')  
    return rmse_tot


def training(loss, starter_learning_rate, global_step):

      tf.scalar_summary(loss.op.name, loss)
#      optimizer = tf.train.AdamOptimizer()
      learning_rate = tf.train.exponential_decay(starter_learning_rate, global_step, 200, 0.8, staircase=True)
      optimizer = tf.train.MomentumOptimizer(learning_rate, 0.8)
      grads_and_vars = optimizer.compute_gradients(loss)
      grad_norms = [tf.nn.l2_loss(g[0]) for g in grads_and_vars]      
      grad_norm = tf.add_n(grad_norms)
      train_op = optimizer.apply_gradients(grads_and_vars, global_step=global_step)
#      train_op = optimizer.minimize(loss, global_step=global_step)
      return train_op, grad_norm

freader_2.py:

# -*- coding: utf-8 -*-

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import os 
import collections
import numpy as np

from six.moves import xrange  
import tensorflow as tf

class XICSDataSet:    
    def __init__(self, height=20, width=195, batch_size=1000, noutput=15):
        self.depth = 1
        self.height = height
        self.width = width
        self.batch_size = batch_size
        self.noutput = noutput

    def trainingset_files_reader(self, im_file_name, lb_file_name, nfiles=1):

        im_filename_queue = tf.train.string_input_producer(im_file_name, shuffle=False)
        lb_filename_queue = tf.train.string_input_producer(lb_file_name, shuffle=False)

        imreader = tf.TextLineReader()
        lbreader = tf.TextLineReader()
        imkey, imvalue = imreader.read(im_filename_queue)
        lbkey, lbvalue = lbreader.read(lb_filename_queue)
        im_record_defaults = [[.0]]*self.height*self.width
        lb_record_defaults = [[.0]]*self.noutput
        im_data_tuple = tf.decode_csv(imvalue, record_defaults=im_record_defaults, field_delim = ' ')
        lb_data_tuple = tf.decode_csv(lbvalue, record_defaults=lb_record_defaults, field_delim = ' ')
        features = tf.pack(im_data_tuple)
        label = tf.pack(lb_data_tuple)

        depth_major = tf.reshape(features, [self.height, self.width, self.depth])

        min_after_dequeue = 10
        capacity = min_after_dequeue + 3 * self.batch_size
        example_batch, label_batch = tf.train.shuffle_batch([depth_major, label], batch_size=self.batch_size, capacity=capacity,
                                                            min_after_dequeue=min_after_dequeue)

        return example_batch, label_batch

Answered by Marco D.G.

According to TensorFlow:

The two configurations listed below are used to optimize CPU performance by adjusting the thread pools.

  • intra_op_parallelism_threads: Nodes that can use multiple threads to parallelize their execution will schedule the individual pieces into this pool.
  • inter_op_parallelism_threads: All ready nodes are scheduled in this pool.

These configurations are set via tf.ConfigProto and passed to tf.Session in the config attribute, as shown in the snippet below. If either option is unset or set to 0, it defaults to the number of logical CPU cores. Testing has shown that the default is effective for systems ranging from one CPU with 4 cores to multiple CPUs with 70+ combined logical cores. A common alternative optimization is to set the number of threads in both pools equal to the number of physical cores rather than logical cores.

import tensorflow as tf

config = tf.ConfigProto()
config.intra_op_parallelism_threads = 44
config.inter_op_parallelism_threads = 44
sess = tf.Session(config=config)  # create the session with this configuration
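
If you want to try the "physical cores" alternative mentioned above, a rough sketch for picking the numbers could look like this. The helper suggest_thread_pools is hypothetical (not part of TensorFlow), and it assumes 2 hardware threads per physical core, which you should verify for your machine.

```python
import os


def suggest_thread_pools(logical_cpus=None, threads_per_core=2):
    """Return thread-pool sizes equal to the estimated physical core count."""
    if logical_cpus is None:
        # os.cpu_count() reports logical CPUs and may return None.
        logical_cpus = os.cpu_count() or 1
    physical = max(1, logical_cpus // threads_per_core)
    return {
        "intra_op_parallelism_threads": physical,
        "inter_op_parallelism_threads": physical,
    }


settings = suggest_thread_pools(logical_cpus=64)
print(settings)

# With TensorFlow 1.x installed, the values could be applied like this:
# import tensorflow as tf
# config = tf.ConfigProto(**settings)
# sess = tf.Session(config=config)
```

For the 64 logical CPUs in the question this gives 32 threads per pool. Whether that beats the default is something to measure, not assume.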

In versions of TensorFlow before 1.2, it is recommended to use multi-threaded, queue-based input pipelines for performance. Beginning with TensorFlow 1.4, however, it is recommended to use the tf.data module instead.
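
To illustrate what a multi-threaded, queue-based input pipeline does conceptually, here is a minimal sketch using only the Python standard library. It is not the TensorFlow queue API or tf.data; the names producer and consume and the toy batches are made up for illustration.

```python
import queue
import threading


def producer(batches, q):
    """Load batches in the background and hand them to the consumer."""
    for batch in batches:
        q.put(batch)  # blocks while the queue is full
    q.put(None)       # sentinel: no more data


def consume(q):
    """Stand-in for the training loop: sums each batch as fake work."""
    results = []
    while True:
        batch = q.get()
        if batch is None:
            break
        results.append(sum(batch))
    return results


batches = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
q = queue.Queue(maxsize=2)  # bounded, like a prefetch buffer
t = threading.Thread(target=producer, args=(batches, q))
t.start()
results = consume(q)
t.join()
print(results)  # [6, 15, 24]
```

The point is that loading overlaps with computation. It only parallelizes the input side, which matches the asker's observation that queues do not by themselves spread the model's math over all cores.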

Yes, in Linux, you can check your CPU usage with top and press 1 to show the usage per CPU. Note: the percentage depends on the Irix/Solaris mode.
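
To make the mode difference concrete: in Irix mode top reports a process's usage relative to a single CPU, so it can exceed 100%, while in Solaris mode that figure is divided by the number of CPUs. A small sketch of the conversion (the function name irix_to_solaris is just illustrative):

```python
def irix_to_solaris(irix_percent, n_logical_cpus):
    """Convert a per-process %CPU from top's Irix mode to Solaris mode."""
    return irix_percent / n_logical_cpus


# The 200% peak from the question, on 64 logical CPUs:
print(irix_to_solaris(200, 64))   # 3.125

# A process saturating all 64 logical CPUs:
print(irix_to_solaris(6400, 64))  # 100.0
```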

Answered by TheLoneDeranger

This is a comment, but I'm posting it as an answer because I don't have enough rep to post comments yet. Marco D.G.'s answer is correct; I just wanted to add the fun fact that with tf.device('/cpu:0') automatically tries to use all available cores. Happy flowing!

Answered by burglarhobbit

For me, it worked this way:

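from multiprocessing.dummy import Pool as ThreadPool  # thread-based Pool with the multiprocessing API
# ...
pool = ThreadPool()
# Each tuple is unpacked into the arguments of run_on_sess(tf_vars, data).
outputs = pool.starmap(run_on_sess, [(tf_vars, data1), (tf_vars, data2)])
pool.close()
pool.join()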

You should initialize the session and make the session-related variables available globally as part of tf_vars. Create a run_on_sess function that performs the sess.run step and any subsequent computations for a single batch (here named data1 and data2) in a Pythonic multithreaded environment.
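
For anyone wondering what run_on_sess and tf_vars might look like, here is a self-contained sketch of the pattern. FakeSession is a hypothetical stand-in so the example runs without TensorFlow, and the dictionary keys "sess" and "fetch" are my own assumptions, not something from the original answer.

```python
from multiprocessing.dummy import Pool as ThreadPool


class FakeSession:
    """Hypothetical stand-in for tf.Session so the sketch runs without TensorFlow."""

    def run(self, fetch, feed_dict):
        # Pretend the "graph" doubles every value in the fed batch.
        return [2 * x for x in feed_dict["batch"]]


def run_on_sess(tf_vars, data):
    """Run one step on a single batch, then do some post-processing."""
    out = tf_vars["sess"].run(tf_vars["fetch"], feed_dict={"batch": data})
    return sum(out)


tf_vars = {"sess": FakeSession(), "fetch": "output_op"}
data1, data2 = [1, 2, 3], [10, 20]

pool = ThreadPool(2)
outputs = pool.starmap(run_on_sess, [(tf_vars, data1), (tf_vars, data2)])
pool.close()
pool.join()
print(outputs)  # [12, 60]
```

starmap returns the results in the same order as the inputs, so outputs[0] belongs to data1 and outputs[1] to data2. With a real session you would replace FakeSession with your tf.Session and pass actual ops and placeholders.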
