producer/consumer problem with python multiprocessing

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. If you use or share it, you must follow the same license and attribute it to the original authors (not the translator): StackOverflow.
Original question: http://stackoverflow.com/questions/914821/

Asked by btw0
I am writing a server program with one producer and multiple consumers. What confuses me is that only the first task the producer puts into the queue gets consumed; tasks enqueued after that are never consumed and remain in the queue forever.
```python
from multiprocessing import Process, Queue, cpu_count
from http import httpserv  # the asker's own module, shown in the edit below
import time

def work(queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(5)
        print "task done:", task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        self.workers = [Process(target=work, args=(self.queue,))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()
        httpserv(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()

Manager().start()
```
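As an aside (not part of the original post), the code above shuts workers down by re-putting a single `None` sentinel that each exiting worker passes along. A common alternative is to enqueue one sentinel per worker up front; a minimal runnable sketch in modern Python 3:

```python
from multiprocessing import Process, Queue

def work(queue):
    while True:
        task = queue.get()
        if task is None:      # sentinel: this worker is done
            break
        print("task done:", task)

if __name__ == "__main__":
    n_workers = 2
    queue = Queue()
    workers = [Process(target=work, args=(queue,)) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for task in ["task_1", "task_2", "task_3"]:
        queue.put(task)
    for _ in range(n_workers):  # one sentinel per worker; no re-putting needed
        queue.put(None)
    for w in workers:
        w.join()
```

With one sentinel per worker, each worker consumes exactly one `None` and exits, so no worker has to re-enqueue the sentinel for its siblings.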
The producer is an HTTP server that puts a task into the queue whenever it receives a request from a user. Oddly, the consumer processes appear to remain blocked even when there are new tasks in the queue.
P.S. Two further questions, unrelated to the above: first, I am not sure whether it is better to run the HTTP server in its own process rather than in the main process; if so, how can I keep the main process running until all child processes have ended? Second, what is the best way to stop the HTTP server gracefully?
Edit: added the producer code; it's just a simple Python WSGI server:
```python
import fapws._evwsgi as evwsgi
from fapws import base

def httpserv(queue):
    evwsgi.start("0.0.0.0", 8080)
    evwsgi.set_base_module(base)

    def request_1(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/html')])
        queue.put('task_1')
        return ["request 1!"]

    def request_2(environ, start_response):
        start_response('200 OK', [('Content-Type', 'text/html')])
        queue.put('task_2')
        return ["request 2!!"]

    evwsgi.wsgi_cb(("/request_1", request_1))
    evwsgi.wsgi_cb(("/request_2", request_2))
    evwsgi.run()
```
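fapws is an old C/libev-based WSGI server that can be hard to install today. As a rough stand-in (threads instead of fapws's event loop, and names of my own choosing), the same enqueue-per-request idea can be sketched with only the standard library:

```python
import threading
import urllib.request
from queue import Queue                       # stand-in for multiprocessing.Queue
from wsgiref.simple_server import make_server

queue = Queue()

def app(environ, start_response):
    # enqueue a task named after the request path, mirroring the fapws handlers
    start_response('200 OK', [('Content-Type', 'text/plain')])
    queue.put('task' + environ['PATH_INFO'].replace('/', '_'))
    return [b"queued\n"]

server = make_server('127.0.0.1', 0, app)     # port 0 = pick any free port
port = server.server_port
threading.Thread(target=server.serve_forever, daemon=True).start()

urllib.request.urlopen('http://127.0.0.1:%d/request_1' % port).read()
server.shutdown()
task = queue.get(timeout=2)
print(task)  # task_request_1
```

This is only a sketch of the producer side; unlike the original, the consumer processes are omitted and a thread-local `queue.Queue` is used, so it demonstrates the request-to-task flow rather than the cross-process setup.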
Accepted answer by Luper Rouch
I think there must be something wrong with the web server part, as this works perfectly:
```python
from multiprocessing import Process, Queue, cpu_count
import random
import time

def serve(queue):
    works = ["task_1", "task_2"]
    while True:
        time.sleep(0.01)
        queue.put(random.choice(works))

def work(id, queue):
    while True:
        task = queue.get()
        if task is None:
            break
        time.sleep(0.05)
        print "%d task:" % id, task
    queue.put(None)

class Manager:
    def __init__(self):
        self.queue = Queue()
        self.NUMBER_OF_PROCESSES = cpu_count()

    def start(self):
        print "starting %d workers" % self.NUMBER_OF_PROCESSES
        self.workers = [Process(target=work, args=(i, self.queue))
                        for i in xrange(self.NUMBER_OF_PROCESSES)]
        for w in self.workers:
            w.start()
        serve(self.queue)

    def stop(self):
        self.queue.put(None)
        for i in range(self.NUMBER_OF_PROCESSES):
            self.workers[i].join()
        self.queue.close()

Manager().start()
```
Sample output:
```
starting 2 workers
0 task: task_1
1 task: task_2
0 task: task_2
1 task: task_1
0 task: task_1
```
Answer by S.Lott
"Second question, what's the best way to stop the HTTP server gracefully?"
This is hard.
You have two choices for Interprocess Communication:
Out-of-band controls. The server has another mechanism for communication: another socket, a Unix signal, or something else. The something else could be a "stop-now" file in the server's local directory. It seems odd, but it works well and is simpler than introducing a select loop to listen on multiple sockets, or a signal handler to catch a Unix signal.

The "stop-now" file is easy to implement: the `evwsgi.run()` loop merely checks for this file after each request. To make the server stop, you create the file and execute a `/control` request (which will get a 500 error or something; it doesn't really matter), and the server should grind to a halt. Remember to delete the stop-now file, otherwise your server won't restart.

In-band controls. The server has another URL (`/stop`) which will stop it. Superficially, this seems like a security nightmare, but it depends entirely on where and how this server will be used. Since it appears to be a simple wrapper around an internal request queue, this extra URL works well.

To make this work, you need to write your own version of `evwsgi.run()` that can be terminated by setting some variable in a way that breaks out of the loop.
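The out-of-band "stop-now" idea can be sketched generically. Note that `evwsgi.run()` itself cannot be patched this way without modifying fapws, so the loop and file name below are hypothetical stand-ins:

```python
import os

STOP_FILE = "stop-now"   # hypothetical filename in the server's working directory

def serve_until_stopped(handle_one_request):
    """Toy request loop: checks for the stop file after each request."""
    while not os.path.exists(STOP_FILE):
        handle_one_request()
    os.remove(STOP_FILE)  # delete it, or the server won't restart next time

# usage sketch: a fake handler that stands in for real request processing
handled = []
def fake_request():
    handled.append(1)
    if len(handled) == 3:           # pretend an operator touched stop-now
        open(STOP_FILE, "w").close()

serve_until_stopped(fake_request)
print("stopped after", len(handled), "requests")  # stopped after 3 requests
```

The check costs one `os.path.exists()` per request, which is cheap relative to handling an HTTP request, and it needs no extra sockets or signal handlers.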
Edit
You probably don't want to terminate your server, since you don't know the state of its worker threads. You need to signal the server and then wait until it finishes things normally.
If you want to forcibly kill the server, then `os.kill()` (or `multiprocessing.Process.terminate()`) will work. Except, of course, that you don't know what the child processes were doing.
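A small illustration of the forcible route (not from the original answer): `Process.terminate()` sends SIGTERM on Unix, so the child dies mid-loop with no chance to clean up, and its exit code records the signal.

```python
import time
from multiprocessing import Process

def worker():
    while True:        # pretend to be a busy server process
        time.sleep(0.1)

if __name__ == "__main__":
    p = Process(target=worker)
    p.start()
    time.sleep(0.5)
    p.terminate()      # forcible: the child gets no chance to clean up
    p.join()
    print("exit code:", p.exitcode)   # negative signal number on Unix, e.g. -15
```

Because the worker here holds no locks or queue internals, terminating it is harmless; in the original program a worker killed mid-`queue.get()` could leave the shared queue in a corrupted state, which is exactly why the graceful sentinel shutdown is preferred.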