带有map_async的python多处理池
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/16542261/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
python multiprocessing Pool with map_async
提问by dseira
I trying to use the multiprocessing package in python with a Pool.
我试图在带有池的 python 中使用多处理包。
I have the function f which is called by the map_async function:
我有由 map_async 函数调用的函数 f:
from multiprocessing import Pool
def f(host, x):
print host
print x
hosts = ['1.1.1.1', '2.2.2.2']
pool = Pool(processes=5)
pool.map_async(f,hosts,"test")
pool.close()
pool.join()
This code has the next error:
此代码有下一个错误:
Traceback (most recent call last):
File "pool-test.py", line 9, in <module>
pool.map_async(f,hosts,"test")
File "/usr/lib/python2.7/multiprocessing/pool.py", line 290, in map_async
result = MapResult(self._cache, chunksize, len(iterable), callback)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 557, in __init__
self._number_left = length//chunksize + bool(length % chunksize)
TypeError: unsupported operand type(s) for //: 'int' and 'str'
I don't know how to pass more than 1 argument to the f function. Are there any way?
我不知道如何将 1 个以上的参数传递给 f 函数。有什么办法吗?
回答by F.X.
"test"is interpreted as map_async's chunksizekeyword argument (see the docs).
"test"被解释为map_async的chunksize关键字参数(参见文档)。
Your code should probably be (here copy-pasted from my IPython session) :
您的代码可能应该是(这里是从我的 IPython 会话中复制粘贴的):
from multiprocessing import Pool
def f(arg):
host, x = arg
print host
print x
hosts = ['1.1.1.1', '2.2.2.2']
args = ((host, "test") for host in hosts)
pool = Pool(processes=5)
pool.map_async(f, args)
pool.close()
pool.join()
## -- End pasted text --
1.1.1.1
test
2.2.2.2
test
Note: In Python 3 you can use starmap, which will unpack the arguments from the tuples. You'll be able to avoid doing host, x = argexplicitely.
注意:在 Python 3 中,您可以使用starmap,它将从元组中解压参数。你将能够避免host, x = arg明确地做。
回答by yoder
as I recall, the Pool().map() and .map_async() specifically accept only a single argument. this limitation can be worked around by passing a list, but of course then you need a customized function designed to take a list(like) object as an argument.
我记得, Pool().map() 和 .map_async() 专门只接受一个参数。这个限制可以通过传递一个列表来解决,但是当然你需要一个定制的函数来将一个列表(类似)对象作为参数。
one approach is to write the custom code once -- aka, a general "function + args" wrapper. i worked up something like this (note: this is only partially tested):
一种方法是编写一次自定义代码——也就是通用的“函数 + args”包装器。我做了这样的事情(注意:这只是部分测试):
def tmp_test():
# a short test script:
#
A=[[1,2], [2,3], [4,5], [6,7]]
P=mpp.Pool(mpp.cpu_count())
X=P.map_async(map_helper, [[operator.eq]+a for a in A])
#
return X.get()
def null_funct(args=[], kwargs={}):
# a place-holder
pass
#
def map_helper(args_in = [null_funct, [], {}]):
# helper function for pool.map_async(). pass data as a list(-like object):
# [function, [args], {kwargs}] (though we'll allow for some mistakes).
#
funct = args_in[0]
#
# allow for different formatting options:
if not (isinstance(args_in[1], list) or isinstance(args_in[1], tuple) or isinstance(args_in[1], dict)):
# probably passed a list of parameters. just use them:
args = args_in[1:]
#
return funct(*args)
#
# if the args are "properly" formatted:
args=[]
kwargs = {}
for arg in args_in[1:]:
# assign list types to args, dict types to kwargs...
if isinstance(arg, list) or isinstance(arg, tuple): args += arg
if isinstance(arg, dict): kwargs.update(arg)
return funct(*args, **kwargs)
回答by Russel Winder
Pool returns a context manager in Python 3 and so a with statement can be used. This avoids problems with exceptions and means no necessity to close and join. In this case the function is always receiving a constant for the x variable and so this can be handled with a partial evaluation. map_async is lazy and so we need to get the result for the actions to happen, might as well just use map. Thus:
Pool 在 Python 3 中返回一个上下文管理器,因此可以使用 with 语句。这避免了异常问题,也意味着不需要关闭和加入。在这种情况下,函数总是接收 x 变量的常量,因此可以通过部分评估来处理。map_async 是懒惰的,所以我们需要得到动作发生的结果,不妨使用 map。因此:
from multiprocessing import Pool
from functools import partial
def f(host, x):
print(host)
print(x)
hosts = ('1.1.1.1', '2.2.2.2')
with Pool(processes=5) as pool:
pool.map(partial(f, x='test'), hosts)
results in:
结果是:
1.1.1.1 test 2.2.2.2 test

