C# 在 .NET 中创建阻塞 Queue<T>?
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/530211/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Creating a blocking Queue<T> in .NET?
提问by Eric Schoonover
I have a scenario where I have multiple threads adding to a queue and multiple threads reading from the same queue. If the queue reaches a specific size all threadsthat are filling the queue will be blocked on add until an item is removed from the queue.
我有一个场景,我有多个线程添加到一个队列和多个线程从同一个队列中读取。如果队列达到特定大小,所有填充队列的线程将在添加时被阻塞,直到从队列中删除一个项目。
The solution below is what I am using right now and my question is: How can this be improved? Is there an object that already enables this behavior in the BCL that I should be using?
下面的解决方案是我现在正在使用的,我的问题是:如何改进?是否有一个对象已经在我应该使用的 BCL 中启用了这种行为?
internal class BlockingCollection<T> : CollectionBase, IEnumerable
{
//todo: might be worth changing this into a proper QUEUE
private AutoResetEvent _FullEvent = new AutoResetEvent(false);
internal T this[int i]
{
get { return (T) List[i]; }
}
private int _MaxSize;
internal int MaxSize
{
get { return _MaxSize; }
set
{
_MaxSize = value;
checkSize();
}
}
internal BlockingCollection(int maxSize)
{
MaxSize = maxSize;
}
internal void Add(T item)
{
Trace.WriteLine(string.Format("BlockingCollection add waiting: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.WaitOne();
List.Add(item);
Trace.WriteLine(string.Format("BlockingCollection item added: {0}", Thread.CurrentThread.ManagedThreadId));
checkSize();
}
internal void Remove(T item)
{
lock (List)
{
List.Remove(item);
}
Trace.WriteLine(string.Format("BlockingCollection item removed: {0}", Thread.CurrentThread.ManagedThreadId));
}
protected override void OnRemoveComplete(int index, object value)
{
checkSize();
base.OnRemoveComplete(index, value);
}
internal new IEnumerator GetEnumerator()
{
return List.GetEnumerator();
}
private void checkSize()
{
if (Count < MaxSize)
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent set: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Set();
}
else
{
Trace.WriteLine(string.Format("BlockingCollection FullEvent reset: {0}", Thread.CurrentThread.ManagedThreadId));
_FullEvent.Reset();
}
}
}
采纳答案by Marc Gravell
That looks very unsafe (very little synchronization); how about something like:
这看起来很不安全(很少同步);怎么样:
class SizeQueue<T>
{
private readonly Queue<T> queue = new Queue<T>();
private readonly int maxSize;
public SizeQueue(int maxSize) { this.maxSize = maxSize; }
public void Enqueue(T item)
{
lock (queue)
{
while (queue.Count >= maxSize)
{
Monitor.Wait(queue);
}
queue.Enqueue(item);
if (queue.Count == 1)
{
// wake up any blocked dequeue
Monitor.PulseAll(queue);
}
}
}
public T Dequeue()
{
lock (queue)
{
while (queue.Count == 0)
{
Monitor.Wait(queue);
}
T item = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return item;
}
}
}
(edit)
(编辑)
In reality, you'd want a way to close the queue so that readers start exiting cleanly - perhaps something like a bool flag - if set, an empty queue just returns (rather than blocking):
实际上,您需要一种关闭队列的方法,以便读者开始干净地退出 - 可能类似于 bool 标志 - 如果设置,则空队列将返回(而不是阻塞):
bool closing;
public void Close()
{
lock(queue)
{
closing = true;
Monitor.PulseAll(queue);
}
}
public bool TryDequeue(out T value)
{
lock (queue)
{
while (queue.Count == 0)
{
if (closing)
{
value = default(T);
return false;
}
Monitor.Wait(queue);
}
value = queue.Dequeue();
if (queue.Count == maxSize - 1)
{
// wake up any blocked enqueue
Monitor.PulseAll(queue);
}
return true;
}
}
回答by DavidN
If you want maximum throughput, allowing multiple readers to read and only one writer to write, BCL has something called ReaderWriterLockSlim that should help slim down your code...
如果你想要最大的吞吐量,允许多个读者读取而只有一个作者写,BCL 有一个叫做 ReaderWriterLockSlim 的东西,它应该有助于精简你的代码......
回答by Vilx-
Well, you might look at System.Threading.Semaphore
class. Other than that - no, you have to make this yourself. AFAIK there is no such built-in collection.
好吧,你可以看看System.Threading.Semaphore
课堂。除此之外 - 不,你必须自己做这个。AFAIK 没有这样的内置集合。
回答by TheMissingLINQ
回答by Daniel Earwicker
"How can this be improved?"
“这怎么改善?”
Well, you need to look at every method in your class and consider what would happen if another thread was simultaneously calling that method or any other method. For example, you put a lock in the Remove method, but not in the Add method. What happens if one thread Adds at the same time as another thread Removes? Bad things.
好吧,您需要查看类中的每个方法,并考虑如果另一个线程同时调用该方法或任何其他方法会发生什么。例如,您在 Remove 方法中放置了一个锁,而不是在 Add 方法中放置了一个锁。如果一个线程在另一个线程删除的同时添加会发生什么?坏事。
Also consider that a method can return a second object that provides access to the first object's internal data - for example, GetEnumerator. Imagine one thread is going through that enumerator, another thread is modifying the list at the same time. Not good.
还要考虑一个方法可以返回提供对第一个对象的内部数据的访问的第二个对象 - 例如,GetEnumerator。想象一个线程正在通过该枚举器,同时另一个线程正在修改列表。不好。
A good rule of thumb is to make this simpler to get right by cutting down the number of methods in the class to the absolute minimum.
一个好的经验法则是通过将类中的方法数量减少到绝对最小值来使这更简单。
In particular, don't inherit another container class, because you will expose all of that class's methods, providing a way for the caller to corrupt the internal data, or to see partially complete changes to the data (just as bad, because the data appears corrupted at that moment). Hide all the details and be completely ruthless about how you allow access to them.
特别是,不要继承另一个容器类,因为您将公开该类的所有方法,为调用者提供了一种破坏内部数据的方法,或者查看对数据的部分完整更改(同样糟糕,因为数据在那一刻似乎已损坏)。隐藏所有细节,对如何允许访问它们完全无情。
I'd strongly advise you to use off-the-shelf solutions - get a book about threading or use 3rd party library. Otherwise, given what you're attempting, you're going to be debugging your code for a long time.
我强烈建议您使用现成的解决方案 - 获取一本关于线程的书或使用 3rd 方库。否则,根据您的尝试,您将要调试代码很长时间。
Also, wouldn't it make more sense for Remove to return an item (say, the one that was added first, as it's a queue), rather than the caller choosing a specific item? And when the queue is empty, perhaps Remove should also block.
此外,Remove 返回一个项目(例如,首先添加的项目,因为它是一个队列)而不是调用者选择特定项目是否更有意义?而当队列为空时,也许 Remove 也应该阻塞。
Update: Marc's answer actually implements all these suggestions! :) But I'll leave this here as it may be helpful to understand why his version is such an improvement.
更新:Marc 的回答实际上实现了所有这些建议!:) 但我将把它留在这里,因为它可能有助于理解为什么他的版本如此改进。
回答by Kevin
This is what I came op for a thread safe bounded blocking queue.
这就是我为线程安全的有界阻塞队列而来的。
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
public class BlockingBuffer<T>
{
private Object t_lock;
private Semaphore sema_NotEmpty;
private Semaphore sema_NotFull;
private T[] buf;
private int getFromIndex;
private int putToIndex;
private int size;
private int numItems;
public BlockingBuffer(int Capacity)
{
if (Capacity <= 0)
throw new ArgumentOutOfRangeException("Capacity must be larger than 0");
t_lock = new Object();
buf = new T[Capacity];
sema_NotEmpty = new Semaphore(0, Capacity);
sema_NotFull = new Semaphore(Capacity, Capacity);
getFromIndex = 0;
putToIndex = 0;
size = Capacity;
numItems = 0;
}
public void put(T item)
{
sema_NotFull.WaitOne();
lock (t_lock)
{
while (numItems == size)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}
buf[putToIndex++] = item;
if (putToIndex == size)
putToIndex = 0;
numItems++;
Monitor.Pulse(t_lock);
}
sema_NotEmpty.Release();
}
public T take()
{
T item;
sema_NotEmpty.WaitOne();
lock (t_lock)
{
while (numItems == 0)
{
Monitor.Pulse(t_lock);
Monitor.Wait(t_lock);
}
item = buf[getFromIndex++];
if (getFromIndex == size)
getFromIndex = 0;
numItems--;
Monitor.Pulse(t_lock);
}
sema_NotFull.Release();
return item;
}
}
回答by Mark Rendle
I just knocked this up using the Reactive Extensions and remembered this question:
我刚刚使用 Reactive Extensions 解决了这个问题,并记住了这个问题:
public class BlockingQueue<T>
{
private readonly Subject<T> _queue;
private readonly IEnumerator<T> _enumerator;
private readonly object _sync = new object();
public BlockingQueue()
{
_queue = new Subject<T>();
_enumerator = _queue.GetEnumerator();
}
public void Enqueue(T item)
{
lock (_sync)
{
_queue.OnNext(item);
}
}
public T Dequeue()
{
_enumerator.MoveNext();
return _enumerator.Current;
}
}
Not necessarily entirely safe, but very simple.
不一定完全安全,但非常简单。
回答by xhafan
Use .net 4 BlockingCollection, to enqueue use Add(), to dequeue use Take(). It internally uses non-blocking ConcurrentQueue. More info here Fast and Best Producer/consumer queue technique BlockingCollection vs concurrent Queue
使用.net 4 BlockingCollection,入队使用Add(),出队使用Take()。它在内部使用非阻塞 ConcurrentQueue。更多信息在这里快速和最佳生产者/消费者队列技术 BlockingCollection vs concurrent Queue
回答by Andreas
You can use the BlockingCollectionand ConcurrentQueuein the System.Collections.Concurrent Namespace
您可以在 System.Collections.Concurrent 命名空间中使用BlockingCollection和ConcurrentQueue
public class ProducerConsumerQueue<T> : BlockingCollection<T>
{
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
public ProducerConsumerQueue()
: base(new ConcurrentQueue<T>())
{
}
/// <summary>
/// Initializes a new instance of the ProducerConsumerQueue, Use Add and TryAdd for Enqueue and TryEnqueue and Take and TryTake for Dequeue and TryDequeue functionality
/// </summary>
/// <param name="maxSize"></param>
public ProducerConsumerQueue(int maxSize)
: base(new ConcurrentQueue<T>(), maxSize)
{
}
}