Note: this page reproduces a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me): Stack Overflow. Original URL: http://stackoverflow.com/questions/27318290/

Time: 2020-08-19 01:39:19  Source: igfitidea

Why can I pass an instance method to multiprocessing.Process, but not a multiprocessing.Pool?

Tags: python, python-2.7, multiprocessing, pickle

Asked by dpitch40

I am trying to write an application that applies a function concurrently with a multiprocessing.Pool. I would like this function to be an instance method (so I can define it differently in different subclasses). This doesn't seem to be possible; as I have learned elsewhere, apparently bound methods can't be pickled. So why does starting a multiprocessing.Process with a bound method as a target work? The following code:

import multiprocessing

def test1():
    print "Hello, world 1"

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print answer
        print
        for answer in pool.imap(self.square, range(10)):
            print answer

    def test2(self):
        print "Hello, world 2"

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Produces this output:

Hello, world 1
Hello, world 2
1
2
3
4
5
6
7
8
9
10

Exception in thread Thread-2:
Traceback (most recent call last):
  File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner
    self.run()
  File "C:\Python27\Lib\threading.py", line 504, in run
    self.__target(*self.__args, **self.__kwargs)
  File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed

Why can Processes handle bound methods, but not Pools?

Accepted answer by dano

The pickle module normally can't pickle instance methods:

>>> import pickle
>>> class A(object):
...  def z(self): print "hi"
... 
>>> a = A()
>>> pickle.dumps(a.z)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps
    Pickler(file, protocol).dump(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 224, in dump
    self.save(obj)
  File "/usr/local/lib/python2.7/pickle.py", line 306, in save
    rv = reduce(self.proto)
  File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex
    raise TypeError, "can't pickle %s objects" % base.__name__
TypeError: can't pickle instancemethod objects

However, the multiprocessing module has a custom Pickler that adds some code to enable this feature:

#
# Try making some callable types picklable
#

from pickle import Pickler
class ForkingPickler(Pickler):
    dispatch = Pickler.dispatch.copy()

    @classmethod
    def register(cls, type, reduce):
        def dispatcher(self, obj):
            rv = reduce(obj)
            self.save_reduce(obj=obj, *rv)
        cls.dispatch[type] = dispatcher

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
ForkingPickler.register(type(ForkingPickler.save), _reduce_method)
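
What the registered reducer returns is only a recipe: a callable plus the arguments to call it with when the data is unpickled. The sketch below shows that recipe in isolation, with no pickling involved. It is written for Python 3, where (as an assumption relative to the 2.7 code above) bound methods expose `__self__` and `__func__.__name__` in place of `im_self` and `im_func.func_name`; the `Greeter` class and the name `reduce_method` are hypothetical.

```python
class Greeter(object):
    """Hypothetical class used only for illustration."""

    def z(self):
        return "hi"


def reduce_method(m):
    # Python 3 spelling of _reduce_method for a bound method: return a
    # callable and the arguments that rebuild the method later.
    return getattr, (m.__self__, m.__func__.__name__)


a = Greeter()
rebuild, args = reduce_method(a.z)

# Unpickling effectively performs this call: getattr(a, "z").
rebuilt = rebuild(*args)
print(rebuilt())
```

Because the recipe refers to the instance itself, pickling a bound method this way also pickles the object it is bound to.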

You can replicate this using the copy_reg module to see it work for yourself:

>>> import copy_reg
>>> def _reduce_method(m):
...     if m.im_self is None:
...         return getattr, (m.im_class, m.im_func.func_name)
...     else:
...         return getattr, (m.im_self, m.im_func.func_name)
... 
>>> copy_reg.pickle(type(a.z), _reduce_method)
>>> pickle.dumps(a.z)
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n."

When you use Process.start to spawn a new process on Windows, it pickles all the parameters you passed to the child process using this custom ForkingPickler:

#
# Windows
#

else:
    # snip...
    from pickle import load, HIGHEST_PROTOCOL

    def dump(obj, file, protocol=None):
        ForkingPickler(file, protocol).dump(obj)

    #
    # We define a Popen class similar to the one from subprocess, but
    # whose constructor takes a process object as its argument.
    #

    class Popen(object):
        '''
        Start a subprocess to run the code of a process object
        '''
        _tls = thread._local()

        def __init__(self, process_obj):
            # create pipe for communication with child
            rfd, wfd = os.pipe()

            # get handle for read end of the pipe and make it inheritable
            ...
            # start process
            ...

            # set attributes of self
            ...

            # send information to child
            prep_data = get_preparation_data(process_obj._name)
            to_child = os.fdopen(wfd, 'wb')
            Popen._tls.process_handle = int(hp)
            try:
                dump(prep_data, to_child, HIGHEST_PROTOCOL)
                dump(process_obj, to_child, HIGHEST_PROTOCOL)
            finally:
                del Popen._tls.process_handle
                to_child.close()

Note the "send information to the child" section. It's using the dump function, which uses ForkingPickler to pickle the data, which means your instance method can be pickled.
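
The same write-then-read pattern can be sketched in a single process. This is a rough illustration in Python 3, not the library's own start-up code: it assumes `multiprocessing.reduction.ForkingPickler` (the Python 3 home of the class), uses a plain `os.pipe()` with both ends in one process, and borrows a bound method of `string.Template` as a stand-in payload. The names `prep_data` and `target` are placeholders.

```python
import os
import pickle
from multiprocessing.reduction import ForkingPickler
from string import Template

prep_data = {"name": "child-1"}               # stand-in for the preparation data
target = Template("Hello, $who").substitute   # a bound method as the payload

read_fd, write_fd = os.pipe()

# "Parent" side: write two pickles back to back with the custom pickler.
with os.fdopen(write_fd, "wb") as to_child:
    ForkingPickler(to_child, pickle.HIGHEST_PROTOCOL).dump(prep_data)
    ForkingPickler(to_child, pickle.HIGHEST_PROTOCOL).dump(target)

# "Child" side (the same process here): read them back in the same order.
with os.fdopen(read_fd, "rb") as from_parent:
    received_prep = pickle.load(from_parent)
    received_target = pickle.load(from_parent)

print(received_prep, received_target(who="world"))
```

Only the writing side needs the custom pickler; the reading side uses the ordinary `pickle.load`, because the stream just records "call `getattr` with these arguments".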

Now, when you use methods on multiprocessing.Pool to send a method to a child process, it uses a multiprocessing.Pipe to pickle the data. In Python 2.7, multiprocessing.Pipe is implemented in C and calls pickle_dumps directly, so it doesn't take advantage of the ForkingPickler. That means pickling the instance method doesn't work.

However, if you use copy_reg to register the instancemethod type, rather than a custom Pickler, all attempts at pickling will be affected. So you can use that to enable pickling instance methods, even via Pool:

import multiprocessing
import copy_reg
import types

def _reduce_method(m):
    if m.im_self is None:
        return getattr, (m.im_class, m.im_func.func_name)
    else:
        return getattr, (m.im_self, m.im_func.func_name)
copy_reg.pickle(types.MethodType, _reduce_method)

def test1():
    print("Hello, world 1")

def increment(x):
    return x + 1

class testClass():
    def process(self):
        process1 = multiprocessing.Process(target=test1)
        process1.start()
        process1.join()
        process2 = multiprocessing.Process(target=self.test2)
        process2.start()
        process2.join()

    def pool(self):
        pool = multiprocessing.Pool(1)
        for answer in pool.imap(increment, range(10)):
            print(answer)
        print
        for answer in pool.imap(self.square, range(10)):
            print(answer)

    def test2(self):
        print("Hello, world 2")

    def square(self, x):
        return x * x

def main():
    c = testClass()
    c.process()
    c.pool()

if __name__ == "__main__":
    main()

Output:

Hello, world 1
Hello, world 2
GOT (0, 0, (True, 1))
GOT (0, 1, (True, 2))
GOT (0, 2, (True, 3))
GOT (0, 3, (True, 4))
GOT (0, 4, (True, 5))
 1GOT (0, 5, (True, 6))

GOT (0, 6, (True, 7))
2
GOT (0, 7, (True, 8))
3
 GOT (0, 8, (True, 9))
GOT (0, 9, (True, 10))
4
5
6
7
8
9
10

GOT (1, 0, (True, 0))
0
GOT (1, 1, (True, 1))
1
GOT (1, 2, (True, 4))
4
GOT (1, 3, (True, 9))
9
 GOT (1, 4, (True, 16))
16
GOT (1, 5, (True, 25))
25
 GOT (1, 6, (True, 36))
36
 GOT (1, 7, (True, 49))
49
 GOT (1, 8, (True, 64))
64
GOT (1, 9, (True, 81))
81
GOT None
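
The difference between registering a reducer on one custom pickler and registering it globally can be sketched without starting any processes. The example below is an illustration in Python 3, where the module is named `copyreg` and the custom pickler lives in `multiprocessing.reduction`. It uses `memoryview` purely as a stand-in for "a type plain pickle rejects"; `reduce_memoryview` and `plain_pickle_works` are hypothetical helpers.

```python
import copyreg
import pickle
from multiprocessing.reduction import ForkingPickler


def reduce_memoryview(view):
    # Rebuild the view from a bytes copy of its contents.
    return memoryview, (view.tobytes(),)


def plain_pickle_works(obj):
    # Report whether the ordinary pickle module accepts obj.
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


view = memoryview(b"abc")
before = plain_pickle_works(view)

# 1) Register on the custom pickler only: plain pickle is unaffected.
ForkingPickler.register(memoryview, reduce_memoryview)
via_forking = pickle.loads(ForkingPickler.dumps(view)).tobytes()
after_private = plain_pickle_works(view)

# 2) Register in the global copyreg table: every pickler now sees it.
copyreg.pickle(memoryview, reduce_memoryview)
after_global = plain_pickle_works(view)

print(before, via_forking, after_private, after_global)
```

Step 1 mirrors why Process could handle the bound method while code paths that bypass the custom pickler could not; step 2 mirrors the copy_reg fix.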

Also note that in Python 3.x, pickle can pickle instance method types natively, so none of this stuff matters any more. :)
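
A quick way to check this on a Python 3 interpreter is a plain round trip with no registration at all. The snippet uses a bound method of `collections.Counter` only because it is a ready-made class with Python-defined methods; any picklable instance should behave the same way.

```python
import pickle
import types
from collections import Counter

counts = Counter("abracadabra")
method = counts.most_common      # a bound instance method

# No copyreg registration is needed on Python 3.
restored = pickle.loads(pickle.dumps(method))

print(isinstance(method, types.MethodType), restored(1))
```

The restored method is bound to a copy of `counts`, not to the original object.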

Answer by skrrgwasme

Here's an alternative that I use sometimes, and it works in Python 2.x:

You can create a top-level "alias" of sorts for instance methods: a function that accepts the object whose instance methods you want to run in a pool and calls the instance method for you:

import functools
import multiprocessing

def _instance_method_alias(obj, arg):
    """
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool
    """
    obj.instance_method(arg)
    return

class MyClass(object):
    """
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool
    """

    def __init__(self):
        self.my_string = "From MyClass: {}"

    def instance_method(self, arg):
        """
        Some arbitrary instance method
        """

        print(self.my_string.format(arg))
        return

# create an object of MyClass
obj = MyClass()

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj)

# create our list of things we will use the pool to map
l = [1,2,3]

# create the pool of workers
pool = multiprocessing.Pool()

# call pool.map, passing it the newly created function
pool.map(_bound_instance_method_alias, l)

# cleanup
pool.close()
pool.join()

This code produces this output:

From MyClass: 1
From MyClass: 2
From MyClass: 3

One limitation is that you can't use this for methods that modify the object. Each process gets a copy of the object it is calling the methods on, so changes won't be propagated back to the main process. If you don't need to modify the object from the methods you're calling though, this can be a simple solution.
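
The copy behaviour can be seen without a pool by pickling and unpickling a partial in one process, which is roughly what happens when a task is handed to a worker. This is a Python 3 sketch under that assumption; `collections.Counter` stands in for the user's class and `Counter.update` for a method that mutates it.

```python
import functools
import pickle
from collections import Counter

original = Counter(jobs=0)

# Bind the object as the first argument, as the alias approach does.
alias = functools.partial(Counter.update, original)

# The round trip stands in for sending the task to a worker process.
worker_alias = pickle.loads(pickle.dumps(alias))

# This mutates the worker's copy only.
worker_alias({"jobs": 3})

print(worker_alias.args[0]["jobs"], original["jobs"])
```

If the caller needs the new state, the usual workaround is to return it from the function and collect it from the pool's results.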

Answer by WeizhongTu

Here is an easier way that works in Python 2: just wrap the original instance method. It works well on Mac OS X and Linux but does not work on Windows; tested with Python 2.7.

from multiprocessing import Pool

class Person(object):
    def __init__(self):
        self.name = 'Weizhong Tu'

    def calc(self, x):
        print self.name
        return x ** 5


def func(x, p=Person()):
    return p.calc(x)


pool = Pool()
print pool.map(func, range(10))
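
The wrapper leans on two ordinary Python behaviours: a default argument is evaluated once, when the `def` statement runs, and a module-level function is pickled by name rather than by value. A sketch of the first point in Python 3 syntax follows. The built-in `map` stands in for `pool.map` here, which is an assumption made so the snippet runs without a pool.

```python
class Person(object):
    def __init__(self):
        self.name = "Weizhong Tu"

    def calc(self, x):
        return x ** 5


# The default is evaluated once, when `def` runs, so every call reuses the
# same Person instance and callers only need to supply x.
def func(x, p=Person()):
    return p.calc(x)


# Built-in map stands in for pool.map so the sketch runs without a pool.
results = list(map(func, range(5)))
print(results)  # [0, 1, 32, 243, 1024]
```

Since only the name `func` has to cross the process boundary, the `Person` instance itself never needs to be pickled.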