Notice: this page is taken from a popular Stack Overflow question and is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must do so under the same license, cite the original URL and authors, and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/16542261/

Time: 2020-08-18 22:57:24  Source: igfitidea

python multiprocessing Pool with map_async

python multiprocessing

Asked by dseira

I am trying to use the multiprocessing package in Python with a Pool.

I have the function f which is called by the map_async function:

from multiprocessing import Pool

def f(host, x):
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
pool = Pool(processes=5)
pool.map_async(f,hosts,"test")
pool.close()
pool.join()

This code fails with the following error:

Traceback (most recent call last):
  File "pool-test.py", line 9, in <module>
    pool.map_async(f,hosts,"test")
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async
    result = MapResult(self._cache, chunksize, len(iterable), callback)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__
    self._number_left = length//chunksize + bool(length % chunksize)
TypeError: unsupported operand type(s) for //: 'int' and 'str'

I don't know how to pass more than one argument to the f function. Is there any way?

Answered by F.X.

"test" is interpreted as map_async's chunksize argument, because it is passed in the third position (see the docs).

Your code should probably be (copy-pasted here from my IPython session):

from multiprocessing import Pool

def f(arg):
    host, x = arg
    print host
    print x

hosts = ['1.1.1.1', '2.2.2.2']
args = ((host, "test") for host in hosts)
pool = Pool(processes=5)
pool.map_async(f, args)
pool.close()
pool.join()
## -- End pasted text --

1.1.1.1
test
2.2.2.2
test
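
To see why a third positional argument such as "test" ends up as chunksize, it can help to look at the parameter order of map_async. The following is only a small sketch, assuming Python 3, where multiprocessing.pool.Pool is the class behind multiprocessing.Pool; the variable names params and third_caller_arg are illustrative.

```python
import inspect
from multiprocessing.pool import Pool  # the class behind multiprocessing.Pool

# List the parameters of map_async in positional order.
params = list(inspect.signature(Pool.map_async).parameters)
print(params)

# 'self' sits at index 0, so in pool.map_async(f, hosts, "test") the third
# value the caller passes is bound to the parameter at index 3.
third_caller_arg = params[3]
print(third_caller_arg)
```

The second print shows chunksize, which is why the string reaches the integer division seen in the traceback. Passing chunksize by keyword, when it is really wanted, avoids this kind of surprise.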

Note: In Python 3 (3.3 and later) you can use starmap, which unpacks the arguments from the tuples, so you can avoid doing host, x = arg explicitly.
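
A minimal sketch of the starmap route, assuming Python 3.3 or later. Here f returns its arguments instead of printing them so that the results can be collected, and build_args is a hypothetical helper introduced only for this illustration.

```python
from multiprocessing import Pool


def f(host, x):
    # Return the pair instead of printing so the caller can collect results.
    return host, x


def build_args(hosts, x):
    # One (host, x) tuple per task; starmap unpacks each into f(host, x).
    return [(host, x) for host in hosts]


if __name__ == "__main__":
    hosts = ["1.1.1.1", "2.2.2.2"]
    with Pool(processes=2) as pool:
        result = pool.starmap_async(f, build_args(hosts, "test"))
        # Wait for the results before leaving the with block.
        print(result.get())
```

starmap_async mirrors map_async; the blocking starmap can be used the same way when no asynchronous handle is needed.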

Answered by yoder

As I recall, Pool().map() and .map_async() pass only a single argument to the worker function. This limitation can be worked around by passing a list, but then you need a custom function designed to take a list(-like) object as its argument.

One approach is to write the custom code once, as a general "function + args" wrapper. I worked up something like this (note: this is only partially tested):

import multiprocessing as mpp
import operator


def tmp_test():
    # a short test script: compare each pair in A with operator.eq
    A = [[1, 2], [2, 3], [4, 5], [6, 7]]
    P = mpp.Pool(mpp.cpu_count())
    X = P.map_async(map_helper, [[operator.eq] + a for a in A])
    # no more tasks will be submitted; wait for the workers to finish
    P.close()
    P.join()
    return X.get()


def null_funct(args=None, kwargs=None):
    # a place-holder
    pass


def map_helper(args_in=(null_funct, [], {})):
    # helper function for pool.map_async(). pass data as a list(-like object):
    # [function, [args], {kwargs}] (though we'll allow for some mistakes).
    funct = args_in[0]
    #
    # allow for different formatting options:
    if len(args_in) > 1 and not isinstance(args_in[1], (list, tuple, dict)):
        # probably passed a flat list of parameters. just use them:
        return funct(*args_in[1:])
    #
    # if the args are "properly" formatted:
    args = []
    kwargs = {}
    for arg in args_in[1:]:
        # assign list types to args, dict types to kwargs...
        if isinstance(arg, (list, tuple)):
            args.extend(arg)
        if isinstance(arg, dict):
            kwargs.update(arg)
    return funct(*args, **kwargs)
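
If the callers can be trusted to always pass exactly (function, args, kwargs), the wrapper can be much smaller. This is only a sketch under that assumption, written for Python 3; call_packed and build_tasks are hypothetical names, not part of multiprocessing.

```python
import operator
from multiprocessing import Pool


def call_packed(packed):
    # Hypothetical helper: expects exactly (function, args, kwargs).
    funct, args, kwargs = packed
    return funct(*args, **kwargs)


def build_tasks(pairs):
    # Pack operator.eq with each pair of values and no keyword arguments.
    return [(operator.eq, tuple(pair), {}) for pair in pairs]


if __name__ == "__main__":
    tasks = build_tasks([[1, 2], [2, 2], [4, 5]])
    with Pool(processes=2) as pool:
        print(pool.map(call_packed, tasks))
```

The function placed in each task must be picklable (a module-level function or a builtin such as operator.eq), since the tasks are sent to worker processes.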

Answered by Russel Winder

In Python 3 (3.3 and later), Pool objects are context managers, so a with statement can be used. This avoids problems with exceptions and removes the need to call close and join when the blocking map is used. In this case the function always receives a constant for the x variable, so this can be handled with partial application (functools.partial). map_async returns immediately, and leaving the with block terminates the pool, so we would need to get the result to be sure the actions happen; we might as well just use map. Thus:

from multiprocessing import Pool
from functools import partial

def f(host, x):
    print(host)
    print(x)

hosts = ('1.1.1.1', '2.2.2.2')
with Pool(processes=5) as pool:
    pool.map(partial(f, x='test'), hosts)

results in:

1.1.1.1
test
2.2.2.2
test
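
If map_async is still wanted together with the with statement, the result has to be collected before the block exits. A minimal sketch, assuming Python 3.3 or later; f returns its arguments here rather than printing them, make_worker is a hypothetical helper, and the 30 second timeout is an arbitrary choice.

```python
from functools import partial
from multiprocessing import Pool


def f(host, x):
    # Return the pair so that get() has something to hand back.
    return host, x


def make_worker(x):
    # Fix the constant second argument; the pool supplies only host.
    return partial(f, x=x)


if __name__ == "__main__":
    hosts = ("1.1.1.1", "2.2.2.2")
    with Pool(processes=2) as pool:
        async_result = pool.map_async(make_worker("test"), hosts)
        # Block here, inside the with block, until every task has finished.
        print(async_result.get(timeout=30))
```

Calling get() inside the block matters because the pool is terminated when the with block is left.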