Nesting await in Parallel.ForEach (C#)
Original question: http://stackoverflow.com/questions/11564506/
License: this page is an English/Chinese translation of a popular Stack Overflow question, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must keep it under the same license, cite the original URL, and attribute it to the original authors on Stack Overflow.
Asked by Darthg8r
In a Metro app, I need to execute a number of WCF calls. There are a significant number of calls to be made, so I need to do them in a parallel loop. The problem is that the parallel loop exits before the WCF calls are all complete.
How would you refactor this to work as expected?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
foreach ( var customer in customers )
{
    Console.WriteLine(customer.ID);
}
Console.ReadKey();
Accepted answer by svick
The whole idea behind Parallel.ForEach() is that you have a set of threads and each thread processes part of the collection. As you noticed, this doesn't work with async-await, where you want to release the thread for the duration of the async call.
You could "fix" that by blocking the ForEach() threads, but that defeats the whole point of async-await.
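To make the failure mode concrete: Parallel.ForEach takes an Action<T>, so the async lambda in the question compiles to an async void method, and each invocation returns to the loop at its first await. The sketch below is a minimal illustration and not part of the original answer; GetCustomerAsync is a hypothetical stand-in for repo.GetCustomer that simply delays for 200 ms.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for repo.GetCustomer(i): completes after a short delay.
    private static async Task<string> GetCustomerAsync(string id)
    {
        await Task.Delay(200);
        return "customer-" + id;
    }

    // Returns how many results were available at the moment Parallel.ForEach returned.
    public static int CountAfterParallelForEach(string[] ids)
    {
        var results = new ConcurrentBag<string>();
        // The lambda is converted to Action<string>, so it is "async void":
        // each call returns at its first await and the loop treats it as done.
        Parallel.ForEach(ids, async id =>
        {
            results.Add(await GetCustomerAsync(id));
        });
        return results.Count;
    }

    // Returns how many results were available after awaiting every Task.
    public static async Task<int> CountAfterWhenAllAsync(string[] ids)
    {
        var results = new ConcurrentBag<string>();
        // Here the lambda is converted to Func<string, Task>, so completion is observable.
        await Task.WhenAll(ids.Select(async id =>
        {
            results.Add(await GetCustomerAsync(id));
        }));
        return results.Count;
    }
}
```

On a typical run the first method returns 0, because the loop finishes long before any delay elapses, while the second always returns ids.Length.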
What you could do is use TPL Dataflow instead of Parallel.ForEach(); Dataflow supports asynchronous Tasks well.
Specifically, your code could be written using a TransformBlock that transforms each id into a Customer using the async lambda. This block can be configured to execute in parallel. You would link that block to an ActionBlock that writes each Customer to the console. After you set up the block network, you can Post() each id to the TransformBlock.
In code:
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
    async i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        return await repo.GetCustomer(i);
    }, new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    });
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
    writeCustomerBlock, new DataflowLinkOptions
    {
        PropagateCompletion = true
    });
foreach (var id in ids)
    getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
You probably want to limit the parallelism of the TransformBlock to some small constant, though. Also, you could limit the capacity of the TransformBlock and add the items to it asynchronously using SendAsync(), for example if the collection is too big.
An added benefit compared to your code (if it worked) is that the writing will start as soon as a single item is finished, rather than waiting until all of the processing is finished.
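The two tuning suggestions above (a small fixed parallelism, and a bounded block fed with SendAsync) could look roughly like this. It is a sketch under assumptions rather than the answer's own code: the values 4 and 8 are arbitrary, GetCustomerAsync is a hypothetical stand-in for the repository call, and the System.Threading.Tasks.Dataflow library is assumed to be referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BoundedDataflowSketch
{
    // Hypothetical stand-in for repo.GetCustomer(i).
    private static async Task<string> GetCustomerAsync(string id)
    {
        await Task.Delay(50);
        return "customer-" + id;
    }

    public static async Task<List<string>> RunAsync(IEnumerable<string> ids)
    {
        var results = new List<string>();

        var getCustomerBlock = new TransformBlock<string, string>(
            id => GetCustomerAsync(id),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = 4, // assumed small constant
                BoundedCapacity = 8         // assumed cap on items held by the block
            });

        // ActionBlock processes one item at a time by default,
        // so a plain List is safe to mutate here.
        var collectBlock = new ActionBlock<string>(c => results.Add(c));

        getCustomerBlock.LinkTo(
            collectBlock, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var id in ids)
        {
            // SendAsync waits asynchronously while the block is full,
            // whereas Post would return false and drop the item.
            await getCustomerBlock.SendAsync(id);
        }

        getCustomerBlock.Complete();
        await collectBlock.Completion;
        return results;
    }
}
```

With a bounded block, the producer loop is throttled by the consumers, so a very large id collection is never buffered in full.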
Answer by Stephen Cleary
svick's answer is (as usual) excellent.
However, I find Dataflow to be more useful when you actually have large amounts of data to transfer, or when you need an async-compatible queue.
In your case, a simpler solution is to just use async-style parallelism:
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
    Console.WriteLine(customer.ID);
}
Console.ReadKey();
Answer by Ohad Schneider
Using Dataflow as svick suggested may be overkill, and Stephen's answer does not provide a means to control the concurrency of the operation. However, that can be achieved rather simply:
public static async Task RunWithMaxDegreeOfConcurrency<T>(
    int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
    var activeTasks = new List<Task>(maxDegreeOfConcurrency);
    foreach (var task in collection.Select(taskFactory))
    {
        activeTasks.Add(task);
        if (activeTasks.Count == maxDegreeOfConcurrency)
        {
            await Task.WhenAny(activeTasks.ToArray());
            //observe exceptions here
            activeTasks.RemoveAll(t => t.IsCompleted);
        }
    }
    await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
    {
        //observe exceptions in a manner consistent with the above
    });
}
The ToArray() calls can be optimized by using an array instead of a list and replacing completed tasks, but I doubt it would make much of a difference in most scenarios. Sample usage per the OP's question:
// The method returns a Task, so await it to wait for all items to finish.
await RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
});
EDIT: Fellow SO user and TPL wiz Eli Arbel pointed me to a related article from Stephen Toub. As usual, his implementation is both elegant and efficient:
public static Task ForEachAsync<T>(
    this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current).ContinueWith(t =>
                    {
                        //observe exceptions
                    });
        }));
}
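For readers on .NET 6 or later, the framework itself ships Parallel.ForEachAsync, which covers similar ground to the extension method above: an async body run over a sequence with a bounded degree of parallelism. The sketch below is an addition and not part of the original answer; it assumes .NET 6+, the limit of 4 is arbitrary, and GetCustomerAsync is a hypothetical stand-in for the repository call.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInForEachAsyncSketch
{
    // Hypothetical stand-in for repo.GetCustomer(i).
    private static async Task<string> GetCustomerAsync(string id, CancellationToken ct)
    {
        await Task.Delay(50, ct);
        return "customer-" + id;
    }

    public static async Task<ConcurrentBag<string>> RunAsync(IEnumerable<string> ids)
    {
        var customers = new ConcurrentBag<string>();
        var options = new ParallelOptions { MaxDegreeOfParallelism = 4 }; // assumed limit

        // The body returns a ValueTask, so the loop can await each item;
        // the returned Task completes only after every body has finished.
        await Parallel.ForEachAsync(ids, options, async (id, ct) =>
        {
            customers.Add(await GetCustomerAsync(id, ct));
        });

        return customers;
    }
}
```

Unlike the helper above, an exception thrown by the body is not swallowed here; it surfaces when the returned Task is awaited.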
Answer by ofcoursedude
Wrap the Parallel.ForEach in a Task.Run() and, instead of the await keyword, use [yourasyncmethod].Result
(you need the Task.Run so that you don't block the UI thread)
Something like this:
var yourForeachTask = Task.Run(() =>
{
    Parallel.ForEach(ids, i =>
    {
        ICustomerRepo repo = new CustomerRepo();
        var cust = repo.GetCustomer(i).Result;
        customers.Add(cust);
    });
});
await yourForeachTask;
Answer by John Gietzen
This should be pretty efficient, and easier than getting the whole TPL Dataflow working:
var customers = await ids.SelectAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(
    this IEnumerable<TSource> source,
    Func<TSource, Task<TResult>> selector,
    int maxDegreesOfParallelism = 4)
{
    var results = new List<TResult>();
    var activeTasks = new HashSet<Task<TResult>>();
    foreach (var item in source)
    {
        activeTasks.Add(selector(item));
        if (activeTasks.Count >= maxDegreesOfParallelism)
        {
            var completed = await Task.WhenAny(activeTasks);
            activeTasks.Remove(completed);
            results.Add(completed.Result);
        }
    }
    results.AddRange(await Task.WhenAll(activeTasks));
    return results;
}
Answer by Teoman shipahi
I am a little late to the party, but you may want to consider using GetAwaiter().GetResult() to run your async code in a synchronous context, but in parallel, as below:
Parallel.ForEach(ids, i =>
{
    ICustomerRepo repo = new CustomerRepo();
    // Run this in thread which Parallel library occupied.
    var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
    customers.Add(cust);
});
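One practical difference between this and the .Result approach from the earlier answer is how failures surface: .Result wraps the failure in an AggregateException, while GetAwaiter().GetResult() rethrows the original exception. The sketch below illustrates that; FailAsync is a hypothetical method that always fails, introduced only for the demonstration.

```csharp
using System;
using System.Threading.Tasks;

public static class BlockingWaitDemo
{
    // Hypothetical async method that always fails after a short delay.
    private static async Task<int> FailAsync()
    {
        await Task.Delay(10).ConfigureAwait(false);
        throw new InvalidOperationException("boom");
    }

    // Blocks with .Result and reports the type of exception that was caught.
    public static string ExceptionTypeFromResult()
    {
        try
        {
            var unused = FailAsync().Result;
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }

    // Blocks with GetAwaiter().GetResult() and reports the caught exception type.
    public static string ExceptionTypeFromGetResult()
    {
        try
        {
            var unused = FailAsync().GetAwaiter().GetResult();
            return "none";
        }
        catch (Exception ex)
        {
            return ex.GetType().Name;
        }
    }
}
```

Both calls still block a thread for the duration of the async operation, so neither one restores the benefit of async-await inside Parallel.ForEach.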
Answer by Serge Semenov
You can save effort with the new AsyncEnumerator NuGet package, which didn't exist 4 years ago when the question was originally posted. It allows you to control the degree of parallelism:
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
    ICustomerRepo repo = new CustomerRepo();
    var cust = await repo.GetCustomer(i);
    customers.Add(cust);
},
maxDegreeOfParallelism: 10);
Disclaimer: I'm the author of the AsyncEnumerator library, which is open source and licensed under MIT, and I'm posting this message just to help the community.
Answer by Vitaliy Ulantikov
After introducing a bunch of helper methods, you will be able to run parallel queries with this simple syntax:
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
    .Split(DegreeOfParallelism)
    .SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
    .ConfigureAwait(false);
What happens here is: we split the source collection into 10 chunks (.Split(DegreeOfParallelism)), then run 10 tasks, each processing its items one by one (.SelectManyAsync(...)), and merge the results back into a single list.
It is worth mentioning that there is a simpler approach:
double[] result2 = await Enumerable.Range(0, 1000000)
    .Select(async i => await CalculateAsync(i).ConfigureAwait(false))
    .WhenAll()
    .ConfigureAwait(false);
But it needs a precaution: if the source collection is too big, it will schedule a Task for every item right away, which may cause significant performance hits.
The extension methods used in the examples above look as follows:
public static class CollectionExtensions
{
    /// <summary>
    /// Splits collection into number of collections of nearly equal size.
    /// </summary>
    public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
    {
        if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
        List<T> source = src.ToList();
        var sourceIndex = 0;
        for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
        {
            var list = new List<T>();
            int itemsLeft = source.Count - targetIndex;
            while (slicesCount * list.Count < itemsLeft)
            {
                list.Add(source[sourceIndex++]);
            }
            yield return list;
        }
    }

    /// <summary>
    /// Takes collection of collections, projects those in parallel and merges results.
    /// </summary>
    public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
        this IEnumerable<IEnumerable<T>> source,
        Func<T, Task<TResult>> func)
    {
        List<TResult>[] slices = await source
            .Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
            .WhenAll()
            .ConfigureAwait(false);
        return slices.SelectMany(s => s);
    }

    /// <summary>Runs selector and awaits results.</summary>
    public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
    {
        List<TResult> result = new List<TResult>();
        foreach (TSource source1 in source)
        {
            TResult result1 = await selector(source1).ConfigureAwait(false);
            result.Add(result1);
        }
        return result;
    }

    /// <summary>Wraps tasks with Task.WhenAll.</summary>
    public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
    {
        return Task.WhenAll<TResult>(source);
    }
}
Answer by Jay Shah
An extension method for this that makes use of SemaphoreSlim and also allows setting a maximum degree of parallelism:
/// <summary>
/// Concurrently executes async actions for each item of <see cref="IEnumerable{T}"/>.
/// </summary>
/// <typeparam name="T">Type of the items in the IEnumerable</typeparam>
/// <param name="enumerable">Instance of <see cref="IEnumerable{T}"/></param>
/// <param name="action">An async action to execute for each item</param>
/// <param name="maxDegreeOfParallelism">Optional. An integer that represents the maximum degree of parallelism.
/// Must be greater than 0.</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If maxDegreeOfParallelism is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
    this IEnumerable<T> enumerable,
    Func<T, Task> action,
    int? maxDegreeOfParallelism = null)
{
    if (maxDegreeOfParallelism.HasValue)
    {
        using (var semaphoreSlim = new SemaphoreSlim(
            maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
        {
            var tasksWithThrottler = new List<Task>();
            foreach (var item in enumerable)
            {
                // Take a slot; waits asynchronously while the limit is already reached.
                await semaphoreSlim.WaitAsync();
                tasksWithThrottler.Add(Task.Run(async () =>
                {
                    try
                    {
                        await action(item);
                    }
                    finally
                    {
                        // Free the slot even if the action throws, so the failure
                        // still surfaces through Task.WhenAll below.
                        semaphoreSlim.Release();
                    }
                }));
            }
            // Wait for all tasks to complete.
            await Task.WhenAll(tasksWithThrottler.ToArray());
        }
    }
    else
    {
        await Task.WhenAll(enumerable.Select(item => action(item)));
    }
}
Sample Usage:
await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);
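To check that a semaphore-based throttle really bounds concurrency, one option is to count how many operations are in flight at once. The sketch below is a compact variation on the same idea, not the answer's method: it omits Task.Run, and the class name, method name and the 20 ms delay standing in for real work are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs `count` fake async operations with at most `limit` in flight
    // and returns the highest concurrency that was actually observed.
    public static async Task<int> MaxObservedConcurrencyAsync(int count, int limit)
    {
        var sync = new object();
        int current = 0, maxSeen = 0;

        async Task RunOneAsync(SemaphoreSlim gate)
        {
            try
            {
                lock (sync)
                {
                    current++;
                    if (current > maxSeen) maxSeen = current;
                }
                await Task.Delay(20); // hypothetical stand-in for the real async work
            }
            finally
            {
                lock (sync) { current--; }
                gate.Release(); // give the slot back only after the work has ended
            }
        }

        using (var gate = new SemaphoreSlim(limit, limit))
        {
            var tasks = new List<Task>();
            for (int i = 0; i < count; i++)
            {
                // A slot is taken before each operation starts, so no more than
                // `limit` operations can be between WaitAsync and Release.
                await gate.WaitAsync();
                tasks.Add(RunOneAsync(gate));
            }
            await Task.WhenAll(tasks);
        }
        return maxSeen;
    }
}
```

The returned value can never exceed limit, because the counter is only incremented while a semaphore slot is held and is decremented before the slot is released.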

