How to utilize all cores with python multiprocessing

Disclaimer: this page is a translation of a popular StackOverflow question and answer, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license and attribute it to the original authors (not me). Original question: http://stackoverflow.com/questions/19086106/

Tags: python, multiprocessing

Asked by Darkstarone

I have been fiddling with Python's multicore functionality for upwards of an hour now, trying to parallelize a rather complex graph traversal function using Process and Manager:

import networkx as nx
import csv
import time 
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1

exclusionlist = ["cpd:C00024"]

DG = nx.read_gml("KeggComplete.gml", relabel = True)

for exclusion in exclusionlist:
    DG.remove_node(exclusion)

# checks if the 'memorizedPaths' directory exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__), 'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)

manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(),key=itemgetter(1),reverse=True)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths): # argument order matches the mp.Process call below
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: #len(visited) == cutoff:
            if (visited not in uniqueTreePaths) and (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    #writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) +"path.txt"
    with open (filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=' ', quotechar='|')
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################

start = time.clock()
if __name__ == '__main__':
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()
end = time.clock()
print (end-start)

Currently - through luck and magic - it works (sort of). My problem is that I'm only using 12 of my 24 cores.

Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture [Intel Xeon CPU E5-2640 @ 2.50GHz x18 running on Ubuntu 13.04 x64]?

EDIT:

I managed to get:

p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph, args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()

It works, however, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully this helps clarify exactly what I'm trying to accomplish!

EDIT2: .map attempt:

from functools import partial  # partial() lives in functools

partialfunc = partial(_all_simple_paths_graph, DG=DG, cutoff=cutoff, memorizedPaths=memorizedPaths, filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))   
p.close()
p.join()

It works, but it's slower than single-core. Time to optimize!

Accepted answer by Tim Peters

Too much piling up here to address in comments, so, where mp is multiprocessing:

mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.

If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.

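For example, a quick way to check what your machine reports (a minimal sketch, not part of the original answer; it assumes modern Python 3, and os.sched_getaffinity exists only on Linux):

import os
import multiprocessing as mp

if __name__ == "__main__":
    # What Python believes is available.
    print("mp.cpu_count():", mp.cpu_count())

    # On Linux, the set of cores this process may actually use can be
    # smaller than cpu_count(), e.g. if a CPU affinity mask is in effect.
    if hasattr(os, "sched_getaffinity"):
        print("usable cores:", len(os.sched_getaffinity(0)))

    # Pool(processes=None) asks for one worker per cpu_count() core;
    # pass an explicit number to override that.
    with mp.Pool() as pool:   # same as mp.Pool(processes=mp.cpu_count())
        pass                  # hand work to the pool here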

Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.

If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to create a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) to work on in the main program, and write the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.

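Since examples are offered on request, here is one possible sketch of that queue-plus-sentinel pattern (not part of the original answer; written for modern Python 3, with a toy worker body and toy items standing in for the real traversal and for degreelist):

import multiprocessing as mp

SENTINEL = None          # any value that can never be a real work item

def worker(in_queue, out_queue):
    # Pull work items until the sentinel shows up, then exit.
    while True:
        item = in_queue.get()
        if item is SENTINEL:
            break
        out_queue.put((item, item * 2))   # stand-in for the real per-node work

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    in_queue, out_queue = mp.Queue(), mp.Queue()

    procs = [mp.Process(target=worker, args=(in_queue, out_queue))
             for _ in range(nprocs)]
    for p in procs:
        p.start()

    items = list(range(20))              # stand-in for degreelist
    for item in items:
        in_queue.put(item)
    for _ in procs:                      # one sentinel per worker process
        in_queue.put(SENTINEL)

    results = [out_queue.get() for _ in items]   # drain results before joining
    for p in procs:
        p.join()
    print(len(results), "items processed")

Draining the result queue before join() matters: a worker that still has buffered queue data can otherwise hang the join.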

FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)

Pool Example

Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per processor, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.

import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG

    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()

I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding

   memorizedPaths[item] = item

to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.

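To get a feel for that per-write cost in isolation, here is a toy comparison (illustrative only, not part of the original answer; written for modern Python 3):

import multiprocessing as mp
import time

def fill(d, n):
    for i in range(n):
        d[i] = i             # plain dict: cheap; Manager dict proxy: a cross-process call

if __name__ == "__main__":
    n = 10000

    start = time.perf_counter()
    fill({}, n)              # ordinary in-process dict
    print("plain dict:   %.4f s" % (time.perf_counter() - start))

    m = mp.Manager()
    start = time.perf_counter()
    fill(m.dict(), n)        # every write goes through the manager process
    print("Manager dict: %.4f s" % (time.perf_counter() - start))

Even with a single process doing the writing, the proxied dict is typically orders of magnitude slower, and it only gets worse once many workers hammer the same proxy at once.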

If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)

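One such piece of cleverness, sketched here under the assumption that the results only need to end up in the parent process (not part of the original answer): have each worker return a (source, paths) pair and merge into an ordinary dict in the main program, so workers never write to anything shared. compute_paths is a hypothetical stand-in for the real traversal:

import multiprocessing as mp

def compute_paths(source):
    # Hypothetical stand-in for the real per-node traversal; it returns its
    # result instead of writing into a shared Manager dict.
    return source, [(source,)]

if __name__ == "__main__":
    degreelist = range(1000)      # stand-in for the real degree-sorted node list

    memorizedPaths = {}           # a plain dict that lives only in the parent
    with mp.Pool() as pool:
        for source, paths in pool.imap_unordered(compute_paths, degreelist,
                                                 chunksize=50):
            memorizedPaths[source] = paths

    print(len(memorizedPaths), "nodes processed")

This trades away the cross-process memoization the original code was attempting, so it is only a win if that sharing was not paying for itself.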