windows 多线程单读取器单写入器先进先出队列
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/1448276/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Multithreaded single-reader single-writer fifo queue
提问by Fire Lancer
I need a queue for passing messages from one thread (A) to another (B), however ive not been able to find one that really does what I want, since they generally allow adding an item to fail, a case which in my situation is pretty much fatal since the message needs to be processed, and the thread really cant stop and wait for spare room.
我需要一个队列来将消息从一个线程 (A) 传递到另一个线程 (B),但是我无法找到真正满足我要求的队列,因为它们通常允许添加一个项目失败,这在我的情况下几乎是致命的,因为需要处理消息,并且线程真的不能停下来等待空闲空间。
- Only thread A adds items, and only thread B reads them
- Thread A must never block, however thread B is not performance critical, so it can
- Adding items must always succeed, so the queue cant have an upper size limit (short of running out of memory on the system)
- If the queue is empty, thread B should wait until there is an item to process
- 只有线程 A 添加项目,只有线程 B 读取它们
- 线程 A 绝不能阻塞,但是线程 B 不是性能关键,因此它可以
- 添加项目必须始终成功,因此队列不能有大小上限(系统内存不足)
- 如果队列为空,线程 B 应该等待,直到有项目要处理
回答by Steve Jessop
Here's how to write a lock-free queue in C++:
下面是如何在 C++ 中编写无锁队列:
http://www.ddj.com/hpc-high-performance-computing/210604448
http://www.ddj.com/hpc-high-performance-computing/210604448
But when you say "thread A must not block", are you sure that's the requirement? Windows is not a real-time operating system (and neither is linux, in normal use). If you want Thread A to be able to use all available system memory, then it needs to allocate memory (or wait while someone else does). The OS itself cannot provide timing guarantees any better than those you'd have if both reader and writer took an in-process lock (i.e. a non-shared mutex) in order to manipulate the list. And the worst-case of adding a message is going to have to go to the OS to get memory.
但是当您说“线程 A 不能阻塞”时,您确定这是要求吗?Windows 不是实时操作系统(正常使用中的 linux 也不是)。如果您希望线程 A 能够使用所有可用的系统内存,则它需要分配内存(或等待其他人这样做)。如果读取器和写入器都使用进程内锁(即非共享互斥锁)来操作列表,则操作系统本身无法提供比您拥有的更好的时序保证。添加消息的最坏情况将不得不去操作系统获取内存。
In short, there's a reason those queues you don't like have a fixed capacity - it's so that they don't have to allocate memory in the supposedly low-latency thread.
简而言之,您不喜欢那些队列具有固定容量是有原因的——这样它们就不必在所谓的低延迟线程中分配内存。
So the lock-free code will generally be less block-y, but due to the memory allocation it isn't guaranteed to be, and performance with a mutex shouldn't be all that shabby unless you have a truly huge stream of events to process (like, you're writing a network driver and the messages are incoming ethernet packets).
所以无锁代码通常会更少块,但由于内存分配,它不能保证,并且互斥锁的性能不应该那么糟糕,除非你有一个真正巨大的事件流进程(例如,您正在编写网络驱动程序并且消息是传入的以太网数据包)。
So, in pseudo-code, the first thing I'd try would be:
所以,在伪代码中,我会尝试的第一件事是:
Writer:
allocate message and fill it in
acquire lock
append node to intrusive list
signal condition variable
release lock
Reader:
for(;;)
acquire lock
for(;;)
if there's a node
remove it
break
else
wait on condition variable
endif
endfor
release lock
process message
free message
endfor
Only if this proves to introduce unacceptable delays in the writer thread would I go to lock-free code, (unless I happened to have a suitable queue already lying around).
只有当这证明在编写器线程中引入了不可接受的延迟时,我才会使用无锁代码(除非我碰巧已经有一个合适的队列)。
回答by Rick
Visual Studio 2010 is adding 2 new libraries which support this scenario very well, the Asynchronous Agents Libraryand Parallel Pattern Library.
Visual Studio 2010 正在添加 2 个新库,它们非常好地支持这种情况,即异步代理库和并行模式库。
The agents library has support or asynchronous message passing and contains message blocks for sending messages to 'targets' and for receiving messages from 'sources'
代理库支持或异步消息传递,并包含用于向“目标”发送消息和从“源”接收消息的消息块
An unbounded_buffer is a template class which offers what I believe you are looking for:
unbounded_buffer 是一个模板类,它提供了我相信您正在寻找的内容:
#include <agents.h>
#include <ppl.h>
#include <iostream>
using namespace ::Concurrency;
using namespace ::std;
int main()
{
//to hold our messages, the buffer is unbounded...
unbounded_buffer<int> buf1;
task_group tasks;
//thread 1 sends messages to the unbounded_buffer
//without blocking
tasks.run([&buf1](){
for(int i = 0 ; i < 10000; ++i)
send(&buf1,i)
//signal exit
send(&buf1,-1);
});
//thread 2 receives messages and blocks if there are none
tasks.run([&buf1](){
int result;
while(result = receive(&buf1)!=-1)
{
cout << "I got a " << result << endl;
}
});
//wait for the threads to end
tasks.wait();
}
回答by Mr.Ree
Why not use STL <
list
> or <deque
> with a mutex around add/remove? Is the thread-safety of STLinsufficient?Why not create your own (singly/doubly) linked-list-node class that contains a pointer, and have the items to be added/removed inherit from that? Thus making additional allocation unnecessary. You just frob a few pointers in
threadA::add()
andthreadB::remove()
and you are done. (While you'd want to do that under a mutex, the blocking effect on threadA would be negligible unless you did something really wrong...)If you're using pthreads, check out
sem_post()
andsem_wait()
. The idea is that threadB can block indefinitely viasem_wait()
until threadA puts something on the queue. Then threadA invokessem_post()
. Which wakes up threadB to do it's work. After which threadB can go back to sleep. It's an efficient way of handling asynchronous signaling, supporting things like multiplethreadA::add()
's beforethreadB::remove()
completes.
为什么不创建自己的(单/双)链表节点类,其中包含一个指针,并让要添加/删除的项目继承自该类?因此不需要额外的分配。你只需在FROB几个指针
threadA::add()
,并threadB::remove()
和你做。(虽然您想在互斥锁下执行此操作,但除非您确实做错了什么,否则对 threadA 的阻塞影响可以忽略不计......)如果您使用的是 pthread,请查看
sem_post()
和sem_wait()
。这个想法是线程B可以无限期地阻塞,sem_wait()
直到线程A在队列中放入一些东西。然后 threadA 调用sem_post()
. 这唤醒了 threadB 来完成它的工作。之后线程B可以返回睡眠状态。这是一种处理异步信号的有效方式,支持多个threadA::add()
之前threadB::remove()
完成的事情。
回答by JustJeff
You might want to consider your requirements - is it truly the case that A can't discard any queue items whatsoever? Or is it that you don't want B to pull two consecutive elements out of the queue that weren't consecutive items going in because that would somehow misrepresent a sequence of events?
您可能需要考虑您的要求 - A 真的不能丢弃任何队列项目吗?或者您是否不希望 B 从队列中拉出两个连续的元素,这些元素不是连续的项目进入,因为这会以某种方式歪曲事件序列?
For example, if this is some kind of data logging system, you (understandably) wouldn't want gaps in the record -- but without an unlimited memory, the reality is that in some corner case somewhere you probably could overrun your queue capacity..
例如,如果这是某种数据记录系统,您(可以理解)不希望记录中有间隙——但如果没有无限内存,现实情况是,在某些极端情况下,您可能会超出队列容量。 .
In which case one solution is to have some kind of special element that can be put in the queue, which represents the case of A discovering that it had to drop items. Basically you keep one extra element around, which is null most of the time. Every time A goes to add elements to the queue, if this extra element is not null, that goes in. If A discovers there is no room in the queue, then it configures this extra element to say 'hey, the queue was full'.
在这种情况下,一种解决方案是将某种特殊元素放入队列中,这表示 A 发现它必须删除项目的情况。基本上你会保留一个额外的元素,大多数时候它是空的。每次 A 去往队列中添加元素时,如果这个额外的元素不为空,那就进去。如果 A 发现队列中没有空间,那么它配置这个额外的元素说'嘿,队列已满' .
This way, A never blocks, you can drop elements when the system is Very Busy, but you don't lose sight of the fact that elements were dropped, because as soon as queue space becomes available, this mark goes in to indicate where data drop occurred. Process B then does whatever it needs to do when it discovers it has pulled this overrun mark element out of the queue.
这样,A 永远不会阻塞,您可以在系统非常忙时删除元素,但您不会忽略元素被删除的事实,因为一旦队列空间可用,此标记就会进入指示数据的位置发生下降。然后,当进程 B 发现它已将这个溢出标记元素从队列中拉出时,它会做任何它需要做的事情。