Notice: this page is a side-by-side Chinese/English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): StackOverflow. Original URL: http://stackoverflow.com/questions/26104512/

Date: 2020-08-19 00:05:30  Source: igfitidea

How to obtain the results from a pool of threads in python?

python multithreading queue return-value

Asked by Rafael Rios

I have searched here for how to do threading in Python, but so far I haven't been able to find the answer I need. I'm not very familiar with the Queue and threading modules, and for that reason some of the answers here make no sense to me at all.

I want to create a pool of threads that I can give different tasks to and, when all of them have finished, get the result values and process them. So far I have tried to do this, but I'm not able to get the results. The code I have written is:

from threading import Thread
from Queue import Queue

class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.result = None
        self.start()
    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                self.result = func(*args, **kargs)
            except Exception, e:
                print e
            self.tasks.task_done()
    def get_result(self):
        return self.result

class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        self.results = []
        for _ in range(num_threads):
            w = Worker(self.tasks)
            self.results.append(w.get_result())
    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))
    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()
    def get_results(self):
        return self.results

def foo(word, number):
    print word*number
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1,2,3,4,5]
pool = ThreadPool(5)
for i in range(0, len(words)):
    pool.add_task(foo, words[i], numbers[i])

pool.wait_completion()
results = pool.get_results()
print results

The output prints each word repeated the given number of times, but the results list is full of None values. Where should I put the return values of func?

Or is the easy way to create a list as I fill the Queue, pass a dictionary or some other variable to my function as an extra argument to store the result, and, after the task is added to the Queue, append this result argument to a list of results?

def foo(word, number, r):
    print word*number
    r[(word,number)] = number
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1,2,3,4,5]
pool = ThreadPool(5)
results = []
for i in range(0, len(words)):
    r = {}
    pool.add_task(foo, words[i], numbers[i], r)
    results.append(r)
print results
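
For reference, the None values in the first attempt appear to come from ThreadPool.__init__ calling w.get_result() right after each worker is created, before any task has run, so the list only captures each worker's initial None. One possible fix, sketched below for Python 3, is to have the workers write to a second queue. The names ResultPool, results and count are hypothetical and not part of the original code; this is an illustration under those assumptions, not the only way to do it.

```python
from queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread that runs tasks from one queue and reports to another."""

    def __init__(self, tasks, results):
        super().__init__(daemon=True)
        self.tasks = tasks
        self.results = results
        self.start()

    def run(self):
        while True:
            index, func, args, kwargs = self.tasks.get()
            try:
                self.results.put((index, func(*args, **kwargs)))
            except Exception as exc:
                # Store the exception as the result instead of losing it.
                self.results.put((index, exc))
            finally:
                self.tasks.task_done()


class ResultPool:
    """Hypothetical pool that keeps every task's return value."""

    def __init__(self, num_threads):
        self.tasks = Queue()
        self.results = Queue()
        self.count = 0  # number of tasks submitted so far
        for _ in range(num_threads):
            Worker(self.tasks, self.results)

    def add_task(self, func, *args, **kwargs):
        self.tasks.put((self.count, func, args, kwargs))
        self.count += 1

    def get_results(self):
        """Wait for all tasks, then return values in submission order."""
        self.tasks.join()
        collected = [self.results.get() for _ in range(self.count)]
        return [value for _, value in sorted(collected, key=lambda p: p[0])]


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ResultPool(5)
for word, number in zip(words, numbers):
    pool.add_task(foo, word, number)

results = pool.get_results()
print(results)  # [1, 2, 3, 4, 5]
```

Each task carries its submission index, so the results can be put back in order even though the threads finish in an unpredictable order.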

Accepted answer by dano

Python actually has a built-in thread pool you can use; it's just not well documented:

from multiprocessing.pool import ThreadPool

def foo(word, number):
    print(word * number)
    return number

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
results = []
for i in range(0, len(words)):
    # apply_async returns immediately with an AsyncResult handle
    results.append(pool.apply_async(foo, args=(words[i], numbers[i])))

pool.close()  # no more tasks will be submitted
pool.join()   # wait for the worker threads to finish
results = [r.get() for r in results]  # collect each return value
print(results)
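
One detail of apply_async that may be useful: the get() call on each AsyncResult re-raises any exception raised by the task, so failures can be handled per task when collecting. Below is a minimal Python 3 sketch of that behavior. The function risky and the names pending and outcomes are made up for illustration and do not come from the original answer.

```python
from multiprocessing.pool import ThreadPool


def risky(number):
    """Hypothetical task that fails for one particular input."""
    if number == 3:
        raise ValueError("bad input: %d" % number)
    return number * 2


pool = ThreadPool(2)
pending = [pool.apply_async(risky, args=(n,)) for n in range(1, 5)]
pool.close()
pool.join()

outcomes = []
for handle in pending:
    try:
        # get() returns the task's return value, or re-raises its exception
        outcomes.append(handle.get())
    except ValueError as exc:
        outcomes.append("failed: %s" % exc)

print(outcomes)  # [2, 4, 'failed: bad input: 3', 8]
```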

Or (using map instead of apply_async):

from multiprocessing.pool import ThreadPool

def foo(word, number):
    print(word * number)
    return number

def starfoo(args):
    """
    We need this because map only supports calling functions with one arg.
    We need to pass two args, so we use this little wrapper function to
    expand a zipped list of all our arguments.
    """
    return foo(*args)

words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
# We need to zip together the two lists because map only supports calling functions
# with one argument. In Python 3.3+, you can use starmap instead.
results = pool.map(starfoo, zip(words, numbers))
print(results)

pool.close()
pool.join()
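
As the comment above mentions, starmap is available in Python 3.3+. A minimal sketch of that variant, assuming Python 3 and the same foo, words and numbers as in the answer, makes the starfoo wrapper unnecessary:

```python
from multiprocessing.pool import ThreadPool


def foo(word, number):
    print(word * number)
    return number


words = ['hello', 'world', 'test', 'word', 'another test']
numbers = [1, 2, 3, 4, 5]
pool = ThreadPool(5)
# starmap unpacks each (word, number) tuple into foo's two arguments
# and returns the results in input order.
results = pool.starmap(foo, zip(words, numbers))
print(results)  # [1, 2, 3, 4, 5]

pool.close()
pool.join()
```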