从线程队列中获取所有项目

时间:2020-03-06 14:57:44  来源:igfitidea点击:

我有一个线程将结果写入队列。

在另一个线程(GUI)中,我定期(在IDLE事件中)检查队列中是否有结果,如下所示:

def queue_get_all(q):
    items = []
    while 1:
        try:
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这是一个好方法吗?

编辑:

I'm asking because sometimes the
  waiting thread gets stuck for a few
  seconds without taking out new
  results.

事实证明,"卡住"问题是因为我在空闲事件处理程序中进行处理,而未确保按照建议的那样通过调用wx.WakeUpIdle实际上生成了此类事件。

解决方案

我看到我们使用的是get_nowait(),根据文档,"如果有立即可用,则返回一个项目,否则引发Empty异常"

现在,当抛出Empty异常时,我们碰巧跳出了循环。因此,如果队列中没有立即可用的结果,则函数将返回一个空项目列表。

有没有理由不使用get()方法呢?由于队列在同一时间正在为put()请求提供服务,因此get_nowait()可能会失败。

如果我们总是将所有可用项目从队列中拉出,那么使用队列而不是仅仅带有锁的列表有真正的意义吗? IE:

from __future__ import with_statement
import threading

class ItemStore(object):
    def __init__(self):
        self.lock = threading.Lock()
        self.items = []

    def add(self, item):
        with self.lock:
            self.items.append(item)

    def getAll(self):
        with self.lock:
            items, self.items = self.items, []
        return items

如果还单独拉出它们,并利用空队列的阻塞行为,则应该使用Queue,但是用例看起来要简单得多,上述方法可能会更好。

[Edit2]我错过了我们从空闲循环中轮询队列的事实,并且从更新中我发现问题与争用无关,因此以下方法与问题无关。如果有人发现此实用程序的阻塞变体,我将其保留:

对于确实要阻塞直到获得至少一个结果的情况,可以修改上面的代码以等待数据通过生产者线程发出信号而变为可用。例如。

class ItemStore(object):
    def __init__(self):
        self.cond = threading.Condition()
        self.items = []

    def add(self, item):
        with self.cond:
            self.items.append(item)
            self.cond.notify() # Wake 1 thread waiting on cond (if any)

    def getAll(self, blocking=False):
        with self.cond:
            # If blocking is true, always return at least 1 item
            while blocking and len(self.items) == 0:
                self.cond.wait()
            items, self.items = self.items, []
        return items

如果get_nowait()调用通过不返回列表为空而引起暂停,我会感到非常惊讶。

可能是我们在两次检查之间发布了大量(也许很大?)项目,这意味着接收线程有大量数据要从"队列"中拉出?我们可以尝试限制一批中检索到的数量:

def queue_get_all(q):
    items = []
    maxItemsToRetreive = 10
    for numOfItemsRetrieved in range(0, maxItemsToRetreive):
        try:
            if numOfItemsRetrieved == maxItemsToRetreive:
                break
            items.append(q.get_nowait())
        except Empty, e:
            break
    return items

这将限制接收线程一次最多拉出10个项目。