How to utilize all cores with python multiprocessing
Disclaimer: This page is a Chinese-English translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use or share it, but you must follow the same license, link to the original URL, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/19086106/
How to utilize all cores with python multiprocessing
Asked by Darkstarone
I have been fiddling with Python's multicore function for upwards of an hour now, trying to parallelize a rather complex graph traversal function using Process and Manager:
import networkx as nx
import csv
import time
from operator import itemgetter
import os
import multiprocessing as mp

cutoff = 1
exclusionlist = ["cpd:C00024"]
DG = nx.read_gml("KeggComplete.gml", relabel = True)
for exclusion in exclusionlist:
    DG.remove_node(exclusion)
# checks if 'memorizedPaths' exists, and if not, creates it
fn = os.path.join(os.path.dirname(__file__), 'memorizedPaths' + str(cutoff+1))
if not os.path.exists(fn):
    os.makedirs(fn)
manager = mp.Manager()
memorizedPaths = manager.dict()
filepaths = manager.dict()
degreelist = sorted(DG.degree_iter(), key=itemgetter(1), reverse=True)

def _all_simple_paths_graph(item, DG, cutoff, memorizedPaths, filepaths):
    source = item[0]
    uniqueTreePaths = []
    if cutoff < 1:
        return
    visited = [source]
    stack = [iter(DG[source])]
    while stack:
        children = stack[-1]
        child = next(children, None)
        if child is None:
            stack.pop()
            visited.pop()
        elif child in memorizedPaths:
            for path in memorizedPaths[child]:
                newPath = (tuple(visited) + tuple(path))
                if (len(newPath) <= cutoff) and (len(set(visited) & set(path)) == 0):
                    uniqueTreePaths.append(newPath)
            continue
        elif len(visited) < cutoff:
            if child not in visited:
                visited.append(child)
                stack.append(iter(DG[child]))
                if visited not in uniqueTreePaths:
                    uniqueTreePaths.append(tuple(visited))
        else: # len(visited) == cutoff
            if (visited not in uniqueTreePaths) and (child not in visited):
                uniqueTreePaths.append(tuple(visited + [child]))
            stack.pop()
            visited.pop()
    # writes the absolute path of the node path file into the hash table
    filepaths[source] = str(fn) + "/" + str(source) + "path.txt"
    with open(filepaths[source], "wb") as csvfile2:
        writer = csv.writer(csvfile2, delimiter=' ', quotechar='|')
        for path in uniqueTreePaths:
            writer.writerow(path)
    memorizedPaths[source] = uniqueTreePaths

############################################################################

start = time.clock()

if __name__ == '__main__':
    for item in degreelist:
        test = mp.Process(target=_all_simple_paths_graph,
                          args=(DG, cutoff, item, memorizedPaths, filepaths))
        test.start()
        test.join()

end = time.clock()
print (end-start)
Currently - through luck and magic - it works (sort of). My problem is I'm only using 12 of my 24 cores.
Can someone explain why this might be the case? Perhaps my code isn't the best multiprocessing solution, or is it a feature of my architecture [Intel Xeon CPU E5-2640 @ 2.50GHz x18 running on Ubuntu 13.04 x64]?
EDIT:
I managed to get:
p = mp.Pool()
for item in degreelist:
    p.apply_async(_all_simple_paths_graph,
                  args=(DG, cutoff, item, memorizedPaths, filepaths))
p.close()
p.join()
Working, however, it's VERY SLOW! So I assume I'm using the wrong function for the job. Hopefully it helps clarify exactly what I'm trying to accomplish!
EDIT2: .map attempt:
from functools import partial

partialfunc = partial(_all_simple_paths_graph, DG=DG, cutoff=cutoff,
                      memorizedPaths=memorizedPaths, filepaths=filepaths)
p = mp.Pool()
for item in processList:
    processVar = p.map(partialfunc, xrange(len(processList)))
p.close()
p.join()
Works, but it's slower than single core. Time to optimize!
Accepted answer by Tim Peters
Too much piling up here to address in comments, so, where mp is multiprocessing:
mp.cpu_count() should return the number of processors. But test it. Some platforms are funky, and this info isn't always easy to get. Python does the best it can.
If you start 24 processes, they'll do exactly what you tell them to do ;-) Looks like mp.Pool() would be most convenient for you. You pass the number of processes you want to create to its constructor. mp.Pool(processes=None) will use mp.cpu_count() for the number of processors.
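For example, a quick sanity check along these lines (a minimal sketch, not code from the question or the answer) shows whether the reported core count matches your expectations and how the default pool size follows from it:

import multiprocessing as mp

if __name__ == "__main__":
    # How many processors does Python think this machine has?  On the
    # asker's box this should print 24; if it doesn't, Pool's default
    # size will be off too.
    print(mp.cpu_count())

    # processes=None tells Pool to size itself with mp.cpu_count(),
    # so this is the same as calling mp.Pool() with no arguments.
    p = mp.Pool(processes=None)
    p.close()
    p.join()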
Then you can use, for example, .imap_unordered(...) on your Pool instance to spread your degreelist across processes. Or maybe some other Pool method would work better for you - experiment.
If you can't bash the problem into Pool's view of the world, you could instead create an mp.Queue to create a work queue, .put()'ing nodes (or slices of nodes, to reduce overhead) to work on in the main program, and write the workers to .get() work items off that queue. Ask if you need examples. Note that you need to put sentinel values (one per process) on the queue, after all the "real" work items, so that worker processes can test for the sentinel to know when they're done.
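As a rough illustration of that shape (a hedged sketch of my own, not an example from the answer; the worker body and the fake work items are placeholders), the sentinel trick looks something like this:

import multiprocessing as mp

SENTINEL = None  # any value the workers can recognize as "no more work"

def worker(work_queue):
    # .get() items until the sentinel shows up, then quit.
    while True:
        item = work_queue.get()
        if item is SENTINEL:
            break
        # ... do the real per-node work here, e.g. call
        # _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

if __name__ == "__main__":
    nprocs = mp.cpu_count()
    work_queue = mp.Queue()

    procs = [mp.Process(target=worker, args=(work_queue,))
             for _ in range(nprocs)]
    for p in procs:
        p.start()

    # .put() all the "real" work items first (a placeholder range here) ...
    for item in range(1000):
        work_queue.put(item)

    # ... then one sentinel per process, so every worker knows when it's done.
    for _ in range(nprocs):
        work_queue.put(SENTINEL)

    for p in procs:
        p.join()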
FYI, I like queues because they're more explicit. Many others like Pools better because they're more magical ;-)
Pool Example
Here's an executable prototype for you. This shows one way to use imap_unordered with Pool and chunksize that doesn't require changing any function signatures. Of course you'll have to plug in your real code ;-) Note that the init_worker approach allows passing "most of" the arguments only once per processor, not once for every item in your degreelist. Cutting the amount of inter-process communication can be crucial for speed.
import multiprocessing as mp

def init_worker(mps, fps, cut):
    global memorizedPaths, filepaths, cutoff
    global DG
    print "process initializing", mp.current_process()
    memorizedPaths, filepaths, cutoff = mps, fps, cut
    DG = 1##nx.read_gml("KeggComplete.gml", relabel = True)

def work(item):
    _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths)

def _all_simple_paths_graph(DG, cutoff, item, memorizedPaths, filepaths):
    pass # print "doing " + str(item)

if __name__ == "__main__":
    m = mp.Manager()
    memorizedPaths = m.dict()
    filepaths = m.dict()
    cutoff = 1 ##
    # use all available CPUs
    p = mp.Pool(initializer=init_worker, initargs=(memorizedPaths,
                                                   filepaths,
                                                   cutoff))
    degreelist = range(100000) ##
    for _ in p.imap_unordered(work, degreelist, chunksize=500):
        pass
    p.close()
    p.join()
I strongly advise running this exactly as-is, so you can see that it's blazing fast. Then add things to it a bit at a time, to see how that affects the time. For example, just adding
memorizedPaths[item] = item
to _all_simple_paths_graph() slows it down enormously. Why? Because the dict gets bigger and bigger with each addition, and this process-safe dict has to be synchronized (under the covers) among all the processes. The unit of synchronization is "the entire dict" - there's no internal structure the mp machinery can exploit to do incremental updates to the shared dict.
If you can't afford this expense, then you can't use a Manager.dict() for this. Opportunities for cleverness abound ;-)
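One such piece of cleverness (my own suggestion, not something the answer spells out, and it gives up the cross-process memoization that the shared dict provides) is to have each worker return its result and let the parent collect everything into an ordinary local dict:

import multiprocessing as mp

def work(item):
    # Compute the paths for one node and hand them back to the parent
    # instead of writing them into a shared Manager.dict().
    paths = [(item,)]          # placeholder for the real uniqueTreePaths
    return item, paths

if __name__ == "__main__":
    memorizedPaths = {}        # plain dict, lives only in the parent process

    p = mp.Pool()
    # imap_unordered streams (item, paths) pairs back as workers finish.
    for item, paths in p.imap_unordered(work, range(1000), chunksize=50):
        memorizedPaths[item] = paths
    p.close()
    p.join()

    print(len(memorizedPaths))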