C# 具有异步 lambda 的并行 foreach

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/15136542/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-10 14:12:38  来源:igfitidea点击:

Parallel foreach with asynchronous lambda

c#async-awaittask-parallel-libraryparallel.foreach

提问by clausndk

I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.

我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。

The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:

如果我想在并行循环的 lambda 中调用 C# 中标记为 async 的方法,就会出现问题。例如:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

The problem occurs with the count being 0, because all the threads created are effectively just background threads and the Parallel.ForEachcall doesn't wait for completion. If I remove the async keyword, the method looks like this:

计数为 0 时会出现问题,因为创建的所有线程实际上只是后台线程,并且Parallel.ForEach调用不会等待完成。如果我删除 async 关键字,则该方法如下所示:

var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
  // some pre stuff
  var responseTask = await GetData(item);
  responseTask.Wait();
  var response = responseTask.Result;
  bag.Add(response);
  // some post stuff
}
var count = bag.Count;

It works, but it completely disables the await cleverness and I have to do some manual exception handling.. (Removed for brevity).

它有效,但它完全禁用了等待智能,我必须进行一些手动异常处理..(为简洁起见已删除)。

How can I implement a Parallel.ForEachloop, that uses the await keyword within the lambda? Is it possible?

如何实现Parallel.ForEach在 lambda 中使用 await 关键字的循环?是否可以?

The prototype of the Parallel.ForEach method takes an Action<T>as parameter, but I want it to wait for my asynchronous lambda.

Parallel.ForEach 方法的原型采用一个Action<T>作为参数,但我希望它等待我的异步 lambda。

采纳答案by Stephen Cleary

If you just want simple parallelism, you can do this:

如果你只想要简单的并行性,你可以这样做:

var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;

If you need something more complex, check out Stephen Toub's ForEachAsyncpost.

如果您需要更复杂的东西,请查看Stephen Toub 的ForEachAsync帖子

回答by Serge Semenov

You can use the ParallelForEachAsyncextension method from AsyncEnumerator NuGet Package:

您可以使用AsyncEnumerator NuGet Package 中ParallelForEachAsync扩展方法:

using Dasync.Collections;

var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
  // some pre stuff
  var response = await GetData(item);
  bag.Add(response);
  // some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;

回答by Jay Shah

I've created an extension method for this which makes use of SemaphoreSlim and also allows to set maximum degree of parallelism

我为此创建了一个扩展方法,它使用 SemaphoreSlim 并允许设置最大并行度

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

Sample Usage:

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

回答by nicolay.anykienko

My lightweight implementation of ParallelForEach async.

我的 ParallelForEach 异步轻量级实现。

Features:

特征:

  1. Throttling (max degree of parallelism).
  2. Exception handling (aggregation exception will be thrown at completion).
  3. Memory efficient (no need to store the list of tasks).
  1. 节流(最大并行度)。
  2. 异常处理(聚合异常将在完成时抛出)。
  3. 内存高效(无需存储任务列表)。

public static class AsyncEx
{
    public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
    {
        var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
        var tcs = new TaskCompletionSource<object>();
        var exceptions = new ConcurrentBag<Exception>();
        bool addingCompleted = false;

        foreach (T item in source)
        {
            await semaphoreSlim.WaitAsync();
            asyncAction(item).ContinueWith(t =>
            {
                semaphoreSlim.Release();

                if (t.Exception != null)
                {
                    exceptions.Add(t.Exception);
                }

                if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
                {
                    tcs.SetResult(null);
                }
            });
        }

        Volatile.Write(ref addingCompleted, true);
        await tcs.Task;
        if (exceptions.Count > 0)
        {
            throw new AggregateException(exceptions);
        }
    }
}

Usage example:

用法示例:

await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
    var data = await GetData(i);
}, maxDegreeOfParallelism: 100);

回答by Felipe l

With SemaphoreSlimyou can achieve parallelism control.

SemaphoreSlim你可以实现并行控制。

var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
  try
  {
     await throttler.WaitAsync();
     var response = await GetData(item);
     bag.Add(response);
  }
  finally
  {
     throttler.Release();
  }
});
await Task.WhenAll(tasks);
var count = bag.Count;