C# 具有异步 lambda 的并行 foreach
声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow
原文地址: http://stackoverflow.com/questions/15136542/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me):
StackOverFlow
Parallel foreach with asynchronous lambda
提问by clausndk
I would like to handle a collection in parallel, but I'm having trouble implementing it and I'm therefore hoping for some help.
我想并行处理一个集合,但我在实现它时遇到了麻烦,因此我希望得到一些帮助。
The trouble arises if I want to call a method marked async in C#, within the lambda of the parallel loop. For example:
如果我想在并行循环的 lambda 中调用 C# 中标记为 async 的方法,就会出现问题。例如:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}
var count = bag.Count;
The problem occurs with the count being 0, because all the threads created are effectively just background threads and the Parallel.ForEach
call doesn't wait for completion. If I remove the async keyword, the method looks like this:
计数为 0 时会出现问题,因为创建的所有线程实际上只是后台线程,并且Parallel.ForEach
调用不会等待完成。如果我删除 async 关键字,则该方法如下所示:
var bag = new ConcurrentBag<object>();
Parallel.ForEach(myCollection, item =>
{
// some pre stuff
var responseTask = await GetData(item);
responseTask.Wait();
var response = responseTask.Result;
bag.Add(response);
// some post stuff
}
var count = bag.Count;
It works, but it completely disables the await cleverness and I have to do some manual exception handling.. (Removed for brevity).
它有效,但它完全禁用了等待智能,我必须进行一些手动异常处理..(为简洁起见已删除)。
How can I implement a Parallel.ForEach
loop, that uses the await keyword within the lambda? Is it possible?
如何实现Parallel.ForEach
在 lambda 中使用 await 关键字的循环?是否可以?
The prototype of the Parallel.ForEach method takes an Action<T>
as parameter, but I want it to wait for my asynchronous lambda.
Parallel.ForEach 方法的原型采用一个Action<T>
作为参数,但我希望它等待我的异步 lambda。
采纳答案by Stephen Cleary
If you just want simple parallelism, you can do this:
如果你只想要简单的并行性,你可以这样做:
var bag = new ConcurrentBag<object>();
var tasks = myCollection.Select(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
});
await Task.WhenAll(tasks);
var count = bag.Count;
If you need something more complex, check out Stephen Toub's ForEachAsync
post.
如果您需要更复杂的东西,请查看Stephen Toub 的ForEachAsync
帖子。
回答by Serge Semenov
You can use the ParallelForEachAsync
extension method from AsyncEnumerator NuGet Package:
您可以使用AsyncEnumerator NuGet Package 中的ParallelForEachAsync
扩展方法:
using Dasync.Collections;
var bag = new ConcurrentBag<object>();
await myCollection.ParallelForEachAsync(async item =>
{
// some pre stuff
var response = await GetData(item);
bag.Add(response);
// some post stuff
}, maxDegreeOfParallelism: 10);
var count = bag.Count;
回答by Jay Shah
I've created an extension method for this which makes use of SemaphoreSlim and also allows to set maximum degree of parallelism
我为此创建了一个扩展方法,它使用 SemaphoreSlim 并允许设置最大并行度
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Sample Usage:
示例用法:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
回答by nicolay.anykienko
My lightweight implementation of ParallelForEach async.
我的 ParallelForEach 异步轻量级实现。
Features:
特征:
- Throttling (max degree of parallelism).
- Exception handling (aggregation exception will be thrown at completion).
- Memory efficient (no need to store the list of tasks).
- 节流(最大并行度)。
- 异常处理(聚合异常将在完成时抛出)。
- 内存高效(无需存储任务列表)。
public static class AsyncEx
{
public static async Task ParallelForEachAsync<T>(this IEnumerable<T> source, Func<T, Task> asyncAction, int maxDegreeOfParallelism = 10)
{
var semaphoreSlim = new SemaphoreSlim(maxDegreeOfParallelism);
var tcs = new TaskCompletionSource<object>();
var exceptions = new ConcurrentBag<Exception>();
bool addingCompleted = false;
foreach (T item in source)
{
await semaphoreSlim.WaitAsync();
asyncAction(item).ContinueWith(t =>
{
semaphoreSlim.Release();
if (t.Exception != null)
{
exceptions.Add(t.Exception);
}
if (Volatile.Read(ref addingCompleted) && semaphoreSlim.CurrentCount == maxDegreeOfParallelism)
{
tcs.SetResult(null);
}
});
}
Volatile.Write(ref addingCompleted, true);
await tcs.Task;
if (exceptions.Count > 0)
{
throw new AggregateException(exceptions);
}
}
}
Usage example:
用法示例:
await Enumerable.Range(1, 10000).ParallelForEachAsync(async (i) =>
{
var data = await GetData(i);
}, maxDegreeOfParallelism: 100);
回答by Felipe l
With SemaphoreSlim
you can achieve parallelism control.
用SemaphoreSlim
你可以实现并行控制。
var bag = new ConcurrentBag<object>();
var maxParallel = 20;
var throttler = new SemaphoreSlim(initialCount: maxParallel);
var tasks = myCollection.Select(async item =>
{
try
{
await throttler.WaitAsync();
var response = await GetData(item);
bag.Add(response);
}
finally
{
throttler.Release();
}
});
await Task.WhenAll(tasks);
var count = bag.Count;