内存使用量随着 Python 的 multiprocessing.pool 不断增长

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/18414020/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-19 10:40:17  来源:igfitidea点击:

Memory usage keep growing with Python's multiprocessing.pool

pythonmemorymultiprocessingpool

提问by C.B.

Here's the program:

这是程序:

#!/usr/bin/python

import multiprocessing

def dummy_func(r):
    pass

def worker():
    pass

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    for index in range(0,100000):
        pool.apply_async(worker, callback=dummy_func)

    # clean up
    pool.close()
    pool.join()

I found memory usage (both VIRT and RES) kept growing up till close()/join(), is there any solution to get rid of this? I tried maxtasksperchild with 2.7 but it didn't help either.

我发现内存使用量(VIRT 和 RES)一直在增长,直到 close()/join(),有什么解决方案可以摆脱这种情况吗?我用 2.7 尝试了 maxtasksperchild 但它也没有帮助。

I have a more complicated program that calles apply_async() ~6M times, and at ~1.5M point I've already got 6G+ RES, to avoid all other factors, I simplified the program to above version.

我有一个更复杂的程序,它调用 apply_async() ~6M 次,在 ~1.5M 点我已经有 6G+ RES,为了避免所有其他因素,我将程序简化为以上版本。

EDIT:

编辑:

Turned out this version works better, thanks for everyone's input:

原来这个版本效果更好,感谢大家的投入:

#!/usr/bin/python

import multiprocessing

ready_list = []
def dummy_func(index):
    global ready_list
    ready_list.append(index)

def worker(index):
    return index

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=16)
    result = {}
    for index in range(0,1000000):
        result[index] = (pool.apply_async(worker, (index,), callback=dummy_func))
        for ready in ready_list:
            result[ready].wait()
            del result[ready]
        ready_list = []

    # clean up
    pool.close()
    pool.join()

I didn't put any lock there as I believe main process is single threaded (callback is more or less like a event-driven thing per docs I read).

我没有在那里放任何锁,因为我相信主进程是单线程的(根据我阅读的文档,回调或多或少像一个事件驱动的东西)。

I changed v1's index range to 1,000,000, same as v2 and did some tests - it's weird to me v2 is even ~10% faster than v1 (33s vs 37s), maybe v1 was doing too many internal list maintenance jobs. v2 is definitely a winner on memory usage, it never went over 300M (VIRT) and 50M (RES), while v1 used to be 370M/120M, the best was 330M/85M. All numbers were just 3~4 times testing, reference only.

我将 v1 的索引范围更改为 1,000,000,与 v2 相同并做了一些测试 - 我觉得很奇怪 v2 甚至比 v1 快约 10%(33 秒 vs 37 秒),也许 v1 做了太多的内部列表维护工作。v2绝对是内存使用的赢家,它从来没有超过300M(VIRT)和50M(RES),而v1曾经是370M/120M,最好的是330M/85M。所有数字只是3~4次测试,仅供参考。

回答by Fish Monitor

Use map_asyncinstead of apply_asyncto avoid excessive memory usage.

使用map_async而不是apply_async避免过多的内存使用。

For your first example, change the following two lines:

对于第一个示例,更改以下两行:

for index in range(0,100000):
    pool.apply_async(worker, callback=dummy_func)

to

pool.map_async(worker, range(100000), callback=dummy_func)

It will finish in a blink before you can see its memory usage in top. Change the list to a bigger one to see the difference. But note map_asyncwill first convert the iterable you pass to it to a list to calculate its length if it doesn't have __len__method. If you have an iterator of a huge number of elements, you can use itertools.isliceto process them in smaller chunks.

在您可以在top. 将列表更改为更大的列表以查看差异。但是注意map_async将首先将传递给它的可迭代对象转换为列表以计算它的长度,如果它没有__len__方法。如果你有一个包含大量元素的迭代器,你可以用itertools.islice更小的块来处理它们。

I had a memory problem in a real-life program with much more data and finally found the culprit was apply_async.

我在具有更多数据的实际程序中遇到了内存问题,最终发现罪魁祸首是apply_async.

P.S., in respect of memory usage, your two examples have no obvious difference.

PS,在内存使用方面,您的两个示例没有明显区别。

回答by kitsu.eb

I have a very large 3d point cloud data set I'm processing. I tried using the multiprocessing module to speed up the processing, but I started getting out of memory errors. After some research and testing I determined that I was filling the queue of tasks to be processed much quicker than the subprocesses could empty it. I'm sure by chunking, or using map_async or something I could have adjusted the load, but I didn't want to make major changes to the surrounding logic.

我正在处理一个非常大的 3d 点云数据集。我尝试使用多处理模块来加速处理,但我开始出现内存不足错误。经过一些研究和测试,我确定我填充要处理的任务队列的速度比子进程清空它的速度要快得多。我确定通过分块,或使用 map_async 或其他我可以调整负载的东西,但我不想对周围的逻辑进行重大更改。

The dumb solution I hit on is to check the pool._cachelength intermittently, and if the cache is too large then wait for the queue to empty.

我遇到的愚蠢的解决方案是pool._cache间歇性地检查长度,如果缓存太大,则等待队列清空。

In my mainloop I already had a counter and a status ticker:

在我的主循环中,我已经有了一个计数器和一个状态代码:

# Update status
count += 1
if count%10000 == 0:
    sys.stdout.write('.')
    if len(pool._cache) > 1e6:
        print "waiting for cache to clear..."
        last.wait() # Where last is assigned the latest ApplyResult

So every 10k insertion into the pool I check if there are more than 1 million operations queued (about 1G of memory used in the main process). When the queue is full I just wait for the last inserted job to finish.

因此,每向池中插入 10k 次,我就会检查是否有超过 100 万个操作排队(主进程中使用了大约 1G 的内存)。当队列已满时,我只等待最后插入的作业完成。

Now my program can run for hours without running out of memory. The main process just pauses occasionally while the workers continue processing the data.

现在我的程序可以运行几个小时而不会耗尽内存。当工作人员继续处理数据时,主进程只是偶尔暂停。

BTW the _cache member is documented the the multiprocessing module pool example:

顺便说一句,_cache 成员记录在多处理模块池示例中:

#
# Check there are no outstanding tasks
#

assert not pool._cache, 'cache = %r' % pool._cache

回答by deddu

I had memory issues recently, since I was using multiple times the multiprocessing function, so it keep spawning processes, and leaving them in memory.

我最近遇到了内存问题,因为我多次使用多处理函数,所以它不断产生进程,并将它们留在内存中。

Here's the solution I'm using now:

这是我现在使用的解决方案:

def myParallelProcess(ahugearray)
 from multiprocessing import Pool
 from contextlib import closing
 with closing( Pool(15) ) as p:
    res = p.imap_unordered(simple_matching, ahugearray, 100)
 return res

I ? with

一世 ?

回答by Don Kirkby

I think this is similar to the question I posted, but I'm not sure you have the same delay. My problem was that I was producing results from the multiprocessing pool faster than I was consuming them, so they built up in memory. To avoid that, I used a semaphoreto throttle the inputs into the pool so they didn't get too far ahead of the outputs I was consuming.

我认为这类似于我发布的问题,但我不确定您是否有相同的延迟。我的问题是我从多处理池中产生结果的速度比我消耗它们的速度快,所以它们在内存中建立起来。为了避免这种情况,我使用了一个信号量来限制池中的输入,这样它们就不会比我消耗的输出领先太多。

回答by Ullullu

Simply create the pool within your loop and close it at the end of the loop with pool.close().

只需在循环中创建池并在循环结束时使用 pool.close().