C++11 中的无锁多生产者/消费者队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/25709548/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Lock-Free Multiple Producer/Consumer Queue in C++11
提问by Joe
I'm trying to implement a lock free multiple producer, multiple consumer queue in C++11. I'm doing this as a learning exercise, so I'm well aware that I could just use an existing open source implementation, but I'd really like to find out why my code doesn't work. The data is stored in a ringbuffer, apparently it is a "bounded MPMC queue".
我正在尝试在 C++11 中实现一个无锁的多生产者、多消费者队列。我这样做是作为一个学习练习,所以我很清楚我可以只使用现有的开源实现,但我真的很想找出为什么我的代码不起作用。数据存储在一个环形缓冲区中,显然它是一个“有界 MPMC 队列”。
I've modelled it pretty closely to what I've read of Disruptor. The thing I've noticed is that it works absolutely fine with a single consumer and single/multiple producers, it's just multiple consumers which seems to break it.
我对它的建模与我读过的 Disruptor 非常接近。我注意到的一点是,它在单个消费者和单个/多个生产者的情况下工作得非常好,只是多个消费者似乎打破了它。
Here's the queue:
这是队列:
template <typename T>
class Queue : public IQueue<T>
{
public:
explicit Queue( int capacity );
~Queue();
bool try_push( T value );
bool try_pop( T& value );
private:
typedef struct
{
bool readable;
T value;
} Item;
std::atomic<int> m_head;
std::atomic<int> m_tail;
int m_capacity;
Item* m_items;
};
template <typename T>
Queue<T>::Queue( int capacity ) :
m_head( 0 ),
m_tail( 0 ),
m_capacity(capacity),
m_items( new Item[capacity] )
{
for( int i = 0; i < capacity; ++i )
{
m_items[i].readable = false;
}
}
template <typename T>
Queue<T>::~Queue()
{
delete[] m_items;
}
template <typename T>
bool Queue<T>::try_push( T value )
{
while( true )
{
// See that there's room
int tail = m_tail.load(std::memory_order_acquire);
int new_tail = ( tail + 1 );
int head = m_head.load(std::memory_order_acquire);
if( ( new_tail - head ) >= m_capacity )
{
return false;
}
if( m_tail.compare_exchange_weak( tail, new_tail, std::memory_order_acq_rel ) )
{
// In try_pop, m_head is incremented before the reading of the value has completed,
// so though we've acquired this slot, a consumer thread may be in the middle of reading
tail %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( m_items[tail].readable )
{
}
m_items[tail].value = value;
std::atomic_thread_fence( std::memory_order_release );
m_items[tail].readable = true;
return true;
}
}
}
template <typename T>
bool Queue<T>::try_pop( T& value )
{
while( true )
{
int head = m_head.load(std::memory_order_acquire);
int tail = m_tail.load(std::memory_order_acquire);
if( head == tail )
{
return false;
}
int new_head = ( head + 1 );
if( m_head.compare_exchange_weak( head, new_head, std::memory_order_acq_rel ) )
{
head %= m_capacity;
std::atomic_thread_fence( std::memory_order_acquire );
while( !m_items[head].readable )
{
}
value = m_items[head].value;
std::atomic_thread_fence( std::memory_order_release );
m_items[head].readable = false;
return true;
}
}
}
And here's the test I'm using:
这是我正在使用的测试:
void Test( std::string name, Queue<int>& queue )
{
const int NUM_PRODUCERS = 64;
const int NUM_CONSUMERS = 2;
const int NUM_ITERATIONS = 512;
bool table[NUM_PRODUCERS*NUM_ITERATIONS];
memset(table, 0, NUM_PRODUCERS*NUM_ITERATIONS*sizeof(bool));
std::vector<std::thread> threads(NUM_PRODUCERS+NUM_CONSUMERS);
std::chrono::system_clock::time_point start, end;
start = std::chrono::system_clock::now();
std::atomic<int> pop_count (NUM_PRODUCERS * NUM_ITERATIONS);
std::atomic<int> push_count (0);
for( int thread_id = 0; thread_id < NUM_PRODUCERS; ++thread_id )
{
threads[thread_id] = std::thread([&queue,thread_id,&push_count]()
{
int base = thread_id * NUM_ITERATIONS;
for( int i = 0; i < NUM_ITERATIONS; ++i )
{
while( !queue.try_push( base + i ) ){};
push_count.fetch_add(1);
}
});
}
for( int thread_id = 0; thread_id < ( NUM_CONSUMERS ); ++thread_id )
{
threads[thread_id+NUM_PRODUCERS] = std::thread([&]()
{
int v;
while( pop_count.load() > 0 )
{
if( queue.try_pop( v ) )
{
if( table[v] )
{
std::cout << v << " already set" << std::endl;
}
table[v] = true;
pop_count.fetch_sub(1);
}
}
});
}
for( int i = 0; i < ( NUM_PRODUCERS + NUM_CONSUMERS ); ++i )
{
threads[i].join();
}
end = std::chrono::system_clock::now();
std::chrono::duration<double> duration = end - start;
std::cout << name << " " << duration.count() << std::endl;
std::atomic_thread_fence( std::memory_order_acq_rel );
bool result = true;
for( int i = 0; i < NUM_PRODUCERS * NUM_ITERATIONS; ++i )
{
if( !table[i] )
{
std::cout << "failed at " << i << std::endl;
result = false;
}
}
std::cout << name << " " << ( result? "success" : "fail" ) << std::endl;
}
Any nudging in the right direction would be greatly appreciated. I'm pretty new to memory fences rather than just using a mutex for everything, so I'm probably just fundamentally misunderstanding something.
任何朝着正确方向的推动将不胜感激。我对内存栅栏很陌生,而不仅仅是对所有东西都使用互斥锁,所以我可能只是从根本上误解了一些东西。
Cheers J
干杯J
回答by EnzoR
I'd give a look to Moody Camel's implementation.
我会看看Moody Camel的实现。
It is a fast general purpose lock-free queue for C++ entirely written in C++11. Documentation seems to be rather good along with a few performance tests.
它是一个完全用 C++11 编写的用于 C++ 的快速通用无锁队列。文档和一些性能测试似乎相当不错。
Among all other interesting things (they're worth a read anyway), it's all contained in a single header, and available under the simplified BSD license. Just drop it in your project and enjoy!
在所有其他有趣的事情中(无论如何它们都值得一读),它们都包含在一个标题中,并且在简化的 BSD 许可下可用。只需将其放入您的项目中即可享受!
回答by Calmarius
The simplest approach uses a circular buffer. That is it's like an array of 256 elements and you use uint8_t
as index so it wraps around and starts at beginning when you overflow it.
最简单的方法是使用循环缓冲区。也就是说,它就像一个包含 256 个元素的数组,您将其uint8_t
用作索引,因此当您溢出它时,它会环绕并从头开始。
The simplest primitive you can build upon is when you have single producer, single consumer thread.
您可以构建的最简单的原语是当您拥有单个生产者、单个消费者线程时。
The buffer has two heads:
缓冲区有两个头:
- Write head: It points the element which will be written next.
- Read head: It points to the element which will be read next.
- 写头:指向下一个要写的元素。
- 读取头:指向接下来要读取的元素。
Operation of the producer:
生产者的运作:
- If write Head + 1 == read head, the buffer is full, return buffer full error.
- Write content to the element.
- Insert memory barrier to sync CPU cores.
- Move the write head forward.
- 如果写头 + 1 == 读头,则缓冲区已满,返回缓冲区已满错误。
- 将内容写入元素。
- 插入内存屏障以同步 CPU 内核。
- 向前移动写入头。
At the buffer full case there is still 1 room left, but we reserve that, to distinguish from the buffer empty case.
在缓冲区已满的情况下,还剩下 1 个房间,但我们保留了它,以区别于缓冲区空的情况。
Operation of the consumer:
消费者操作:
- If read head == write head, the buffer is empty, return buffer empty error.
- Read content of the element.
- Insert memory barrier to sync CPU cores.
- Move the read head forward.
- 如果读头==写头,则缓冲区为空,返回缓冲区空错误。
- 读取元素的内容。
- 插入内存屏障以同步 CPU 内核。
- 向前移动读取头。
The producer owns the write head, the consumer owns the read head, there is no concurrency on those. Also the heads are updated when the operation is completed, this ensure the consumer leaves finished elements behind, and the consumes leaves behind fully consumed empty cells.
生产者拥有写头,消费者拥有读头,它们之间没有并发。当操作完成时,头部也会更新,这确保消费者留下完成的元素,而消费留下完全消耗的空单元格。
Create 2 of these pipes in both directions whenever you fork off a new thread and you can have bidirectional communication with your threads.
每当您 fork 一个新线程时,在两个方向上创建 2 个这样的管道,并且您可以与您的线程进行双向通信。
Given that we are talking about lock freeness it also means none of the threads are blocked, when there is nothing to do the threads are spinning empty, you may want to detect this and add some sleep when it happens.
鉴于我们正在谈论锁自由,这也意味着没有任何线程被阻塞,当无事可做时,线程正在空转,您可能想要检测到这一点并在它发生时添加一些睡眠。
回答by Oktaheta
How about this lock free queue
这个无锁队列怎么样
It is memory ordering lock free queue, but this need to pre-set number of current thread when init the queue.
它是内存排序无锁队列,但这需要在初始化队列时预先设置当前线程的数量。
For example:-
例如:-
int* ret;
int max_concurrent_thread = 16;
lfqueue_t my_queue;
lfqueue_init(&my_queue, max_concurrent_thread );
/** Wrap This scope in other threads **/
int_data = (int*) malloc(sizeof(int));
assert(int_data != NULL);
*int_data = i++;
/*Enqueue*/
while (lfqueue_enq(&my_queue, int_data) == -1) {
printf("ENQ Full ?\n");
}
/** Wrap This scope in other threads **/
/*Dequeue*/
while ( (int_data = lfqueue_deq(&my_queue)) == NULL) {
printf("DEQ EMPTY ..\n");
}
// printf("%d\n", *(int*) ret );
free(ret);
/** End **/
lfqueue_destroy(&my_queue);
回答by bittnkr
On another similar question, I presented a solutionto this problem. I believe that it the smallest found so far.
在另一个类似的问题上,我提出了这个问题的解决方案。我相信它是迄今为止发现的最小的。
I will not put same answer here, but the repositoryhas a fully functional C++ implementation of the lock free queue you desire.
我不会在这里给出相同的答案,但是该存储库具有您想要的无锁队列的全功能 C++ 实现。
EDIT: Thanks to code review from @PeterCordes, I've found a bug on the solution when using 64 bit templates, but now it's working perfectly.
编辑:感谢@PeterCordes 的代码,我在使用 64 位模板时发现了解决方案的错误,但现在它运行良好。
This is the output I receive when running the tests
这是我在运行测试时收到的输出
Creating 4 producers & 4 consumers
to flow 10.000.000 items trough the queue.
Produced: 10.743.668.245.000.000
Consumed: 5.554.289.678.184.004
Produced: 10.743.668.245.000.000
Consumed: 15.217.833.969.059.643
Produced: 10.743.668.245.000.000
Consumed: 7.380.542.769.600.801
Produced: 10.743.668.245.000.000
Consumed: 14.822.006.563.155.552
Checksum: 0 (it must be zero)