Multiprocessing: How to use Pool.map on a function defined in a class?

Disclaimer: this page is a mirror of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must do so under the same license and attribute it to the original authors (not me): StackOverflow.

Original URL: http://stackoverflow.com/questions/3288595/
Asked by Mermoz
When I run something like:
from multiprocessing import Pool
p = Pool(5)
def f(x):
    return x*x
p.map(f, [1,2,3])
it works fine. However, putting this as a function of a class:
class calculate(object):
    def run(self):
        def f(x):
            return x*x
        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
Gives me the following error:
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/sw/lib/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/sw/lib/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/sw/lib/python2.6/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I've seen a post from Alex Martelli dealing with the same kind of problem, but it wasn't explicit enough.
Accepted answer by mrule
I was also annoyed by the restrictions on what sort of functions pool.map can accept, so I wrote the following to circumvent them. It appears to work, even for recursive use of parmap.
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    [p.join() for p in proc]
    return [p.recv() for (p, c) in pipe]

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Answered by robert
Functions defined in classes (even within functions within classes) don't really pickle. However, this works:
from multiprocessing import Pool

def f(x):
    return x*x

class calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

cl = calculate()
print cl.run()
Answered by Eric O Lebigot
There is currently no solution to your problem, as far as I know: the function that you give to map() must be accessible through an import of your module. This is why robert's code works: the function f() can be obtained by importing the following code:
from multiprocessing import Pool

def f(x):
    return x*x

class Calculate(object):
    def run(self):
        p = Pool()
        return p.map(f, [1,2,3])

if __name__ == '__main__':
    cl = Calculate()
    print cl.run()
I actually added a "main" section, because this follows the recommendations for the Windows platform ("Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects").
I also added an uppercase letter in front of Calculate, so as to follow PEP 8. :)
Answered by Brandt
I've also struggled with this. I had functions as data members of a class, as a simplified example:
from multiprocessing import Pool
import itertools

pool = Pool()

class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # Needed to do something like this (the following line won't work)
        return pool.map(self.f, list1, list2)
I needed to use the function self.f in a Pool.map() call from within the same class, and self.f did not take a tuple as an argument. Since this function was embedded in a class, it was not clear to me how to write the type of wrapper that other answers suggested.
I solved this problem by using a different wrapper that takes a tuple/list, where the first element is the function, and the remaining elements are the arguments to that function, called eval_func_tuple(f_args). Using this, the problematic line can be replaced by return pool.map(eval_func_tuple, itertools.izip(itertools.repeat(self.f), list1, list2)). Here is the full code:
File: util.py
def add(a, b): return a+b

def eval_func_tuple(f_args):
    """Takes a tuple of a function and args, evaluates and returns result"""
    return f_args[0](*f_args[1:])
File: main.py
from multiprocessing import Pool
import itertools
import util

pool = Pool()

class Example(object):
    def __init__(self, my_add):
        self.f = my_add

    def add_lists(self, list1, list2):
        # The following line will now work
        return pool.map(util.eval_func_tuple,
                        itertools.izip(itertools.repeat(self.f), list1, list2))

if __name__ == '__main__':
    myExample = Example(util.add)
    list1 = [1, 2, 3]
    list2 = [10, 20, 30]
    print myExample.add_lists(list1, list2)
Running main.py will give [11, 22, 33]. Feel free to improve this; for example, eval_func_tuple could also be modified to take keyword arguments.
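As a rough illustration of that suggestion (my addition, not part of the original answer), a keyword-argument-aware wrapper could take a (function, args, kwargs) triple instead of a flat tuple; the name eval_func_tuple_kw is made up for this sketch:

def eval_func_tuple_kw(f_args_kwargs):
    """Takes (function, args_tuple, kwargs_dict), evaluates and returns the result."""
    f, args, kwargs = f_args_kwargs
    return f(*args, **kwargs)

# Hypothetical usage, mirroring add_lists above:
#   pool.map(eval_func_tuple_kw,
#            [(self.f, (a, b), {}) for a, b in itertools.izip(list1, list2)])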
On another note, in another answer the function "parmap" can be made more efficient for the case where there are more processes than available CPUs. I'm copying an edited version below. This is my first post and I wasn't sure if I should directly edit the original answer. I also renamed some variables.
import multiprocessing
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(pipe, x):
        pipe.send(f(x))
        pipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    processes = [Process(target=spawn(f), args=(c, x)) for x, (p, c) in izip(X, pipe)]
    numProcesses = len(processes)
    processNum = 0
    outputList = []
    while processNum < numProcesses:
        endProcessNum = min(processNum + multiprocessing.cpu_count(), numProcesses)
        for proc in processes[processNum:endProcessNum]:
            proc.start()
        for proc in processes[processNum:endProcessNum]:
            proc.join()
        for proc, c in pipe[processNum:endProcessNum]:
            outputList.append(proc.recv())
        processNum = endProcessNum
    return outputList

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
Answered by Bob McElrath
The solution by mrule is correct but has a bug: if the child sends back a large amount of data, it can fill the pipe's buffer, blocking on the child's pipe.send(), while the parent is waiting for the child to exit on pipe.join(). The solution is to read the child's data before join()ing the child. Furthermore the child should close the parent's end of the pipe to prevent a deadlock. The code below fixes that. Also be aware that this parmap creates one process per element in X. A more advanced solution is to use multiprocessing.cpu_count() to divide X into a number of chunks, and then merge the results before returning. I leave that as an exercise to the reader so as not to spoil the conciseness of the nice answer by mrule. ;)
from multiprocessing import Process, Pipe
from itertools import izip

def spawn(f):
    def fun(ppipe, cpipe, x):
        ppipe.close()
        cpipe.send(f(x))
        cpipe.close()
    return fun

def parmap(f, X):
    pipe = [Pipe() for x in X]
    proc = [Process(target=spawn(f), args=(p, c, x)) for x, (p, c) in izip(X, pipe)]
    [p.start() for p in proc]
    ret = [p.recv() for (p, c) in pipe]
    [p.join() for p in proc]
    return ret

if __name__ == '__main__':
    print parmap(lambda x: x**x, range(1, 5))
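The chunked variant is left above as an exercise; the following is a rough sketch of one way it could look (my addition, not part of the original answer). It reuses the read-before-join pattern, runs one process per chunk rather than per element, and reassembles the results in the original order of X. The names spawn_chunk and parmap_chunked are made up for this sketch.

from multiprocessing import Process, Pipe, cpu_count
from itertools import izip

def spawn_chunk(f):
    def fun(ppipe, cpipe, chunk):
        ppipe.close()
        cpipe.send([f(x) for x in chunk])
        cpipe.close()
    return fun

def parmap_chunked(f, X, nchunks=None):
    if nchunks is None:
        nchunks = cpu_count()
    # Round-robin split of X into at most nchunks non-empty chunks
    chunks = [X[i::nchunks] for i in range(nchunks) if X[i::nchunks]]
    pipes = [Pipe() for _ in chunks]
    procs = [Process(target=spawn_chunk(f), args=(p, c, chunk))
             for chunk, (p, c) in izip(chunks, pipes)]
    [p.start() for p in procs]
    # Read before joining, for the reasons explained above
    results = [p.recv() for (p, c) in pipes]
    [p.join() for p in procs]
    # Merge the per-chunk results back into the original order of X
    merged = [None] * len(X)
    for i, chunk_result in enumerate(results):
        merged[i::nchunks] = chunk_result
    return merged

if __name__ == '__main__':
    print parmap_chunked(lambda x: x**x, range(1, 5))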
Answered by klaus se
I could not use the code posted so far, because code using "multiprocessing.Pool" does not work with lambda expressions, and code not using "multiprocessing.Pool" spawns as many processes as there are work items.
I adapted the code so that it spawns a predefined number of workers and only iterates through the input list if there is an idle worker. I also enabled "daemon" mode for the workers so that Ctrl-C works as expected.
import multiprocessing

def fun(f, q_in, q_out):
    while True:
        i, x = q_in.get()
        if i is None:
            break
        q_out.put((i, f(x)))

def parmap(f, X, nprocs=multiprocessing.cpu_count()):
    q_in = multiprocessing.Queue(1)
    q_out = multiprocessing.Queue()
    proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
            for _ in range(nprocs)]
    for p in proc:
        p.daemon = True
        p.start()
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [q_in.put((None, None)) for _ in range(nprocs)]
    res = [q_out.get() for _ in range(len(sent))]
    [p.join() for p in proc]
    return [x for i, x in sorted(res)]

if __name__ == '__main__':
    print(parmap(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8]))
Answered by Mike McKerns
Multiprocessing and pickling are broken and limited unless you jump outside the standard library.
If you use a fork of multiprocessing called pathos.multiprocessing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in Python.
pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6])).
See discussions: What can multiprocessing and dill do together?
and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization
It even handles the code you wrote initially, without modification, and from the interpreter. Why do anything else that's more fragile and specific to a single case?
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> class calculate(object):
...     def run(self):
...         def f(x):
...             return x*x
...         p = Pool()
...         return p.map(f, [1,2,3])
...
>>> cl = calculate()
>>> print cl.run()
[1, 4, 9]
Get the code here: https://github.com/uqfoundation/pathos
And, just to show off a little more of what it can do:
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>>
>>> p = Pool(4)
>>>
>>> def add(x,y):
...     return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
...     def plus(self, x, y):
...         return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> res = p.amap(t.plus, x, y)
>>> res.get()
[4, 6, 8, 10]
Answered by aganders3
I modified klaus se's method because, while it was working for me with small lists, it would hang when the number of items was ~1000 or greater. Instead of pushing the jobs one at a time with the None stop condition, I load up the input queue all at once and just let the processes munch on it until it's empty.
from multiprocessing import cpu_count, Queue, Process

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

# map a function using a pool of processes
def parmap(f, X, nprocs=cpu_count()):
    q_in, q_out = Queue(), Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]
    return [x for i, x in sorted(res)]
Edit: unfortunately now I am running into this error on my system: Multiprocessing Queue maxsize limit is 32767, hopefully the workarounds there will help.
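One possible workaround (my addition, not from the original answer) is to preload the items into queues created by a multiprocessing.Manager instead of plain multiprocessing.Queue objects; manager queues are proxied queue.Queue instances and are not backed by the bounded semaphore that imposes the 32767 cap on some platforms. A minimal sketch under that assumption, keeping the rest of the logic identical:

from multiprocessing import cpu_count, Process, Manager

def apply_func(f, q_in, q_out):
    while not q_in.empty():
        i, x = q_in.get()
        q_out.put((i, f(x)))

def parmap(f, X, nprocs=cpu_count()):
    manager = Manager()
    # Proxied queues, so preloading many more than 32767 items should not block
    q_in, q_out = manager.Queue(), manager.Queue()
    proc = [Process(target=apply_func, args=(f, q_in, q_out)) for _ in range(nprocs)]
    sent = [q_in.put((i, x)) for i, x in enumerate(X)]
    [p.start() for p in proc]
    res = [q_out.get() for _ in sent]
    [p.join() for p in proc]
    return [x for i, x in sorted(res)]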
Answered by xApple
I took klaus se's and aganders3's answers and made a documented module that is more readable and fits in one file. You can just add it to your project. It even has an optional progress bar!
"""
The ``processes`` module provides some convenience functions
for using parallel processes in python.
Adapted from http://stackoverflow.com/a/16071616/287297
Example usage:
print prll_map(lambda i: i * 2, [1, 2, 3, 4, 6, 7, 8], 32, verbose=True)
Comments:
"It spawns a predefined amount of workers and only iterates through the input list
if there exists an idle worker. I also enabled the "daemon" mode for the workers so
that KeyboardInterupt works as expected."
Pitfalls: all the stdouts are sent back to the parent stdout, intertwined.
Alternatively, use this fork of multiprocessing:
https://github.com/uqfoundation/multiprocess
"""
# Modules #
import multiprocessing
from tqdm import tqdm
################################################################################
def apply_function(func_to_apply, queue_in, queue_out):
while not queue_in.empty():
num, obj = queue_in.get()
queue_out.put((num, func_to_apply(obj)))
################################################################################
def prll_map(func_to_apply, items, cpus=None, verbose=False):
# Number of processes to use #
if cpus is None: cpus = min(multiprocessing.cpu_count(), 32)
# Create queues #
q_in = multiprocessing.Queue()
q_out = multiprocessing.Queue()
# Process list #
new_proc = lambda t,a: multiprocessing.Process(target=t, args=a)
processes = [new_proc(apply_function, (func_to_apply, q_in, q_out)) for x in range(cpus)]
# Put all the items (objects) in the queue #
sent = [q_in.put((i, x)) for i, x in enumerate(items)]
# Start them all #
for proc in processes:
proc.daemon = True
proc.start()
# Display progress bar or not #
if verbose:
results = [q_out.get() for x in tqdm(range(len(sent)))]
else:
results = [q_out.get() for x in range(len(sent))]
# Wait for them to finish #
for proc in processes: proc.join()
# Return results #
return [x for i, x in sorted(results)]
################################################################################
def test():
def slow_square(x):
import time
time.sleep(2)
return x**2
objs = range(20)
squares = prll_map(slow_square, objs, 4, verbose=True)
print "Result: %s" % squares
EDIT: Added @alexander-mcfarlane suggestion and a test function
Answered by CpILL
I'm not sure if this approach has been taken, but a workaround I'm using is the one below. It relies on the workers being forked after t has been assigned in the __main__ block, so they inherit the Test instance (on platforms that spawn new interpreters instead of forking, t would still be None in the children).
from multiprocessing import Pool

t = None

def run(n):
    return t.f(n)

class Test(object):
    def __init__(self, number):
        self.number = number

    def f(self, x):
        print x * self.number

    def pool(self):
        pool = Pool(2)
        pool.map(run, range(10))

if __name__ == '__main__':
    t = Test(9)
    t.pool()
    pool = Pool(2)
    pool.map(run, range(10))
Output should be:
0
9
18
27
36
45
54
63
72
81
0
9
18
27
36
45
54
63
72
81

