Python: Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?
Source: http://stackoverflow.com/questions/27318290/ (Stack Overflow, licensed under CC BY-SA 4.0; attribute the original authors when reusing)
Asked by dpitch40
I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process with a bound method as a target work? The following code:
import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()
Produces this output:
Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10
Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed
Why can Processes handle bound methods, but not Pools?
Accepted answer by dano
The pickle module normally can't pickle instance methods:
>>> import pickle
>>> class A(object):
...     def z(self): print "hi"
...
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects
However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:
#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
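The important property of the code above is that the reducer is registered on one Pickler subclass only, so other picklers never see it. A minimal Python 3 sketch of the same idea follows. It uses the documented per-pickler dispatch_table rather than the Python 2 dispatch dict shown above, and Counter, _reduce_counter, LocalPickler and dumps_local are hypothetical names invented for this illustration:

```python
import copyreg
import io
import pickle
import threading


class Counter:
    """Hypothetical class that is unpicklable by default because it holds a lock."""

    def __init__(self, value=0):
        self.value = value
        self.lock = threading.Lock()


def _reduce_counter(counter):
    # Rebuild the object by calling Counter(value); a fresh lock is created.
    return Counter, (counter.value,)


class LocalPickler(pickle.Pickler):
    # Private copy of the dispatch table: the reducer is visible only here.
    dispatch_table = copyreg.dispatch_table.copy()
    dispatch_table[Counter] = _reduce_counter


def dumps_local(obj):
    """Serialize obj with LocalPickler instead of the default pickler."""
    buf = io.BytesIO()
    LocalPickler(buf).dump(obj)
    return buf.getvalue()


# The custom pickler can serialize a Counter...
restored = pickle.loads(dumps_local(Counter(5)))
print(restored.value)

# ...but the plain pickle module still cannot, because it never saw the reducer.
try:
    pickle.dumps(Counter(5))
    plain_pickle_ok = True
except TypeError:
    plain_pickle_ok = False
print(plain_pickle_ok)
```

Any code path that serializes with the plain pickle functions instead of the custom class gets none of the benefit, which is the situation the rest of this answer describes.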
You can replicate the _reduce_method registration using the copy_reg module to see it work for yourself:
>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
...
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."
When you use Process.start to spawn a new process on Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler:
#
# Windows
#

else:
    # snip...

    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()
Note the "send information to child" section. It calls the dump function, which pickles the data with ForkingPickler, which means your instance method can be pickled.
Now, when you use methods on multiprocessing.Pool to send a method to a child process, it uses a multiprocessing.Pipe to pickle the data. In Python 2.7, multiprocessing.Pipe is implemented in C and calls pickle_dumps directly, so it doesn't take advantage of the ForkingPickler. That means pickling the instance method doesn't work.
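Because a Pool task has to survive an ordinary pickle round trip, one way to diagnose this kind of failure before it shows up as a traceback in a helper thread is to try pickling the callable yourself. The sketch below is written for Python 3, and is_picklable is a hypothetical helper name, not part of multiprocessing:

```python
import math
import pickle


def is_picklable(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A module-level function is pickled by reference to its importable name.
print(is_picklable(math.sqrt))

# A lambda has no importable name, so the plain pickle module rejects it.
print(is_picklable(lambda x: x * x))
```

Running the same check on a bound method under Python 2.7 would be expected to fail in the way the question's traceback shows, unless a reducer has been registered globally.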
However, if you use copy_reg to register the instancemethod type, rather than a custom Pickler, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool:
import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()
Output:
Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
1GOT (0, 5, (True, 6))
GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10
GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
GOT (1, 6, (True, 36))
36
GOT (1, 7, (True, 49))
49
GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
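The Python 3 behaviour can be checked without starting any processes. In this minimal sketch, Shape is a hypothetical stand-in for the question's testClass; no copyreg registration is involved:

```python
import pickle


class Shape:
    """Hypothetical stand-in for the question's testClass."""

    def __init__(self, scale):
        self.scale = scale

    def square(self, x):
        return self.scale * x * x


shape = Shape(2)

# Pickling the bound method serializes the instance plus the method name.
payload = pickle.dumps(shape.square)
restored = pickle.loads(payload)

# The rebuilt method is callable and carries the instance state with it.
print(restored(3))
```

Since the bound method pickles cleanly here, passing self.square to pool.imap should no longer raise the PicklingError from the question on Python 3.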
Answer by skrrgwasme
Here's an alternative that I use sometimes, and it works in Python 2.x:
You can create a top-level "alias" of sorts for an instance method: a function that accepts the object whose instance method you want to run in a pool and calls that method for you:
import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """
        print(self.my_string.format(arg))
        return

# create an object of MyClass
obj = MyClass()

# use functools.partial to create a new method that always has the
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

# create our list of things we will use the pool to map
l = [1,2,3]

# create the pool of workers
pool = multiprocessing.Pool()

# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)

# cleanup
pool.close()
pool.join()
This code produces this output:
From MyClass: 1
From MyClass: 2
From MyClass: 3
One limitation is that you can't use this for methods that modify the object. Each process gets a copy of the object it is calling the methods on, so changes won't be propagated back to the main process. If you don't need to modify the object from the methods you're calling though, this can be a simple solution.
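The limitation can be illustrated without a pool by simulating what a worker receives. The sketch below, written for Python 3, approximates the transfer with a pickle round trip; Tally is a hypothetical class invented for this example:

```python
import pickle


class Tally:
    """Hypothetical class with a method that mutates its instance."""

    def __init__(self):
        self.total = 0

    def add(self, amount):
        self.total += amount
        return self.total


parent = Tally()

# Roughly what a worker gets: a pickled-and-rebuilt copy of the object.
worker_copy = pickle.loads(pickle.dumps(parent))
result = worker_copy.add(10)

# The mutation happened on the copy; the parent's object is unchanged.
print(parent.total)

# Only the return value is available to send back to the parent.
print(result)
```

If the parent needs the updated state, a common approach is to have the method return it and let the parent apply the values collected from pool.map.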
Answer by WeizhongTu
Here is an easier way that works in Python 2: just wrap the original instance method in a module-level function. It works well on Mac OS X and Linux but does not work on Windows; tested with Python 2.7.
from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        print self.name
        return x ** 5

def func(x, p=Person()):
    return p.calc(x)

pool = Pool()
print pool.map(func, range(10))
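One way to see why the wrapper sidesteps the pickling problem (this is an interpretation, not something the answer states) is that a module-level function is pickled by reference to its name, so the Person held in the default argument is never serialized. A minimal Python 3 sketch, assuming it is run as a script so that func is reachable as __main__.func:

```python
import pickle


class Person:
    def __init__(self):
        self.name = "Weizhong Tu"

    def calc(self, x):
        return x ** 5


def func(x, p=Person()):
    # The default Person is created once, when this def statement runs.
    return p.calc(x)


payload = pickle.dumps(func)

# The payload refers to the function by name...
print(b"func" in payload)

# ...and does not contain the Person instance's data.
print(b"Weizhong Tu" in payload)

# Unpickling looks the name up again and returns the very same function.
print(pickle.loads(payload) is func)
```

Each worker therefore uses whatever Person its own copy of the module holds, which also means the copy-per-process caveat from the previous answer applies here.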