Original question: http://stackoverflow.com/questions/20577472/
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow
how to keep track of asynchronous results returned from a multiprocessing pool
Asked by d3pd
I am trying to add multiprocessing to some code which features functions that I cannot modify. I want to submit these functions as jobs to a multiprocessing pool asynchronously. I am doing something much like the code shown here. However, I am not sure how to keep track of results. How can I know to which applied function a returned result corresponds?
The important points to emphasise are that I can not modify the existing functions (other things rely on them remaining as they are) and that results can be returned in an order different to the order in which the function jobs are applied to the pool.
Thanks for any thoughts on this!
EDIT: Some attempt code is below:
import multiprocessing
from multiprocessing import Pool
import os
import signal
import time
import inspect

def multiply(multiplicand1=0, multiplicand2=0):
    return multiplicand1*multiplicand2

def workFunctionTest(**kwargs):
    time.sleep(3)
    return kwargs

def printHR(object):
    """
    This function prints a specified object in a human readable way.
    """
    # dictionary
    if isinstance(object, dict):
        for key, value in sorted(object.items()):
            print u'{a1}: {a2}'.format(a1=key, a2=value)
    # list or tuple
    elif isinstance(object, list) or isinstance(object, tuple):
        for element in object:
            print element
    # other
    else:
        print object

class Job(object):
    def __init__(
        self,
        workFunction=workFunctionTest,
        workFunctionKeywordArguments={'testString': "hello world"},
        workFunctionTimeout=1,
        naturalLanguageString=None,
        classInstance=None,
        resultGetter=None,
        result=None
    ):
        self.workFunction=workFunction
        self.workFunctionKeywordArguments=workFunctionKeywordArguments
        self.workFunctionTimeout=workFunctionTimeout
        self.naturalLanguageString=naturalLanguageString
        self.classInstance=self.__class__.__name__
        self.resultGetter=resultGetter
        self.result=result

    def description(self):
        descriptionString=""
        for key, value in sorted(vars(self).items()):
            descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
        return descriptionString

    def printout(self):
        """
        This method prints a dictionary of all data attributes.
        """
        printHR(vars(self))

class JobGroup(object):
    """
    This class acts as a container for jobs. The data attribute jobs is a list of job objects.
    """
    def __init__(
        self,
        jobs=None,
        naturalLanguageString="null",
        classInstance=None,
        result=None
    ):
        self.jobs=jobs
        self.naturalLanguageString=naturalLanguageString
        self.classInstance=self.__class__.__name__
        self.result=result

    def description(self):
        descriptionString=""
        for key, value in sorted(vars(self).items()):
            descriptionString+=str("{a1}:{a2} ".format(a1=key, a2=value))
        return descriptionString

    def printout(self):
        """
        This method prints a dictionary of all data attributes.
        """
        printHR(vars(self))

def initialise_processes():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def execute(
    jobObject=None,
    numberOfProcesses=multiprocessing.cpu_count()
):
    # Determine the current function name.
    functionName=str(inspect.stack()[0][3])

    def collateResults(result):
        """
        This is a process pool callback function which collates a list of results returned.
        """
        # Determine the caller function name.
        functionName=str(inspect.stack()[1][3])
        print("{a1}: result: {a2}".format(a1=functionName, a2=result))
        results.append(result)

    def getResults(job):
        # Determine the current function name.
        functionName=str(inspect.stack()[0][3])
        while True:
            try:
                result=job.resultGetter.get(job.workFunctionTimeout)
                break
            except multiprocessing.TimeoutError:
                print("{a1}: subprocess timeout for job".format(a1=functionName, a2=job.description()))
        #job.result=result
        return result

    # Create a process pool.
    pool1 = multiprocessing.Pool(numberOfProcesses, initialise_processes)
    print("{a1}: pool {a2} of {a3} processes created".format(a1=functionName, a2=str(pool1), a3=str(numberOfProcesses)))
    # Unpack the input job object and submit it to the process pool.
    print("{a1}: unpacking and applying job object {a2} to pool...".format(a1=functionName, a2=jobObject))
    if isinstance(jobObject, Job):
        # If the input job object is a job, apply it to the pool with its associated timeout specification.
        # Return a list of results.
        job=jobObject
        print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
        # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
        job.resultGetter=pool1.apply_async(
            func=job.workFunction,
            kwds=job.workFunctionKeywordArguments
        )
        # Get results.
        # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
        print("{a1}: getting results for job...".format(a1=functionName))
        job.result=getResults(job)
        print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
        print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
        # Return the job result from execute.
        return job.result
        pool1.terminate()
        pool1.join()
    elif isinstance(jobObject, JobGroup):
        # If the input job object is a job group, cycle through each job and apply it to the pool with its associated timeout specification.
        for job in jobObject.jobs:
            print("{a1}: job submitted to pool: {a2}".format(a1=functionName, a2=job.description()))
            # Apply the job to the pool, saving the object pool.ApplyResult to the job object.
            job.resultGetter=pool1.apply_async(
                func=job.workFunction,
                kwds=job.workFunctionKeywordArguments
            )
        # Get results.
        # Cycle through each job and append the result for the job to a list of results.
        results=[]
        for job in jobObject.jobs:
            # Acquire the job result with respect to the specified job timeout and apply this result to the job data attribute result.
            print("{a1}: getting results for job...".format(a1=functionName))
            job.result=getResults(job)
            print("{a1}: job completed: {a2}".format(a1=functionName, a2=job.description()))
            #print("{a1}: job result: {a2}".format(a1=functionName, a2=job.result))
            # Collate the results.
            results.append(job.result)
        # Apply the list of results to the job group data attribute results.
        jobObject.results=results
        print("{a1}: job group results: {a2}".format(a1=functionName, a2=jobObject.results))
        # Return the job result list from execute.
        return jobObject.results
        pool1.terminate()
        pool1.join()
    else:
        # invalid input object
        print("{a1}: invalid job object {a2}".format(a1=functionName, a2=jobObject))

def main():
    print('-'*80)
    print("MULTIPROCESSING SYSTEM DEMONSTRATION\n")
    # Create a job.
    print("# creating a job...\n")
    job1=Job(
        workFunction=workFunctionTest,
        workFunctionKeywordArguments={'testString': "hello world"},
        workFunctionTimeout=4
    )
    print("- printout of new job object:")
    job1.printout()
    print("\n- printout of new job object in logging format:")
    print job1.description()
    # Create another job.
    print("\n# creating another job...\n")
    job2=Job(
        workFunction=multiply,
        workFunctionKeywordArguments={'multiplicand1': 2, 'multiplicand2': 3},
        workFunctionTimeout=6
    )
    print("- printout of new job object:")
    job2.printout()
    print("\n- printout of new job object in logging format:")
    print job2.description()
    # Create a JobGroup object.
    print("\n# creating a job group (of jobs 1 and 2)...\n")
    jobGroup1=JobGroup(
        jobs=[job1, job2],
    )
    print("- printout of new job group object:")
    jobGroup1.printout()
    print("\n- printout of new job group object in logging format:")
    print jobGroup1.description()
    # Submit the job group.
    print("\nready to submit job group")
    response=raw_input("\nPress Enter to continue...\n")
    execute(jobGroup1)
    response=raw_input("\nNote the results printed above. Press Enter to continue the demonstration.\n")
    # Demonstrate timeout.
    print("\n # creating a new job in order to demonstrate timeout functionality...\n")
    job3=Job(
        workFunction=workFunctionTest,
        workFunctionKeywordArguments={'testString': "hello world"},
        workFunctionTimeout=1
    )
    print("- printout of new job object:")
    job3.printout()
    print("\n- printout of new job object in logging format:")
    print job3.description()
    print("\nNote the timeout specification of only 1 second.")
    # Submit the job.
    print("\nready to submit job")
    response=raw_input("\nPress Enter to continue...\n")
    execute(job3)
    response=raw_input("\nNote the recognition of timeouts printed above. This concludes the demonstration.")
    print('-'*80)

if __name__ == '__main__':
    main()
EDIT: This question has been placed [on hold] for the following stated reason:
"Questions asking for code must demonstrate a minimal understanding of the problem being solved. Include attempted solutions, why they didn't work, and the expected results. See also: Stack Overflow question checklist"
This question is not requesting code; it is requesting thoughts and general guidance. A minimal understanding of the problem under consideration is demonstrated (note the correct use of the terms "multiprocessing", "pool" and "asynchronously" and note the reference to prior code). Regarding attempted solutions, I acknowledge that attempted efforts at solutions would have been beneficial. I have added such code now. I hope that I have addressed the concerns raised that led to the [on hold] status.
Accepted answer by abarnert
Without seeing actual code, I can only answer in generalities. But there are two general solutions.
First, instead of using a callback and ignoring the AsyncResults, store them in some kind of collection. Then you can just use that collection. For example, if you want to be able to look up the results for a function using that function as a key, just create a dict keyed with the functions:
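import multiprocessing as mp

def in_parallel(funcs):
    # Map each function to the AsyncResult returned when it was submitted.
    results = {}
    pool = mp.Pool()
    for func in funcs:
        results[func] = pool.apply_async(func)
    pool.close()
    pool.join()
    return {func: result.get() for func, result in results.items()}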
Alternatively, you can change the callback function to store the results in your collection by key. For example:
def in_parallel(funcs):
    results = {}
    pool = mp.Pool()
    for func in funcs:
        def callback(result, func=func):
            results[func] = result
        pool.apply_async(func, callback=callback)
    pool.close()
    pool.join()
    return results
I'm using the function itself as a key. But if you want to use the index instead, that's just as easy. Any value you have, you can use as a key.
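Here's a minimal sketch of the index-keyed variant, closer to the question's jobs that carry keyword arguments. The names `run_indexed` and `echo` are made up for illustration, and I'm using `multiprocessing.pool.ThreadPool` (which exposes the same `apply_async` interface as `Pool`) only so the snippet runs anywhere without a `__main__` guard or picklable functions. With a real process pool the bookkeeping should look the same.

```python
from multiprocessing.pool import ThreadPool  # same apply_async API as multiprocessing.Pool


def multiply(multiplicand1=0, multiplicand2=0):
    return multiplicand1 * multiplicand2


def echo(**kwargs):
    # Hypothetical stand-in for a work function that cannot be modified.
    return kwargs


def run_indexed(jobs):
    """Submit (func, kwargs) pairs and return {index: result}."""
    pending = {}
    with ThreadPool(processes=2) as pool:
        for index, (func, kwargs) in enumerate(jobs):
            # Keep the AsyncResult handle under the job's position.
            pending[index] = pool.apply_async(func, kwds=kwargs)
        # Collect inside the with-block, because leaving it terminates the pool.
        return {index: handle.get(timeout=10) for index, handle in pending.items()}


jobs = [
    (echo, {"testString": "hello world"}),
    (multiply, {"multiplicand1": 2, "multiplicand2": 3}),
]
results = run_indexed(jobs)
```

Because each handle is stored under its index at submission time, it doesn't matter which job finishes first.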
Meanwhile, the example you linked is really just calling the same function on a bunch of arguments, waiting for all of them to finish, and leaving the results in some iterable in arbitrary order. That's exactly what imap_unordered does, but a lot more simply. You could replace the whole complicated thing from the linked code with this:
pool = mp.Pool()
results = list(pool.imap_unordered(foo_pool, range(10)))
pool.close()
pool.join()
And then, if you want the results in their original order instead of in arbitrary order, you can just switch to imap or map instead. So:
pool = mp.Pool()
results = pool.map(foo_pool, range(10))
pool.close()
pool.join()
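To make the ordering difference concrete, here's a small sketch that runs the same inputs through both calls. `slow_square` is a made-up function, and I'm assuming `multiprocessing.pool.ThreadPool` as a stand-in for `Pool` (same `map` and `imap_unordered` methods) so the snippet is runnable as-is.

```python
import time
from multiprocessing.pool import ThreadPool  # same map/imap_unordered API as Pool


def slow_square(x):
    # Larger inputs sleep less, so they tend to finish before smaller ones.
    time.sleep(0.05 * (4 - x))
    return x * x


with ThreadPool(processes=4) as pool:
    # map returns results in input order, whatever the completion order was.
    ordered = pool.map(slow_square, range(4))
    # imap_unordered yields each result as soon as it is ready.
    unordered = list(pool.imap_unordered(slow_square, range(4)))
```

Both lists hold the same values. Only `ordered` is guaranteed to line up with `range(4)`, so with `imap_unordered` you need some other way to tell which input produced which result.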
If you need something similar but too complicated to fit into the map paradigm, concurrent.futures will probably make your life easier than multiprocessing. If you're on Python 2.x, you will have to install the backport. But then you can do things that are much harder to do with AsyncResults or callbacks (or map), like composing a whole bunch of futures into one big future. See the examples in the linked docs.
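As a rough sketch of what tracking looks like with concurrent.futures: keep a dict from each future back to a label, then walk the futures as they complete. The names `run_labelled`, `greet` and the labels are made up for illustration. I'm using `ThreadPoolExecutor` so the snippet runs without a `__main__` guard, and `ProcessPoolExecutor` offers the same `submit` interface.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def multiply(a, b):
    return a * b


def greet(name):
    # Hypothetical second job, just to have two different functions.
    return "hello " + name


def run_labelled(jobs):
    """jobs maps a label to (func, args); returns {label: result}."""
    results = {}
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Map each Future back to the label of the job that produced it.
        label_of = {
            executor.submit(func, *args): label
            for label, (func, args) in jobs.items()
        }
        # as_completed yields futures in completion order, not submission order.
        for future in as_completed(label_of):
            results[label_of[future]] = future.result()
    return results


labelled = run_labelled({
    "product": (multiply, (2, 3)),
    "greeting": (greet, ("world",)),
})
```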
One last note:
The important points to emphasise are that I can not modify the existing functions…
If you can't modify a function, you can always wrap it. For example, let's say I have a function that returns the square of a number, but I'm trying to build a dict mapping numbers to their squares asynchronously, so I need to have the original number as part of the result as well. That's easy:
def number_and_square(x):
    return x, square(x)
And now, I can just apply_async(number_and_square) instead of just square, and get the results I want.
I didn't do that in the examples above because in the first case I stored the key into the collection from the calling side, and in the second I bound it into the callback function. But binding it into a wrapper around the function is just as easy as either of these, and can be appropriate when neither of these is.
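Put together, the wrapper approach might look something like the sketch below. `square` stands in for the function you can't modify, and `squares_by_number` and `store` are made-up names. I'm assuming `multiprocessing.pool.ThreadPool` here (same `apply_async` interface as `Pool`) so it runs without a `__main__` guard.

```python
from multiprocessing.pool import ThreadPool  # same apply_async API as multiprocessing.Pool


def square(x):
    # Stands in for an existing function that must stay as it is.
    return x * x


def number_and_square(x):
    # The wrapper carries the input along with the untouched function's result.
    return x, square(x)


def squares_by_number(numbers):
    squares = {}

    def store(pair):
        # The callback receives whatever the wrapper returned.
        number, value = pair
        squares[number] = value

    pool = ThreadPool(processes=3)
    for n in numbers:
        pool.apply_async(number_and_square, (n,), callback=store)
    pool.close()
    pool.join()  # all callbacks have run once join returns
    return squares


squares = squares_by_number(range(5))
```

The callback needs no bound key because the result identifies itself, which is handy when results arrive through something like imap_unordered that offers no per-job handle.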

