C++11 线程安全队列

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/15278343/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-27 19:13:34  来源:igfitidea点击:

C++11 thread-safe queue

c++multithreadingc++11queuecondition-variable

提问by Matt Kline

A project I'm working on uses multiple threads to do work on a collection of files. Each thread can add files to the list of files to be processed, so I put together (what I thought was) a thread-safe queue. Relevant portions follow:

我正在处理的一个项目使用多个线程来处理文件集合。每个线程都可以将文件添加到要处理的文件列表中,所以我把(我认为是)一个线程安全队列放在一起。相关部分如下:

// qMutex is a std::mutex intended to guard the queue
// populatedNotifier is a std::condition_variable intended to
//                   notify waiting threads of a new item in the queue

void FileQueue::enqueue(std::string&& filename)
{
    std::lock_guard<std::mutex> lock(qMutex);
    q.push(std::move(filename));

    // Notify anyone waiting for additional files that more have arrived
    populatedNotifier.notify_one();
}

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    if (q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::no_timeout) {
            std::string ret = q.front();
            q.pop();
            return ret;
        }
        else {
            return std::string();
        }
    }
    else {
        std::string ret = q.front();
        q.pop();
        return ret;
    }
}

However, I am occasionally segfaulting inside the if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }block, and inspection in gdb indicates that the segfaults are occurring because the queue is empty. How is this possible? It was my understanding that wait_foronly returns cv_status::no_timeoutwhen it has been notified, and this should only happen after FileQueue::enqueuehas just pushed a new item to the queue.

但是,我偶尔会在if (...wait_for(lock, timeout) == std::cv_status::no_timeout) { }块内发生段错误,并且在 gdb 中检查表明发生了段错误,因为队列为空。这怎么可能?我的理解是,wait_for只有cv_status::no_timeout在收到通知时才返回,并且只有在FileQueue::enqueue将新项目推送到队列后才会发生这种情况。

采纳答案by Grizzly

According to the standard condition_variablesare allowed to wakeup spuriously, even if the event hasn't occured. In case of a spurious wakeup it will return cv_status::no_timeout(since it woke up instead of timing out), even though it hasn't been notified. The correct solution for this is of course to check if the wakeup was actually legit before proceding.

根据标准condition_variables,即使事件没有发生,也允许虚假唤醒。在虚假唤醒的情况下,它会返回cv_status::no_timeout(因为它被唤醒而不是超时),即使它没有被通知。对此的正确解决方案当然是在继续之前检查唤醒实际上是否合法。

The details are specified in the standard §30.5.1 [thread.condition.condvar]:

详细信息在标准 §30.5.1 [thread.condition.condvar] 中指定:

—The function will unblock when signaled by a call to notify_one(), a call to notify_all(), expiration of the absolute timeout (30.2.4) speci?ed by abs_time, or spuriously.

...

Returns:cv_status::timeout if the absolute timeout (30.2.4) speci?edby abs_time expired, other-ise cv_status::no_timeout.

— 当通过调用 notify_one()、调用 notify_all()、abs_time 指定的绝对超时 (30.2.4) 或虚假发出信号时,该函数将解除阻塞。

...

返回:cv_status::timeout 如果 abs_time 指定的绝对超时 (30.2.4) 已过期,否则为 cv_status::no_timeout。

回答by ChewOnThis_Trident

It is best to make the condition (monitored by your condition variable) the inverse condition of a while-loop: while(!some_condition). Inside this loop, you go to sleep if your condition fails, triggering the body of the loop.

最好使条件(由条件变量监视)成为 while 循环的逆条件: while(!some_condition)。在此循环中,如果条件失败,您将进入睡眠状态,从而触发循环主体。

This way, if your thread is awoken--possibly spuriously--your loop will still check the condition before proceeding. Think of the conditionas the state of interest, and think of the condition variableas more of a signal from the system that this state mightbe ready. The loop will do the heavy lifting of actually confirming that it's true, and going to sleep if it's not.

这样,如果您的线程被唤醒——可能是虚假的——您的循环仍将在继续之前检查条件。将条件视为感兴趣的状态,并将条件变量视为来自系统的信号,表明该状态可能已准备就绪。循环将执行繁重的工作,即实际确认它是真实的,如果不是,则进入睡眠状态。

I just wrote a template for an async queue, hope this helps. Here, q.empty()is the inverse condition of what we want: for the queue to have something in it. So it serves as the check for the while loop.

我刚刚为异步队列编写了一个模板,希望这会有所帮助。这里,q.empty()是我们想要的逆条件:队列中有东西。所以它作为while循环的检查。

#ifndef SAFE_QUEUE
#define SAFE_QUEUE

#include <queue>
#include <mutex>
#include <condition_variable>

// A threadsafe-queue.
template <class T>
class SafeQueue
{
public:
  SafeQueue(void)
    : q()
    , m()
    , c()
  {}

  ~SafeQueue(void)
  {}

  // Add an element to the queue.
  void enqueue(T t)
  {
    std::lock_guard<std::mutex> lock(m);
    q.push(t);
    c.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  T dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      c.wait(lock);
    }
    T val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<T> q;
  mutable std::mutex m;
  std::condition_variable c;
};
#endif

回答by ronag

This is probably how you should do it:

这可能是你应该怎么做:

void push(std::string&& filename)
{
    {
        std::lock_guard<std::mutex> lock(qMutex);

        q.push(std::move(filename));
    }

    populatedNotifier.notify_one();
}

bool try_pop(std::string& filename, std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);

    if(!populatedNotifier.wait_for(lock, timeout, [this] { return !q.empty(); }))
        return false;

    filename = std::move(q.front());
    q.pop();

    return true;    
}

回答by quantdev

Adding to the accepted answer, I would say that implementing a correct multi producers / multi consumers queue is difficult (easier since C++11, though)

添加到已接受的答案中,我会说实现正确的多生产者/多消费者队列很困难(不过,自 C++11 以来更容易)

I would suggest you to try the (very good) lock free boost library, the "queue" structure will do what you want, with wait-free/lock-free guarantees and without the need for a C++11 compiler.

我建议您尝试(非常好的)无锁 boost 库,“队列”结构可以满足您的需求,具有无等待/无锁保证,并且不需要 C++11 编译器

I am adding this answer now because the lock-free library is quite new to boost (since 1.53 I believe)

我现在添加这个答案是因为无锁库是提升的新功能(我相信从 1.53 开始)

回答by Slava

I would rewrite your dequeue function as:

我会将您的出队函数重写为:

std::string FileQueue::dequeue(const std::chrono::milliseconds& timeout)
{
    std::unique_lock<std::mutex> lock(qMutex);
    while(q.empty()) {
        if (populatedNotifier.wait_for(lock, timeout) == std::cv_status::timeout ) 
           return std::string();
    }
    std::string ret = q.front();
    q.pop();
    return ret;
}

It is shorter and does not have duplicate code like your did. Only issue it may wait longer that timeout. To prevent that you would need to remember start time before loop, check for timeout and adjust wait time accordingly. Or specify absolute time on wait condition.

它更短,并且没有像您那样的重复代码。只有发出它可能会等待更长时间的超时。为了防止您需要记住循环前的开始时间,请检查超时并相应地调整等待时间。或者在等待条件下指定绝对时间。

回答by ransh

There is also GLib solution for this case, I did not try it yet, but I believe it is a good solution. https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

这个案例也有GLib解决方案,我还没有尝试过,但我相信这是一个很好的解决方案。 https://developer.gnome.org/glib/2.36/glib-Asynchronous-Queues.html#g-async-queue-new

回答by gm127

BlockingCollectionis a C++11 thread safe collection class that provides support for queue, stack and priority containers. It handles the "empty" queue scenario you described. As well as a "full" queue.

BlockingCollection是一个 C++11 线程安全的集合类,提供对队列、堆栈和优先级容器的支持。它处理您描述的“空”队列场景。以及“满”队列。

回答by woon minika

You may like lfqueue, https://github.com/Taymindis/lfqueue. It's lock free concurrent queue. I'm currently using it to consuming the queue from multiple incoming calls and works like a charm.

你可能喜欢 lfqueue,https://github.com/Taymindis/lfqueue。它是无锁并发队列。我目前正在使用它来使用来自多个来电的队列,并且效果很好。