Thread-safe blocking queue implementation on .NET
Note: this page reproduces a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must attribute it to the original authors (not me): Stack Overflow
Original question: http://stackoverflow.com/questions/801528/
Asked by Shrike
I'm looking for an implementation of a thread-safe blocking queue for .NET. By "thread-safe blocking queue" I mean thread-safe access to a queue where a Dequeue call blocks the calling thread until another thread puts (Enqueue) a value.
So far I've found this one: http://www.eggheadcafe.com/articles/20060414.asp (but it's for .NET 1.1).
Could someone comment on or critique the correctness of this implementation, or suggest another one? Thanks in advance.
Accepted answer by Marc Gravell
How about this one: "Creating a blocking Queue in .NET?"
If you need it for .NET 1.1 (I wasn't sure from the question), just drop the generics and replace T with object.
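To make the idea concrete, here is a minimal sketch of a blocking queue built on Monitor.Wait and Monitor.Pulse. It is an illustration only, not the code from the linked answer; the class name SimpleBlockingQueue and the field names are hypothetical. For .NET 1.1 the same shape should work with the non-generic System.Collections.Queue and object in place of T.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical illustration; not the code from the linked answer.
public class SimpleBlockingQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            _items.Enqueue(item);
            // Wake one thread blocked in Dequeue, if there is one.
            Monitor.Pulse(_gate);
        }
    }

    public T Dequeue()
    {
        lock (_gate)
        {
            // Re-check in a loop: another consumer may have taken the item
            // before this thread reacquired the lock.
            while (_items.Count == 0)
            {
                Monitor.Wait(_gate);
            }
            return _items.Dequeue();
        }
    }
}
```

This version is unbounded: Enqueue never blocks, only Dequeue does. A bounded variant would add a second wait in Enqueue while the queue is full.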
Answer by Sam Harwell
For reference, .NET 4 introduces the System.Collections.Concurrent.BlockingCollection<T> type to address this. For a non-blocking queue, you can use System.Collections.Concurrent.ConcurrentQueue<T>. Note that ConcurrentQueue<T> would likely be used as the underlying data store for the BlockingCollection<T> in the OP's scenario.
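As a rough sketch of how these two types fit together, the example below wraps a ConcurrentQueue<int> in a BlockingCollection<int>, fills it from a producer thread, and consumes on the calling thread. The class and method names (BlockingCollectionDemo, Run) are made up for illustration.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical demo class; only the BlockingCollection/ConcurrentQueue calls are framework API.
public static class BlockingCollectionDemo
{
    // Produces itemCount integers on a worker thread and consumes them on the caller.
    public static List<int> Run(int itemCount)
    {
        var received = new List<int>();

        // Passing ConcurrentQueue<T> explicitly makes the FIFO backing store visible.
        using (var queue = new BlockingCollection<int>(new ConcurrentQueue<int>()))
        {
            var producer = new Thread(() =>
            {
                for (int i = 0; i < itemCount; i++)
                {
                    queue.Add(i);
                }
                // Tell consumers that no more items will arrive.
                queue.CompleteAdding();
            });
            producer.Start();

            // Blocks while the collection is empty; ends after CompleteAdding
            // once every remaining item has been taken.
            foreach (int item in queue.GetConsumingEnumerable())
            {
                received.Add(item);
            }

            producer.Join();
        }
        return received;
    }
}
```

For a single blocking read, queue.Take() can be used instead of the enumerable; it blocks until an item is available.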
Answer by Chad Grant
Queue.Synchronized http://msdn.microsoft.com/en-us/library/system.collections.queue.synchronized(VS.71).aspx
It's a starting point anyway; I've never used a blocking queue. Sorry for the not-so-relevant post.
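A short sketch of why Queue.Synchronized is only a starting point: the wrapper serializes individual calls, but Dequeue on an empty queue throws instead of waiting. The class and method names below are hypothetical.

```csharp
using System;
using System.Collections;

// Hypothetical demo class showing that the synchronized wrapper does not block.
public static class SynchronizedQueueDemo
{
    public static bool DequeueOnEmptyThrows()
    {
        // Thread-safe wrapper around the non-generic Queue.
        Queue queue = Queue.Synchronized(new Queue());
        queue.Enqueue("a");
        queue.Dequeue();

        try
        {
            // The queue is now empty: this call throws rather than blocking.
            queue.Dequeue();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```

Checking Count before calling Dequeue does not help across threads either, because another thread can empty the queue between the two calls.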
Answer by Shrike
Yes, .NET 4 contains concurrent collections. By the way, the PFX team has a very nice manual about Parallel Extensions: http://www.microsoft.com/downloads/details.aspx?FamilyID=86b3d32b-ad26-4bb8-a3ae-c1637026c3ee&displaylang=en
PFX is also available for .NET 3.5 as part of Reactive Extensions.
Answer by Steven Padifeld
The Microsoft example is a good one but it is not encapsulated into a class. Also, it requires that the consumer thread is running in the MTA (because of the WaitAny call). There are some cases in which you may need to run in an STA (e.g., if you are doing COM interop). In these cases, WaitAny cannot be used.
I have a simple blocking queue class that overcomes this issue here: http://element533.blogspot.com/2010/01/stoppable-blocking-queue-for-net.html
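For readers who cannot reach the linked post, the following is a rough sketch of what a stoppable blocking queue could look like when it relies on Monitor instead of WaitHandle.WaitAny. It is not the code from that post; the class name StoppableQueue, the TryDequeue signature, and the choice to ignore items enqueued after Stop are all assumptions made for illustration.

```csharp
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch; not the class from the linked blog post.
public class StoppableQueue<T>
{
    private readonly Queue<T> _items = new Queue<T>();
    private readonly object _gate = new object();
    private bool _stopped;

    public void Enqueue(T item)
    {
        lock (_gate)
        {
            // Assumed design choice: items added after Stop are dropped.
            if (_stopped) return;
            _items.Enqueue(item);
            Monitor.Pulse(_gate);
        }
    }

    // Blocks until an item is available or the queue is stopped.
    // Returns false only when the queue is stopped and fully drained.
    public bool TryDequeue(out T item)
    {
        lock (_gate)
        {
            while (_items.Count == 0 && !_stopped)
            {
                Monitor.Wait(_gate);
            }
            if (_items.Count == 0)
            {
                item = default(T);
                return false;
            }
            item = _items.Dequeue();
            return true;
        }
    }

    public void Stop()
    {
        lock (_gate)
        {
            _stopped = true;
            // Release every waiting consumer so each can observe the stop flag.
            Monitor.PulseAll(_gate);
        }
    }
}
```

A consumer thread would typically loop with `while (queue.TryDequeue(out item)) { ... }` and exit when the call returns false.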
Answer by CSharper
Microsoft has a pretty nice sample about this:
// Copyright (C) Microsoft Corporation. All rights reserved.
using System;
using System.Threading;
using System.Collections;
using System.Collections.Generic;

// The thread synchronization events are encapsulated in this
// class to allow them to easily be passed to the Consumer and
// Producer classes.
public class SyncEvents
{
    public SyncEvents()
    {
        // AutoResetEvent is used for the "new item" event because
        // we want this event to reset automatically each time the
        // consumer thread responds to this event.
        _newItemEvent = new AutoResetEvent(false);

        // ManualResetEvent is used for the "exit" event because
        // we want multiple threads to respond when this event is
        // signaled. If we used AutoResetEvent instead, the event
        // object would revert to a non-signaled state after
        // a single thread responded, and the other thread would
        // fail to terminate.
        _exitThreadEvent = new ManualResetEvent(false);

        // The two events are placed in a WaitHandle array as well so
        // that the consumer thread can block on both events using
        // the WaitAny method.
        _eventArray = new WaitHandle[2];
        _eventArray[0] = _newItemEvent;
        _eventArray[1] = _exitThreadEvent;
    }

    // Public properties allow safe access to the events.
    public EventWaitHandle ExitThreadEvent
    {
        get { return _exitThreadEvent; }
    }

    public EventWaitHandle NewItemEvent
    {
        get { return _newItemEvent; }
    }

    public WaitHandle[] EventArray
    {
        get { return _eventArray; }
    }

    private EventWaitHandle _newItemEvent;
    private EventWaitHandle _exitThreadEvent;
    private WaitHandle[] _eventArray;
}

// The Producer class asynchronously (using a worker thread)
// adds items to the queue until there are 20 items.
public class Producer
{
    public Producer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }

    public void ThreadRun()
    {
        int count = 0;
        Random r = new Random();
        while (!_syncEvents.ExitThreadEvent.WaitOne(0, false))
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                while (_queue.Count < 20)
                {
                    _queue.Enqueue(r.Next(0, 100));
                    _syncEvents.NewItemEvent.Set();
                    count++;
                }
            }
        }
        Console.WriteLine("Producer thread: produced {0} items", count);
    }

    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

// The Consumer class uses its own worker thread to consume items
// in the queue. The Producer class notifies the Consumer class
// of new items with the NewItemEvent.
public class Consumer
{
    public Consumer(Queue<int> q, SyncEvents e)
    {
        _queue = q;
        _syncEvents = e;
    }

    public void ThreadRun()
    {
        int count = 0;
        while (WaitHandle.WaitAny(_syncEvents.EventArray) != 1)
        {
            lock (((ICollection)_queue).SyncRoot)
            {
                int item = _queue.Dequeue();
            }
            count++;
        }
        Console.WriteLine("Consumer Thread: consumed {0} items", count);
    }

    private Queue<int> _queue;
    private SyncEvents _syncEvents;
}

public class ThreadSyncSample
{
    private static void ShowQueueContents(Queue<int> q)
    {
        // Enumerating a collection is inherently not thread-safe,
        // so it is imperative that the collection be locked throughout
        // the enumeration to prevent the consumer and producer threads
        // from modifying the contents. (This method is called by the
        // primary thread only.)
        lock (((ICollection)q).SyncRoot)
        {
            foreach (int i in q)
            {
                Console.Write("{0} ", i);
            }
        }
        Console.WriteLine();
    }

    static void Main()
    {
        // Configure struct containing event information required
        // for thread synchronization.
        SyncEvents syncEvents = new SyncEvents();

        // Generic Queue collection is used to store items to be
        // produced and consumed. In this case 'int' is used.
        Queue<int> queue = new Queue<int>();

        // Create objects, one to produce items, and one to
        // consume. The queue and the thread synchronization
        // events are passed to both objects.
        Console.WriteLine("Configuring worker threads...");
        Producer producer = new Producer(queue, syncEvents);
        Consumer consumer = new Consumer(queue, syncEvents);

        // Create the thread objects for producer and consumer
        // objects. This step does not create or launch the
        // actual threads.
        Thread producerThread = new Thread(producer.ThreadRun);
        Thread consumerThread = new Thread(consumer.ThreadRun);

        // Create and launch both threads.
        Console.WriteLine("Launching producer and consumer threads...");
        producerThread.Start();
        consumerThread.Start();

        // Let producer and consumer threads run for 10 seconds.
        // Use the primary thread (the thread executing this method)
        // to display the queue contents every 2.5 seconds.
        for (int i = 0; i < 4; i++)
        {
            Thread.Sleep(2500);
            ShowQueueContents(queue);
        }

        // Signal both consumer and producer thread to terminate.
        // Both threads will respond because ExitThreadEvent is a
        // manual-reset event--so it stays 'set' unless explicitly reset.
        Console.WriteLine("Signaling threads to terminate...");
        syncEvents.ExitThreadEvent.Set();

        // Use Join to block primary thread, first until the producer thread
        // terminates, then until the consumer thread terminates.
        Console.WriteLine("main thread waiting for threads to finish...");
        producerThread.Join();
        consumerThread.Join();
    }
}
Answer by GregC
Please keep in mind that locking in calling code may be a better option if you have full control over it. Consider accessing your queue in a loop: you'll be needlessly acquiring locks multiple times, potentially incurring a performance penalty.
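As a hedged illustration of this point, the sketch below drains a plain Queue<int> under one lock acquisition instead of taking a lock per Dequeue call. The names CallerLockingDemo, DrainAll, and gate are hypothetical, and the approach assumes every other piece of code that touches the queue locks on the same gate object.

```csharp
using System.Collections.Generic;

// Hypothetical helper showing caller-side locking around a whole loop.
public static class CallerLockingDemo
{
    // Removes every item currently in the queue while holding the lock once.
    // Assumes all other access to 'queue' also locks on 'gate'.
    public static List<int> DrainAll(Queue<int> queue, object gate)
    {
        var drained = new List<int>();
        lock (gate)
        {
            while (queue.Count > 0)
            {
                drained.Add(queue.Dequeue());
            }
        }
        return drained;
    }
}
```

The trade-off is that producers are blocked for the whole drain, so this suits short batches better than long-running per-item work done inside the lock.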

