C#: Wait for pooled threads to complete

Note: this page reproduces a popular StackOverflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must follow the same CC BY-SA license, cite the original URL and author information, and attribute it to the original authors (not me). Original StackOverflow URL: http://stackoverflow.com/questions/540078/

Date: 2020-08-04 07:21:23  Source: igfitidea

Wait for pooled threads to complete

c#, .net, multithreading

Asked by Jesse Hallam

I'm sorry for a redundant question. However, I've found many solutions to my problem but none of them are very well explained. I'm hoping that it will be made clear, here.

My C# application's main thread spawns 1..n background workers using the ThreadPool. I want the original thread to block until all of the workers have completed. I have researched the ManualResetEvent in particular, but I'm not clear on its use.

In pseudocode:

foreach( var o in collection )
{
  queue new worker(o);
}

while( workers not completed ) { continue; }

If necessary, I will know beforehand how many workers are about to be queued.

Accepted answer by JaredPar

Try this. The function takes in a list of Action delegates. It will add a ThreadPool worker entry for each item in the list. It will wait for every action to complete before returning.

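using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static void SpawnAndWait(IEnumerable<Action> actions)
{
    var list = actions.ToList();
    // Size the array from the materialized list so the sequence is enumerated only once.
    var handles = new ManualResetEvent[list.Count];
    for (var i = 0; i < list.Count; i++)
    {
        handles[i] = new ManualResetEvent(false);
        // Copy to locals so each closure captures its own action and handle.
        var currentAction = list[i];
        var currentHandle = handles[i];
        Action wrappedAction = () => { try { currentAction(); } finally { currentHandle.Set(); } };
        ThreadPool.QueueUserWorkItem(x => wrappedAction());
    }

    // Blocks until every handle is signaled (WaitAll accepts at most 64 handles).
    WaitHandle.WaitAll(handles);
}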
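
One caveat with the approach above: WaitHandle.WaitAll accepts at most 64 handles, so it cannot be used as-is for larger batches. A minimal sketch of one way around that, assuming a single shared ManualResetEvent plus an Interlocked counter is acceptable, might look like the following. The names PoolWaiter and SpawnAndWaitMany are hypothetical and not part of the original answer, and the sketch swallows worker exceptions purely to stay short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class PoolWaiter
{
    // Hypothetical helper (not from the answer): runs every action on the
    // ThreadPool and blocks until all of them have finished.
    public static void SpawnAndWaitMany(IEnumerable<Action> actions)
    {
        var list = new List<Action>(actions);
        if (list.Count == 0) return;        // nothing to wait for

        int pending = list.Count;           // work items still running
        using (var allDone = new ManualResetEvent(false))
        {
            foreach (var action in list)
            {
                var current = action;       // per-iteration copy for the closure
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    try { current(); }
                    catch { /* sketch only: a real version should record the exception */ }
                    finally
                    {
                        // The last work item to finish releases the waiter.
                        if (Interlocked.Decrement(ref pending) == 0) allDone.Set();
                    }
                });
            }
            allDone.WaitOne();
        }
    }

    public static void Main()
    {
        int counter = 0;
        var work = new List<Action>();
        for (int i = 0; i < 200; i++) work.Add(() => Interlocked.Increment(ref counter));
        SpawnAndWaitMany(work);
        Console.WriteLine(counter);         // prints 200
    }
}
```

Because only one wait handle is created regardless of the number of actions, this variant also allocates fewer OS objects than one event per work item.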

Answer by Marc Gravell

First, how long do the workers execute? Pool threads should generally be used for short-lived tasks; if they are going to run for a while, consider manual threads.

Re the problem; do you actually need to block the main thread? Can you use a callback instead? If so, something like:

int running = 1; // start at 1 to prevent multiple callbacks if
          // tasks finish faster than they are started
Action endOfThread = delegate {
    if(Interlocked.Decrement(ref running) == 0) {
        // ****run callback method****
    }
};
foreach(var o in collection)
{
    var tmp = o; // avoid "capture" issue
    Interlocked.Increment(ref running);
    ThreadPool.QueueUserWorkItem(delegate {
        DoSomeWork(tmp); // [A] should handle exceptions internally
        endOfThread();
    });
}
endOfThread(); // opposite of "start at 1"

This is a fairly lightweight (no OS primitives) way of tracking the workers.

If you need to block, you can do the same using a Monitor (again, avoiding an OS object):

    object syncLock = new object();
    int running = 1;
    Action endOfThread = delegate {
        if (Interlocked.Decrement(ref running) == 0) {
            lock (syncLock) {
                Monitor.Pulse(syncLock);
            }
        }
    };
    lock (syncLock) {
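        foreach (var o in collection) {
            var tmp = o; // avoid "capture" issue
            Interlocked.Increment(ref running); // count each worker before queuing it
            ThreadPool.QueueUserWorkItem(delegate
            {
                DoSomeWork(tmp); // [A] should handle exceptions internally
                endOfThread();
            });
        }
        // Release the initial count of 1. Only wait if workers are still running;
        // otherwise a Pulse sent here would be lost and Wait would block forever.
        if (Interlocked.Decrement(ref running) != 0) {
            Monitor.Wait(syncLock);
        }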
    }
    Console.WriteLine("all done");

Answer by James

I think you were on the right track with the ManualResetEvent. This link has a code sample that closely matches what you're trying to do. The key is to use WaitHandle.WaitAll and pass it an array of wait events. Each thread needs to set one of these wait events.

    // Simultaneously calculate the terms.
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateBase));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateFirstTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateSecondTerm));
    ThreadPool.QueueUserWorkItem(
        new WaitCallback(CalculateThirdTerm));

    // Wait for all of the terms to be calculated.
    WaitHandle.WaitAll(autoEvents);

    // Reset the wait handle for the next calculation.
    manualEvent.Reset();

Edit:

Make sure that your worker thread code path sets the event (e.g. autoEvents[1].Set();). Once all of the events are signaled, WaitAll will return.

void CalculateSecondTerm(object stateInfo)
{
    double preCalc = randomGenerator.NextDouble();
    manualEvent.WaitOne();
    secondTerm = preCalc * baseNumber * 
        randomGenerator.NextDouble();
    autoEvents[1].Set();
}
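
The fragments above leave out the declarations of autoEvents and manualEvent, so here is a minimal self-contained sketch of the same WaitAll pattern. It assumes one AutoResetEvent per work item; the class TermCalculator and method SumOfSquares are hypothetical names used only for illustration and do not come from the linked sample.

```csharp
using System;
using System.Threading;

public static class TermCalculator
{
    // Hypothetical demo: computes three squares on pool threads and sums them.
    public static int SumOfSquares()
    {
        int[] results = new int[3];
        var done = new AutoResetEvent[3];   // one event per work item

        for (int i = 0; i < done.Length; i++)
        {
            done[i] = new AutoResetEvent(false);
            int index = i;                  // copy the loop variable for the closure
            ThreadPool.QueueUserWorkItem(_ =>
            {
                try { results[index] = (index + 1) * (index + 1); }
                finally { done[index].Set(); }   // signal even if the work throws
            });
        }

        // Blocks until every event has been set.
        WaitHandle.WaitAll(done);
        foreach (var e in done) e.Dispose();

        return results[0] + results[1] + results[2];
    }

    public static void Main()
    {
        Console.WriteLine(SumOfSquares());  // 1 + 4 + 9 = 14
    }
}
```

Setting the event in a finally block means the waiting thread is released even when a work item fails, which is the main thing to get right with this pattern.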

Answer by Marc Gravell

Here's a different approach - encapsulation; so your code could be as simple as:

    Forker p = new Forker();
    foreach (var obj in collection)
    {
        var tmp = obj;
        p.Fork(delegate { DoSomeWork(tmp); });
    }
    p.Join();

where the Forker class is given below (I got bored on the train ;-p)... again, this avoids OS objects, but wraps things up quite neatly (IMO):

using System;
using System.Threading;

/// <summary>Event arguments representing the completion of a parallel action.</summary>
public class ParallelEventArgs : EventArgs
{
    private readonly object state;
    private readonly Exception exception;
    internal ParallelEventArgs(object state, Exception exception)
    {
        this.state = state;
        this.exception = exception;
    }

    /// <summary>The opaque state object that identifies the action (null otherwise).</summary>
    public object State { get { return state; } }

    /// <summary>The exception thrown by the parallel action, or null if it completed without exception.</summary>
    public Exception Exception { get { return exception; } }
}

/// <summary>Provides a caller-friendly wrapper around parallel actions.</summary>
public sealed class Forker
{
    int running;
    private readonly object joinLock = new object(), eventLock = new object();

    /// <summary>Raised when all operations have completed.</summary>
    public event EventHandler AllComplete
    {
        add { lock (eventLock) { allComplete += value; } }
        remove { lock (eventLock) { allComplete -= value; } }
    }
    private EventHandler allComplete;
    /// <summary>Raised when each operation completes.</summary>
    public event EventHandler<ParallelEventArgs> ItemComplete
    {
        add { lock (eventLock) { itemComplete += value; } }
        remove { lock (eventLock) { itemComplete -= value; } }
    }
    private EventHandler<ParallelEventArgs> itemComplete;

    private void OnItemComplete(object state, Exception exception)
    {
        EventHandler<ParallelEventArgs> itemHandler = itemComplete; // don't need to lock
        if (itemHandler != null) itemHandler(this, new ParallelEventArgs(state, exception));
        if (Interlocked.Decrement(ref running) == 0)
        {
            EventHandler allHandler = allComplete; // don't need to lock
            if (allHandler != null) allHandler(this, EventArgs.Empty);
            lock (joinLock)
            {
                Monitor.PulseAll(joinLock);
            }
        }
    }

    /// <summary>Adds a callback to invoke when each operation completes.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnItemComplete(EventHandler<ParallelEventArgs> handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        ItemComplete += handler;
        return this;
    }

    /// <summary>Adds a callback to invoke when all operations are complete.</summary>
    /// <returns>Current instance (for fluent API).</returns>
    public Forker OnAllComplete(EventHandler handler)
    {
        if (handler == null) throw new ArgumentNullException("handler");
        AllComplete += handler;
        return this;
    }

    /// <summary>Waits for all operations to complete.</summary>
    public void Join()
    {
        Join(-1);
    }

    /// <summary>Waits (with timeout) for all operations to complete.</summary>
    /// <returns>Whether all operations had completed before the timeout.</returns>
    public bool Join(int millisecondsTimeout)
    {
        lock (joinLock)
        {
            if (CountRunning() == 0) return true;
            Thread.SpinWait(1); // try our luck...
            return (CountRunning() == 0) ||
                Monitor.Wait(joinLock, millisecondsTimeout);
        }
    }

    /// <summary>Indicates the number of incomplete operations.</summary>
    /// <returns>The number of incomplete operations.</returns>
    public int CountRunning()
    {
        return Interlocked.CompareExchange(ref running, 0, 0);
    }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action) { return Fork(action, null); }

    /// <summary>Enqueues an operation.</summary>
    /// <param name="action">The operation to perform.</param>
    /// <param name="state">An opaque object, allowing the caller to identify operations.</param>
    /// <returns>The current instance (for fluent API).</returns>
    public Forker Fork(ThreadStart action, object state)
    {
        if (action == null) throw new ArgumentNullException("action");
        Interlocked.Increment(ref running);
        ThreadPool.QueueUserWorkItem(delegate
        {
            Exception exception = null;
            try { action(); }
            catch (Exception ex) { exception = ex;}
            OnItemComplete(state, exception);
        });
        return this;
    }
}

Answer by Gordon Thompson

I've found a good solution here:

http://msdn.microsoft.com/en-us/magazine/cc163914.aspx

It may come in handy for others with the same issue.

Answer by Joseph Kingry

Using the .NET 4.0 Barrier class:

        Barrier sync = new Barrier(1);

        foreach(var o in collection)
        {
            WaitCallback worker = (state) => 
            {
                // do work
                sync.SignalAndWait();
            };

            sync.AddParticipant();
            ThreadPool.QueueUserWorkItem(worker, o);
        }

        sync.SignalAndWait();

Answer by Joseph Kingry

I have been using the new Parallel task library in CTP here:

       Parallel.ForEach(collection, o =>
            {
                DoSomeWork(o);
            });

Answer by Brian Gideon

Here is a solution using the CountdownEvent class.

var complete = new CountdownEvent(1);
foreach (var o in collection)
{
  var capture = o;
  ThreadPool.QueueUserWorkItem((state) =>
    {
      try
      {
        DoSomething(capture);
      }
      finally
      {
        complete.Signal();
      }
    }, null);
}
complete.Signal();
complete.Wait();

Of course, if you have access to the CountdownEvent class then you have the whole TPL to work with. The Parallel class takes care of the waiting for you.

Parallel.ForEach(collection, o =>
  {
    DoSomething(o);
  });
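
If each work item needs to return a value, the TPL also lets you wait on individual tasks. The following is a rough sketch, assuming .NET 4.5 or later (for Task.Run) and an int collection. TaskWaitDemo, ProcessAll, and this DoSomething body are hypothetical stand-ins, not code from the answer.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class TaskWaitDemo
{
    // Hypothetical stand-in for the DoSomething method used in the answer.
    static int DoSomething(int value) { return value * 2; }

    public static int[] ProcessAll(IEnumerable<int> collection)
    {
        // Task.Run queues each item to the thread pool and returns a handle to it.
        Task<int>[] tasks = collection
            .Select(o => Task.Run(() => DoSomething(o)))
            .ToArray();

        // Blocks until every task has finished; failures surface as AggregateException.
        Task.WaitAll(tasks);

        return tasks.Select(t => t.Result).ToArray();
    }

    public static void Main()
    {
        int[] doubled = ProcessAll(new[] { 1, 2, 3 });
        Console.WriteLine(string.Join(", ", doubled));  // 2, 4, 6
    }
}
```

Unlike the raw ThreadPool versions, exceptions thrown by a work item are captured by its task and rethrown from Task.WaitAll, so no manual try/finally signaling is needed.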

Answer by nikoo28

Try using CountdownEvent:

// code before the threads start

CountdownEvent countdown = new CountdownEvent(collection.Length);

foreach (var o in collection)
{
    ThreadPool.QueueUserWorkItem(delegate
    {
        try
        {
            // do something with the worker
            Console.WriteLine("Thread Done!");
        }
        finally
        {
            countdown.Signal(); // always signal, even if the work throws
        }
    });
}
countdown.Wait();

Console.WriteLine("Job Done!");

// resume the code here

countdown.Wait() blocks until every worker has called Signal(), that is, until all threads have finished execution.
