Dead simple example of using Multiprocessing Queue, Pool and Locking

Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/20887555/
Asked by thclpr
I tried to read the documentation at http://docs.python.org/dev/library/multiprocessing.html but I'm still struggling with multiprocessing Queue, Pool and Locking. For now I was able to build the example below.
Regarding Queue and Pool, I'm not sure if I understood the concept in the right way, so correct me if I'm wrong. What I'm trying to achieve is to process 2 requests at a time (the data list has 8 entries in this example). So, what should I use? A Pool that creates 2 processes able to handle two different queues (2 at max), or should I just use Queue to process 2 inputs each time? The lock would be for printing the outputs correctly.
import multiprocessing
import time

data = (['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
        ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7'])

def mp_handler(var1):
    for indata in var1:
        p = multiprocessing.Process(target=mp_worker, args=(indata[0], indata[1]))
        p.start()

def mp_worker(inputs, the_time):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

if __name__ == '__main__':
    mp_handler(data)
Accepted answer by Velimir Mlaker
The best solution for your problem is to utilize a Pool. Using Queues and having a separate "queue feeding" functionality is probably overkill.
Here's a slightly rearranged version of your program, this time with only 2 processes corralled in a Pool. I believe it's the easiest way to go, with minimal changes to the original code:
import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker((inputs, the_time)):
    print " Processs %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs

def mp_handler():
    p = multiprocessing.Pool(2)
    p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()
Note that the mp_worker() function now accepts a single argument (a tuple of the two previous arguments), because the map() function hands each element of your input data to the worker function as a single argument; here each element is a two-item list.
Output:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Processs c Waiting 6 seconds
Process b DONE
Processs d Waiting 8 seconds
Process c DONE
Processs e Waiting 1 seconds
Process e DONE
Processs f Waiting 3 seconds
Process d DONE
Processs g Waiting 5 seconds
Process f DONE
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
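Side note: the def mp_worker((inputs, the_time)) signature relies on Python 2 tuple parameter unpacking, which was removed in Python 3 (PEP 3113). A minimal Python 3 sketch of the same program unpacks inside the worker body instead:

import multiprocessing
import time

data = (
    ['a', '2'], ['b', '4'], ['c', '6'], ['d', '8'],
    ['e', '1'], ['f', '3'], ['g', '5'], ['h', '7']
)

def mp_worker(args):
    # Python 3: unpack the tuple inside the body, not in the signature.
    inputs, the_time = args
    print(" Process %s\tWaiting %s seconds" % (inputs, the_time))
    time.sleep(int(the_time))
    print(" Process %s\tDONE" % inputs)

def mp_handler():
    # The with-block closes the pool when map() has returned all results.
    with multiprocessing.Pool(2) as p:
        p.map(mp_worker, data)

if __name__ == '__main__':
    mp_handler()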
Edit, as per @Thales' comment below:
If you want "a lock for each pool limit" so that your processes run in tandem pairs, like:
A waiting B waiting | A done , B done | C waiting , D waiting | C done, D done | ...
then change the handler function to launch pools (of 2 processes) for each pair of data:
def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    for task1, task2 in subdata:
        p = multiprocessing.Pool(2)
        p.map(mp_worker, (task1, task2))
Now your output is:
Processs a Waiting 2 seconds
Processs b Waiting 4 seconds
Process a DONE
Process b DONE
Processs c Waiting 6 seconds
Processs d Waiting 8 seconds
Process c DONE
Process d DONE
Processs e Waiting 1 seconds
Processs f Waiting 3 seconds
Process e DONE
Process f DONE
Processs g Waiting 5 seconds
Processs h Waiting 7 seconds
Process g DONE
Process h DONE
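A variation on the handler above (a sketch, not from the original answer): creating a new Pool on every loop iteration leaks worker processes, so you can create the pool once and map each pair in turn. Each map() call blocks until the current pair is done, which preserves the tandem-pair behavior:

def mp_handler():
    subdata = zip(data[0::2], data[1::2])
    p = multiprocessing.Pool(2)
    for task1, task2 in subdata:
        # map() blocks until both tasks of this pair have finished.
        p.map(mp_worker, (task1, task2))
    p.close()
    p.join()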
Answered by jb.
Here is an example from my code (for a thread pool, but just change the class name and you'll have a process pool):
from concurrent.futures import ThreadPoolExecutor

def execute_run(rp):
    ...  # do something

pool = ThreadPoolExecutor(6)

for mat in TESTED_MATERIAL:
    for en in TESTED_ENERGIES:
        for ecut in TESTED_E_CUT:
            rp = RunParams(
                simulations, DEST_DIR,
                PARTICLE, mat, 960, 0.125, ecut, en
            )
            pool.submit(execute_run, rp)

# Note: concurrent.futures executors have no join(); assuming that's the
# ThreadPoolExecutor in use, shutdown(wait=True) blocks until all tasks are done.
pool.shutdown(wait=True)
Basically:
- pool = ThreadPoolExecutor(6) creates a pool of 6 threads
- Then you have a bunch of for loops that add tasks to the pool
- pool.submit(execute_run, rp) adds a task to the pool; the first argument is the function to be called in a thread/process, the rest of the arguments are passed to the called function
- pool.shutdown(wait=True) waits until all tasks are done
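For reference, a self-contained sketch of the same pattern with concurrent.futures (the task data here is illustrative, not from the original answer); swapping ThreadPoolExecutor for ProcessPoolExecutor gives a process pool with the same API:

from concurrent.futures import ThreadPoolExecutor  # or ProcessPoolExecutor

def execute_run(rp):
    print("running %r" % (rp,))

if __name__ == '__main__':
    # Exiting the with-block implies shutdown(wait=True).
    with ThreadPoolExecutor(6) as pool:
        for rp in [('mat1', 1.0), ('mat2', 2.0), ('mat3', 0.5)]:
            pool.submit(execute_run, rp)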
Answered by linqu
This might not be 100% related to the question, but on my search for an example of using multiprocessing with a queue this shows up first on Google.
This is a basic example class that you can instantiate to put items in a queue, and that can wait until the queue is finished. That's all I needed.
from multiprocessing import JoinableQueue, Process

class Renderer:
    queue = None

    def __init__(self, nb_workers=2):
        self.queue = JoinableQueue()
        self.processes = [Process(target=self.upload) for i in range(nb_workers)]
        for p in self.processes:
            p.start()

    def render(self, item):
        self.queue.put(item)

    def upload(self):
        while True:
            item = self.queue.get()
            if item is None:
                break
            # process your item here
            self.queue.task_done()

    def terminate(self):
        """ wait until queue is empty and terminate processes """
        self.queue.join()
        for p in self.processes:
            p.terminate()

r = Renderer()
r.render(item1)
r.render(item2)
r.terminate()
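One caveat (an observation, not part of the original answer): upload() breaks on a None sentinel, but terminate() never enqueues one, so workers blocked in queue.get() are simply killed by Process.terminate(). A gentler shutdown sketch (shutdown() is a hypothetical addition to the class above) would push one sentinel per worker and join the processes:

    def shutdown(self):
        """Hypothetical alternative to terminate(): wait for all queued
        items, then stop workers with sentinels instead of killing them."""
        self.queue.join()              # block until every item is task_done()
        for _ in self.processes:
            self.queue.put(None)       # one None sentinel per worker
        for p in self.processes:
            p.join()                   # workers exit their while-loop and finish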
Answered by ThorSummoner
Here is my personal go-to for this topic:
Gist here (pull requests welcome!): https://gist.github.com/thorsummoner/b5b1dfcff7e7fdd334ec
import multiprocessing
import sys

THREADS = 3

# Used to prevent multiple threads from mixing their output
GLOBALLOCK = multiprocessing.Lock()


def func_worker(args):
    """This function will be called by each thread.
    This function can not be a class method.
    """
    # Expand list of args into named args.
    str1, str2 = args
    del args

    # Work
    # ...

    # Serial-only Portion
    GLOBALLOCK.acquire()
    print(str1)
    print(str2)
    GLOBALLOCK.release()


def main(argp=None):
    """Multiprocessing Spawn Example
    """
    # Create the number of threads you want
    pool = multiprocessing.Pool(THREADS)

    # Define two jobs, each with two args.
    func_args = [
        ('Hello', 'World',),
        ('Goodbye', 'World',),
    ]

    try:
        # Spawn up to 9999999 jobs, I think this is the maximum possible.
        # I do not know what happens if you exceed this.
        pool.map_async(func_worker, func_args).get(9999999)
    except KeyboardInterrupt:
        # Allow ^C to interrupt from any thread.
        sys.stdout.write('\033[0m')  # reset terminal colors (ANSI escape)
        sys.stdout.write('User Interrupt\n')
    pool.close()

if __name__ == '__main__':
    main()
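As a side note (a suggestion, not in the original gist): the acquire()/release() pair around the prints can also be written with a with statement, which releases the lock even if a print raises:

    # Serial-only portion, context-manager form
    with GLOBALLOCK:
        print(str1)
        print(str2)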
Answered by ZF007
For everyone using editors like Komodo Edit (win10), add sys.stdout.flush() to:
def mp_worker((inputs, the_time)):
    print " Process %s\tWaiting %s seconds" % (inputs, the_time)
    time.sleep(int(the_time))
    print " Process %s\tDONE" % inputs
    sys.stdout.flush()  # requires an "import sys" at the top of the script
or as the first line under:
if __name__ == '__main__':
    sys.stdout.flush()
This helps to see what goes on during the run of the script, instead of having to look at the black command line box.
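In Python 3 the same effect can be had without manual flushing: print() accepts a flush argument, and the interpreter can be started with -u for unbuffered output. A one-line sketch:

print(" Process %s\tDONE" % inputs, flush=True)  # Python 3: flush per call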

