C++ 等价于 Java 的 BlockingQueue
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/12805041/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
C++ Equivalent to Java's BlockingQueue
提问by Ben
I'm in the process of porting some Java code over to C++, and one particular section makes use of a BlockingQueue to pass messages from many producers to a single consumer.
我正在将一些 Java 代码移植到 C++,其中一个特定部分使用 BlockingQueue 将消息从多个生产者传递给单个使用者。
If you are not familiar with what a Java BlockingQueue is, it is just a queue that has a hard capacity, which exposes thread safe methods to put() and take() from the queue. put() blocks if the queue is full, and take() blocks if the queue is empty. Also, timeout-sensitive versions of these methods are supplied.
如果您不熟悉 Java BlockingQueue 是什么,那么它只是一个具有硬容量的队列,它向队列中的 put() 和 take() 公开线程安全方法。如果队列已满,则 put() 阻塞,如果队列为空,则 take() 阻塞。此外,还提供了这些方法的超时敏感版本。
Timeouts are relevant to my use-case, so a recommendation that supplies those is ideal. If not, I can code up some myself.
超时与我的用例相关,因此提供这些的建议是理想的。如果没有,我可以自己编写一些代码。
I've googled around and quickly browsed the Boost libraries and I'm not finding anything like this. Maybe I'm blind here...but does anyone know of a good recommendation?
我在谷歌上搜索并快速浏览了 Boost 库,但没有找到类似的东西。也许我在这里瞎了……但是有人知道一个好的推荐吗?
Thanks!
谢谢!
回答by Dietmar Kühl
It isn't fixed size and it doesn't support timeouts but here is a simple implementation of a queue I had posted recently using C++ 2011 constructs:
它不是固定大小,也不支持超时,但这是我最近使用 C++ 2011 构造发布的队列的简单实现:
#include <mutex>
#include <condition_variable>
#include <deque>
template <typename T>
class queue
{
private:
std::mutex d_mutex;
std::condition_variable d_condition;
std::deque<T> d_queue;
public:
void push(T const& value) {
{
std::unique_lock<std::mutex> lock(this->d_mutex);
d_queue.push_front(value);
}
this->d_condition.notify_one();
}
T pop() {
std::unique_lock<std::mutex> lock(this->d_mutex);
this->d_condition.wait(lock, [=]{ return !this->d_queue.empty(); });
T rc(std::move(this->d_queue.back()));
this->d_queue.pop_back();
return rc;
}
};
It should be trivial to extend and use a timed wait for popping. The main reason I haven't done it is that I'm not happy with the interface choices I have thought of so far.
扩展和使用定时等待弹出应该是微不足道的。我没有这样做的主要原因是我对目前想到的界面选择不满意。
回答by Serge Rogatch
Here's an example of a blocking queue with shutdown requestfeature:
template <typename T> class BlockingQueue {
std::condition_variable _cvCanPop;
std::mutex _sync;
std::queue<T> _qu;
bool _bShutdown = false;
public:
void Push(const T& item)
{
{
std::unique_lock<std::mutex> lock(_sync);
_qu.push(item);
}
_cvCanPop.notify_one();
}
void RequestShutdown() {
{
std::unique_lock<std::mutex> lock(_sync);
_bShutdown = true;
}
_cvCanPop.notify_all();
}
bool Pop(T &item) {
std::unique_lock<std::mutex> lock(_sync);
for (;;) {
if (_qu.empty()) {
if (_bShutdown) {
return false;
}
}
else {
break;
}
_cvCanPop.wait(lock);
}
item = std::move(_qu.front());
_qu.pop();
return true;
}
};
回答by SkyWalker
OK I'm a bit late to the party but I think this is a better fit for the Java's BlockingQueue
implementation. Here I too use one mutex and two conditions to look after not full and not empty. IMO a BlockingQueue
makes more sense with limited capacity which I didn't see in the other answers. I include a simple test scenario too:
好吧,我参加聚会有点晚了,但我认为这更适合 Java 的BlockingQueue
实现。在这里,我也使用一个互斥锁和两个条件来处理非满和非空。IMO aBlockingQueue
在容量有限的情况下更有意义,我在其他答案中没有看到。我也包括一个简单的测试场景:
#include <iostream>
#include <algorithm>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
template<typename T>
class blocking_queue {
private:
size_t _capacity;
std::queue<T> _queue;
std::mutex _mutex;
std::condition_variable _not_full;
std::condition_variable _not_empty;
public:
inline blocking_queue(size_t capacity) : _capacity(capacity) {
// empty
}
inline size_t size() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size();
}
inline bool empty() const {
std::unique_lock<std::mutex> lock(_mutex);
return _queue.empty();
}
inline void push(const T& elem) {
{
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is full
while (_queue.size() >= _capacity) {
_not_full.wait(lock);
}
std::cout << "pushing element " << elem << std::endl;
_queue.push(elem);
}
_not_empty.notify_all();
}
inline void pop() {
{
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is empty
while (_queue.size() == 0) {
_not_empty.wait(lock);
}
std::cout << "popping element " << _queue.front() << std::endl;
_queue.pop();
}
_not_full.notify_one();
}
inline const T& front() {
std::unique_lock<std::mutex> lock(_mutex);
// wait while the queue is empty
while (_queue.size() == 0) {
_not_empty.wait(lock);
}
return _queue.front();
}
};
int main() {
blocking_queue<int> queue(5);
// create producers
std::vector<std::thread> producers;
for (int i = 0; i < 10; i++) {
producers.push_back(std::thread([&queue, i]() {
queue.push(i);
// produces too fast
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}));
}
// create consumers
std::vector<std::thread> consumers;
for (int i = 0; i < 10; i++) {
producers.push_back(std::thread([&queue, i]() {
queue.pop();
// consumes too slowly
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}));
}
std::for_each(producers.begin(), producers.end(), [](std::thread &thread) {
thread.join();
});
std::for_each(consumers.begin(), consumers.end(), [](std::thread &thread) {
thread.join();
});
return EXIT_SUCCESS;
}