在多个 Python 多处理队列上“选择”?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/1123855/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-11-03 21:32:34  来源:igfitidea点击:

"select" on multiple Python multiprocessing Queues?

pythoneventsselectsynchronizationmultiprocessing

提问by cdleary

What's the best way to wait (without spinning) until something is available in either one of two (multiprocessing) Queues, where both reside on the same system?

等待(不旋转)直到两个(多处理)Queues之一可用的最佳方法是什么,两者都驻留在同一系统上?

采纳答案by ars

It doesn't look like there's an official way to handle this yet. Or at least, not based on this:

看起来还没有官方的方法来处理这个问题。或者至少,不是基于此:

You could try something like what this post is doing -- accessing the underlying pipe filehandles:

您可以尝试类似这篇文章所做的事情——访问底层管道文件句柄:

and then use select.

然后使用选择。

回答by silverado

Actually you can use multiprocessing.Queue objects in select.select. i.e.

实际上你可以在 select.select 中使用 multiprocessing.Queue 对象。IE

que = multiprocessing.Queue()
(input,[],[]) = select.select([que._reader],[],[])

would select que only if it is ready to be read from.

只有当它准备好被读取时才会选择 que。

No documentation about it though. I was reading the source code of the multiprocessing.queue library (at linux it's usually sth like /usr/lib/python2.6/multiprocessing/queue.py) to find it out.

虽然没有关于它的文档。我正在阅读 multiprocessing.queue 库的源代码(在 linux 上,它通常类似于 /usr/lib/python2.6/multiprocessing/queue.py)以找到它。

With Queue.Queue I didn't have found any smart way to do this (and I would really love to).

使用 Queue.Queue 我没有找到任何聪明的方法来做到这一点(我真的很想这样做)。

回答by hpk42

Seems like using threads which forward incoming items to a single Queue which you then wait on is a practical choice when using multiprocessing in a platform independent manner.

当以独立于平台的方式使用多处理时,似乎使用将传入项目转发到单个队列然后等待的线程是一个实用的选择。

Avoiding the threads requires either handling low-level pipes/FDs which is both platform specific and not easy to handle consistently with the higher-level API.

避免线程需要处理低级管道/FD,这既是平台特定的,又不容易与高级 API 一致处理。

Or you would need Queues with the ability to set callbacks which i think are the proper higher level interface to go for. I.e. you would write something like:

或者你需要能够设置回调的队列,我认为这是合适的更高级别的接口。即你会写这样的东西:

  singlequeue = Queue()
  incoming_queue1.setcallback(singlequeue.put)
  incoming_queue2.setcallback(singlequeue.put)
  ...
  singlequeue.get()

Maybe the multiprocessing package could grow this API but it's not there yet. The concept works well with py.execnet which uses the term "channel" instead of "queues", see here http://tinyurl.com/nmtr4w

也许 multiprocessing 包可以增加这个 API,但它还没有。该概念适用于使用术语“通道”而不是“队列”的 py.execnet,请参见此处http://tinyurl.com/nmtr4w

回答by Tony Wallace

Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

不确定多处理队列上的选择在 Windows 上的效果如何。由于 Windows 上的 select 侦听套接字而不是文件句柄,因此我怀疑可能存在问题。

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

我的答案是让一个线程以阻塞方式侦听每个队列,并将结果全部放入主线程侦听的单个队列中,实质上是将各个队列多路复用为一个队列。

My code for doing this is:

我这样做的代码是:

"""
Allow multiple queues to be waited upon.

queue,value = multiq.select(list_of_queues)
"""
import queue
import threading

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        while True:
            data = self.inq.get()
            print ("thread reads data=",data)
            result = (self.inq,data)
            self.sharedq.put(result)

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The following test routine shows how to use it:

以下测试例程显示了如何使用它:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)

res=0
while not res==4:
    while not q3.empty():
        res = q3.get()[1]
        print ("returning result =",res)

Hope this helps.

希望这可以帮助。

Tony Wallace

托尼华莱士

回答by Tony Wallace

New version of above code...

以上代码的新版本...

Not sure how well the select on a multiprocessing queue works on windows. As select on windows listens for sockets and not file handles, I suspect there could be problems.

不确定多处理队列上的选择在 Windows 上的效果如何。由于 Windows 上的 select 侦听套接字而不是文件句柄,因此我怀疑可能存在问题。

My answer is to make a thread to listen to each queue in a blocking fashion, and to put the results all into a single queue listened to by the main thread, essentially multiplexing the individual queues into a single one.

我的答案是让一个线程以阻塞方式侦听每个队列,并将结果全部放入主线程侦听的单个队列中,实质上是将各个队列多路复用为一个队列。

My code for doing this is:

我这样做的代码是:

"""
Allow multiple queues to be waited upon.

An EndOfQueueMarker marks a queue as
    "all data sent on this queue".
When this marker has been accessed on
all input threads, this marker is returned
by the multi_queue.

"""
import queue
import threading

class EndOfQueueMarker:
    def __str___(self):
        return "End of data marker"
    pass

class queue_reader(threading.Thread):
    def __init__(self,inq,sharedq):
        threading.Thread.__init__(self)
        self.inq = inq
        self.sharedq = sharedq
    def run(self):
        q_run = True
        while q_run:
            data = self.inq.get()
            result = (self.inq,data)
            self.sharedq.put(result)
            if data is EndOfQueueMarker:
                q_run = False

class multi_queue(queue.Queue):
    def __init__(self,list_of_queues):
        queue.Queue.__init__(self)
        self.qList = list_of_queues
        self.qrList = []
        for q in list_of_queues:
            qr = queue_reader(q,self)
            qr.start()
            self.qrList.append(qr)
    def get(self,blocking=True,timeout=None):
        res = []
        while len(res)==0:
            if len(self.qList)==0:
                res = (self,EndOfQueueMarker)
            else:
                res = queue.Queue.get(self,blocking,timeout)
                if res[1] is EndOfQueueMarker:
                    self.qList.remove(res[0])
                    res = []
        return res

    def join(self):
        for qr in self.qrList:
            qr.join()

def select(list_of_queues):
    outq = queue.Queue()
    for q in list_of_queues:
        qr = queue_reader(q,outq)
        qr.start()
    return outq.get()

The follow code is my test routine to show how it works:

以下代码是我的测试例程,用于展示它是如何工作的:

import multiq
import queue

q1 = queue.Queue()
q2 = queue.Queue()

q3 = multiq.multi_queue([q1,q2])

q1.put(1)
q2.put(2)
q1.put(3)
q1.put(4)
q1.put(multiq.EndOfQueueMarker)
q2.put(multiq.EndOfQueueMarker)
res=0
have_data = True
while have_data:
    res = q3.get()[1]
    print ("returning result =",res)
    have_data = not(res==multiq.EndOfQueueMarker)

回答by uniquesnowflake8

You could use something like the Observerpattern, wherein Queue subscribers are notified of state changes.

您可以使用类似于观察者模式的东西,其中队列订阅者会收到状态变化的通知。

In this case, you could have your worker thread designated as a listener on each queue, and whenever it receives a ready signal, it can work on the new item, otherwise sleep.

在这种情况下,您可以将您的工作线程指定为每个队列上的侦听器,并且每当它收到就绪信号时,它就可以处理新项目,否则就休眠。

回答by spacegoliath

As of Python 3.3 you can use multiprocessing.connection.waitto wait on multiple Queue._readerobjects at once.

从 Python 3.3 开始,您可以使用multiprocessing.connection.waitQueue._reader一次等待多个对象。

回答by Mike

Don't do it.

不要这样做。

Put a header on the messages and send them to a common queue. This simplifies the code and will be cleaner overall.

在消息上放置一个标题并将它们发送到一个公共队列。这简化了代码,整体上会更干净。