Python: dumping a multiprocessing.Queue into a list

Note: this page reproduces a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same license, cite the original URL and authors, and attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/1540822/

Time: 2020-11-03 22:32:13  Source: igfitidea

Dumping a multiprocessing.Queue into a list

python queue multiprocessing

Asked by Ram Rachum

I wish to dump a multiprocessing.Queue into a list. For that task I've written the following function:

import Queue

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # START DEBUG CODE
    initial_size = queue.qsize()
    print("Queue has %s items initially." % initial_size)
    # END DEBUG CODE

    while True:
        try:
            thing = queue.get(block=False)
            result.append(thing)
        except Queue.Empty:

            # START DEBUG CODE
            current_size = queue.qsize()
            total_size = current_size + len(result)
            print("Dumping complete:")
            if current_size == initial_size:
                print("No items were added to the queue.")
            else:
                print("%s items were added to the queue." % \
                      (total_size - initial_size))
            print("Extracted %s items from the queue, queue has %s items \
            left" % (len(result), current_size))
            # END DEBUG CODE

            return result

But for some reason it doesn't work.

Observe the following shell session:

>>> import multiprocessing
>>> q = multiprocessing.Queue()
>>> for i in range(100):
...     q.put([range(200) for j in range(100)])
... 
>>> q.qsize()
100
>>> l=dump_queue(q)
Queue has 100 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 99 items left
>>> l=dump_queue(q)
Queue has 99 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 3 items from the queue, queue has 96 items left
>>> l=dump_queue(q)
Queue has 96 items initially.
Dumping complete:
0 items were added to the queue.
Extracted 1 items from the queue, queue has 95 items left
>>> 

What's happening here? Why aren't all the items being dumped?

Answered by jnoller

Try this:

import multiprocessing
import time

def dump_queue(queue):
    """
    Empties all pending items in a queue and returns them in a list.
    """
    result = []

    # queue.get blocks until an item arrives; iteration stops at 'STOP'
    for i in iter(queue.get, 'STOP'):
        result.append(i)
    time.sleep(.1)
    return result

q = multiprocessing.Queue()
for i in range(100):
    q.put([range(200) for j in range(100)])
q.put('STOP')  # sentinel marking the end of the data
l = dump_queue(q)
print(len(l))

A multiprocessing queue has an internal buffer and a feeder thread that pulls items off that buffer and flushes them to the pipe. If not all of the objects have been flushed yet, I could see a case where Empty is raised prematurely. Using a sentinel to mark the end of the queue is safe and reliable. Also, the iter(get, sentinel) idiom is cleaner than relying on Empty, because it never has to guess whether more items are still on their way.

I don't like that it could raise Empty because of flush timing. I added the time.sleep(.1) to allow a context switch to the feeder thread; you may not need it, and the code works without it. Releasing the GIL like this is just a habit of mine.
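
If putting a sentinel on the queue is not an option, one possible alternative (not part of the answer above) is to replace the non-blocking get with a blocking get that has a timeout, so the feeder thread gets a chance to flush before the queue is declared empty. This is only a sketch for Python 3: the name drain_with_timeout and the 0.5 second default are illustrative assumptions, and the approach is a heuristic, since a producer that pauses longer than the timeout would still end the drain early.

```python
import multiprocessing
import queue  # Python 3 module that defines the Empty exception


def drain_with_timeout(q, timeout=0.5):
    """Collect items until no new item arrives within `timeout` seconds.

    Hypothetical helper for illustration; the timeout value is an assumption.
    """
    items = []
    while True:
        try:
            # Blocking with a timeout gives the feeder thread time to flush
            # buffered items into the pipe before we give up.
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            return items


if __name__ == "__main__":
    q = multiprocessing.Queue()
    for i in range(100):
        q.put(list(range(200)))
    print(len(drain_with_timeout(q)))
```

The sentinel approach remains the more reliable of the two, because it does not depend on timing at all.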

Answered by Matheus Araujo

In some situations we have already computed everything and just want to convert the Queue into a list.

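from multiprocessing import Queue

shared_queue = Queue()
shared_queue_list = []
...
join()  # pseudocode: all processes are joined at this point
# Move every remaining item from the queue into the list
while shared_queue.qsize() != 0:
    shared_queue_list.append(shared_queue.get())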

Now shared_queue_list has the results converted to a list.
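
Two caveats apply to the qsize() loop. The multiprocessing documentation describes qsize() as approximate and notes that it can raise NotImplementedError on platforms such as macOS. It also warns that joining a process before the items it queued have been consumed can deadlock. When the number of results is known in advance, a possible workaround is to call get() exactly that many times before joining. The sketch below assumes Python 3; collect_results and its expected parameter are hypothetical names introduced for illustration.

```python
import multiprocessing


def collect_results(q, expected):
    """Read exactly `expected` items from the queue into a list.

    Hypothetical helper: assumes the caller knows how many items to expect.
    """
    # Each get() blocks until an item is available, so an item that the
    # feeder thread has not flushed yet is waited for rather than missed.
    return [q.get() for _ in range(expected)]


if __name__ == "__main__":
    shared_queue = multiprocessing.Queue()
    for i in range(10):
        shared_queue.put(i * i)
    shared_queue_list = collect_results(shared_queue, expected=10)
    print(shared_queue_list)
```

Note that get() blocks forever if fewer than `expected` items ever arrive, so this only fits cases where the count is reliable.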
