Python multiprocessing.Pool() slower than just using ordinary functions

Disclaimer: this page is a translation of a popular StackOverflow question and is provided under the CC BY-SA 4.0 license. If you use or share it, you must attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/20727375/

multiprocessing.Pool() slower than just using ordinary functions

Tags: python, performance, multiprocessing, pool

Asked by Karim Bahgat

(This question is about how to make multiprocessing.Pool() run code faster. I finally solved it, and the final solution can be found at the bottom of the post.)

Original Question:

I'm trying to use Python to compare a word with many other words in a list and retrieve a list of the most similar ones. To do that I am using the difflib.get_close_matches function. I'm on a relatively new and powerful Windows 7 laptop, with Python 2.6.5.

What I want is to speed up the comparison process, because my comparison list of words is very long and I have to repeat the comparison several times. When I heard about the multiprocessing module, it seemed logical that if the comparison could be broken up into worker tasks and run simultaneously (thus making use of the machine's power in exchange for faster speed), my comparison task would finish faster.

However, even after trying many different approaches, including methods shown in the docs and suggested in forum posts, the Pool method seems to be incredibly slow, much slower than just running the original get_close_matches function on the entire list at once. I would like help understanding why Pool() is being so slow and whether I am using it correctly. I'm only using this string comparison scenario as an example because it is the most recent case I could think of where I was unable to understand multiprocessing or get it to work for me rather than against me. Below is example code from the difflib scenario showing the time difference between the ordinary and the Pool methods:

from multiprocessing import Pool
import random, time, difflib

# constants
wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(1000000)]
mainword = "hello"

# comparison function
def findclosematch(subwordlist):
    matches = difflib.get_close_matches(mainword,subwordlist,len(subwordlist),0.7)
    if matches != []:
        return matches

# pool
print "pool method"
if __name__ == '__main__':
    pool = Pool(processes=3)
    t=time.time()
    result = pool.map_async(findclosematch, wordlist, chunksize=100)
    #do something with result
    for r in result.get():
        pass
    print time.time()-t

# normal
print "normal method"
t=time.time()
# run function
result = findclosematch(wordlist)
# do something with results
for r in result:
    pass
print time.time()-t

The word to be found is "hello", and the list of words in which to find close matches is a list of 1 million strings of 5 randomly joined characters (only for illustration purposes). I use 3 processor cores and the map function with a chunksize of 100 (list items to be processed per worker, I think??) (I also tried chunksizes of 1000 and 10,000, but there was no real difference). Notice that in both methods I start the timer right before calling my function and end it right after having looped through the results. As you can see below, the timing results are clearly in favor of the original non-Pool method:

>>> 
pool method
37.1690001488 seconds
normal method
10.5329999924 seconds
>>> 

The Pool method is almost 4 times slower than the original method. Is there something I am missing here, or maybe a misunderstanding of how Pool/multiprocessing works? I do suspect that part of the problem could be that the map function returns None and so adds thousands of unnecessary items to the results list, even though I only want actual matches to be returned and have written the function that way. From what I understand, that is just how map works. I have heard about other functions like filter that only collect non-False results, but I don't think multiprocessing/Pool supports a filter method. Are there any other functions besides map/imap in the multiprocessing module that could help me return only what my function returns? The apply function, as I understand it, is more for passing multiple arguments.

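(A minimal sketch of that filtering idea, with a made-up stand-in worker f and Python 2 syntax as elsewhere in this post: Pool has no filter variant, but the None placeholders that map keeps for non-matching items can simply be dropped from the returned list afterwards.)

# Sketch only: f is a hypothetical stand-in for a real worker function.
from multiprocessing import Pool

def f(word):
    return word if word.startswith("he") else None

if __name__ == '__main__':
    pool = Pool(processes=3)
    raw = pool.map(f, ["hello", "world", "hey", "help"])
    # drop the None placeholders that map keeps for non-matches
    matches = [r for r in raw if r is not None]
    print matches   # ['hello', 'hey', 'help']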

I know there's also the imap function, which I tried, but without any time improvement. The reason is the same reason I have had trouble understanding what's so great about the itertools module, supposedly "lightning fast", which I've noticed is true for calling the function, but in my experience and from what I've read, that's because calling the function doesn't actually do any calculations, so when it's time to iterate through the results to collect and analyze them (without which there would be no point in calling the function), it takes just as much or sometimes more time than just using the normal version of the function straight up. But I suppose that's for another post.

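(For what it's worth, that laziness can be seen with itertools alone; a tiny sketch, my own illustration rather than anything from this question: building the iterator returns immediately, and the time is only spent when the results are consumed.)

# Sketch only: itertools.imap (Python 2) does no work when called; the
# calculations run only as the results are actually consumed.
import itertools, time

def slow_square(x):
    time.sleep(0.001)
    return x * x

t = time.time()
it = itertools.imap(slow_square, xrange(1000))
print "building the iterator: %.3f s" % (time.time() - t)   # near-instant
t = time.time()
results = list(it)                                           # the work happens here
print "consuming the results: %.3f s" % (time.time() - t)    # roughly a second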

Anyway, I'm excited to see if someone can nudge me in the right direction here, and I really appreciate any help. I'm more interested in understanding multiprocessing in general than in getting this particular example to work, though some example solution code would aid my understanding.

The Answer:

It seems the slowdown had to do with the slow startup time of the additional processes. I couldn't get the .Pool() function to be fast enough. My final solution to make it faster was to manually split the workload list, use multiple .Process() calls instead of .Pool(), and return the solutions in a Queue. But I wonder if the most crucial change might have been splitting the workload in terms of the main words to look for rather than the words to compare with, perhaps because the difflib search function is already so fast. Here is the new code, running 5 processes at the same time, which turned out to be about 10x faster than running the simple code (6 seconds vs 55 seconds). Very useful for fast fuzzy lookups, on top of how fast difflib already is.

from multiprocessing import Process, Queue
import difflib, random, time

def f2(wordlist, mainwordlist, q):
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)

if __name__ == '__main__':

    # constants (for 50 input words, find closest match in list of 100 000 comparison words)
    q = Queue()
    wordlist = ["".join([random.choice([letter for letter in "abcdefghijklmnopqersty"]) for lengthofword in xrange(5)]) for nrofwords in xrange(100000)]
    mainword = "hello"
    mainwordlist = [mainword for each in xrange(50)]

    # normal approach
    t = time.time()
    for mainword in mainwordlist:
        matches = difflib.get_close_matches(mainword,wordlist,len(wordlist),0.7)
        q.put(matches)
    print time.time()-t
    # empty the queue again so the multiprocessing run below starts from a clean queue
    for each in mainwordlist:
        q.get()

    # split work into 5 or 10 processes
    processes = 5
    def splitlist(inlist, chunksize):
        return [inlist[x:x+chunksize] for x in xrange(0, len(inlist), chunksize)]
    print len(mainwordlist)/processes
    mainwordlistsplitted = splitlist(mainwordlist, len(mainwordlist)/processes)
    print "list ready"

    t = time.time()
    jobs = []
    for submainwordlist in mainwordlistsplitted:
        print "sub"
        p = Process(target=f2, args=(wordlist,submainwordlist,q,))
        p.daemon = True
        p.start()
        jobs.append(p)
    # drain the queue before joining (as the multiprocessing docs advise),
    # and join every worker, not just the last one started
    results = [q.get() for each in mainwordlist]
    for p in jobs:
        p.join()
    print time.time()-t
    for matches in results:
        print matches

Accepted answer by Multimedia Mike

My best guess is inter-process communication (IPC) overhead. In the single-process instance, the single process has the word list. When delegating to various other processes, the main process needs to constantly shuttle sections of the list to other processes.

Thus, it follows that a better approach might be to spin off n processes, each of which is responsible for loading/generating a 1/n segment of the list and checking whether the word is in that part of the list.

I'm not sure how to do that with Python's multiprocessing library, though.

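A rough sketch of that idea, as my own illustration rather than code from the answer: each worker generates (or loads) only its own 1/n slice of the comparison list and searches it independently, so the big list never has to be shipped between processes.

# Sketch only: each process builds its own segment of the comparison list
# and searches it, so no large data crosses process boundaries.
from multiprocessing import Process, Queue
import difflib, random

def worker(mainword, nwords, q):
    segment = ["".join(random.choice("abcdefghijklmnopqersty") for _ in xrange(5))
               for _ in xrange(nwords)]
    q.put(difflib.get_close_matches(mainword, segment, len(segment), 0.7))

if __name__ == '__main__':
    n = 4
    q = Queue()
    procs = [Process(target=worker, args=("hello", 250000, q)) for _ in xrange(n)]
    for p in procs:
        p.start()
    results = [q.get() for _ in xrange(n)]   # drain the queue before joining
    for p in procs:
        p.join()
    print sum(len(r) for r in results), "close matches found in total"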

Answered by verdverm

I experienced something similar with the Pool on a different problem. I'm not sure of the actual cause at this point...

The Answer edit by OP Karim Bahgat is the same solution that worked for me. After switching to a Process & Queue system, I was able to see speedups in line with the number of cores of the machine.

Here's an example.

import multiprocessing as mp

def do_something(data):
    return data * 2

def consumer(inQ, outQ):
    while True:
        try:
            # get a new message
            val = inQ.get()

            # this is the 'TERM' signal
            if val is None:
                break;

            # unpack the message
            pos = val[0]  # its helpful to pass in/out the pos in the array
            data = val[1]

            # process the data
            ret = do_something(data)

            # send the response / results
            outQ.put( (pos, ret) )


        except Exception, e:
            print "error!", e
            break

def process_data(data_list, inQ, outQ):
    # send pos/data to workers
    for i,dat in enumerate(data_list):
        inQ.put( (i,dat) )

    # process results
    for i in range(len(data_list)):
        ret = outQ.get()
        pos = ret[0]
        dat = ret[1]
        data_list[pos] = dat


def main():
    # initialize things
    n_workers = 4
    inQ = mp.Queue()
    outQ = mp.Queue()
    # instantiate workers
    workers = [mp.Process(target=consumer, args=(inQ,outQ))
               for i in range(n_workers)]

    # start the workers
    for w in workers:
        w.start()

    # gather some data
    data_list = [ d for d in range(1000)]

    # lets process the data a few times
    for i in range(4):
        process_data(data_list, inQ, outQ)

    # tell all workers, no more data (one msg for each)
    for i in range(n_workers):
        inQ.put(None)
    # join on the workers
    for w in workers:
        w.join()

    # print out final results  (i*16)
    for i,dat in enumerate(data_list):
        print i, dat

if __name__ == "__main__":
    main()

Answered by The Aelfinn

These problems usually boil down to the following:

The function you are trying to parallelize doesn't require enough CPU resources (i.e. CPU time) to rationalize parallelization!

Sure, when you parallelize with multiprocessing.Pool(8), you could theoretically (but not practically) get an 8x speed-up.

However, keep in mind that this isn't free - you gain this parallelization at the expense of the following overhead:

  1. Creating a task for every chunk (of size chunksize) in your iter passed to Pool.map(f, iter)
  2. For each task:
    1. Serialize the task and the task's return value (think pickle.dumps(); see the timing sketch after this list)
    2. Deserialize the task and the task's return value (think pickle.loads())
    3. Waste significant time waiting for Locks on shared memory Queues, while worker processes and the parent process get() and put() from/to these Queues.
  3. A one-time cost of calling os.fork() for each worker process, which is expensive.
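
One quick way to get a feel for overhead item 2 is to time the pickle round-trip of the payload by itself; a small sketch (my own illustration, with a payload shaped roughly like the OP's word list):

# Sketch only: measure the serialization cost alone, before any real work.
import pickle, time

payload = ["%05d" % i for i in xrange(1000000)]   # ~size of the OP's word list
t = time.time()
blob = pickle.dumps(payload)    # roughly what Pool.map has to ship out, chunk by chunk
pickle.loads(blob)              # results coming back get deserialized as well
print "pickle round-trip: %.2f s, %d bytes" % (time.time() - t, len(blob))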

In essence, when using Pool() you want:

  1. High CPU resource requirements
  2. Low data footprint passed to each function call
  3. A reasonably long iter to justify the one-time cost of (3) above (a small illustration follows below).
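
As a small illustration of those three criteria (my own example, not from the answer): a CPU-heavy function that takes tiny arguments, over a reasonably long input list, is the case where Pool actually pays off.

# Sketch only: heavy CPU work per call, tiny arguments to pickle, and an
# input list long enough to amortize the process startup cost.
from multiprocessing import Pool
import time

def cpu_heavy(n):
    total = 0
    for i in xrange(n):
        total += i * i
    return total

if __name__ == '__main__':
    args = [2000000] * 40

    t = time.time()
    serial = map(cpu_heavy, args)
    print "serial: %.2f s" % (time.time() - t)

    pool = Pool(processes=4)
    t = time.time()
    parallel = pool.map(cpu_heavy, args)
    print "pool:   %.2f s" % (time.time() - t)

    assert serial == parallel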

For a more in-depth exploration, this post and the linked talk walk through how large data being passed to Pool.map() (and friends) gets you into trouble.

Raymond Hettinger also talks about proper use of Python's concurrency here.
