C#: How to limit the number of concurrent async I/O operations?
Original URL: http://stackoverflow.com/questions/10806951/
Warning: these are provided under the CC BY-SA 4.0 license. You are free to use/share them, but you must attribute them to the original authors (not me): StackOverflow
Question by Grief Coder
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };
// now let's send HTTP requests to each of these URLs in parallel
urls.AsParallel().ForAll(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
});
Here is the problem: it starts 1000+ simultaneous web requests. Is there an easy way to limit the number of these concurrent async HTTP requests, so that no more than 20 web pages are downloaded at any given time? How can this be done in the most efficient manner?
Accepted answer by Theo Yaung
You can definitely do this in the latest versions of async for .NET, using .NET 4.5 Beta. The previous post from 'usr' points to a good article written by Stephen Toub, but the less-publicized news is that the async semaphore actually made it into the Beta release of .NET 4.5.
If you look at our beloved SemaphoreSlim class (which you should be using, since it's more performant than the original Semaphore), it now boasts the WaitAsync(...) series of overloads, with all of the expected arguments: timeout intervals, cancellation tokens, all of your usual scheduling friends :)
Stephen has also written a more recent blog post about the new .NET 4.5 goodies that came out with the beta; see What's New for Parallelism in .NET 4.5 Beta.
Last, here's some sample code showing how to use SemaphoreSlim for async method throttling:
public async Task MyOuterMethod()
{
    // let's say there is a list of 1000+ URLs
    // (an array initializer needs an explicit type; "var urls = { ... }" does not compile)
    string[] urls = { "http://google.com", "http://yahoo.com", ... };
    // now let's send HTTP requests to each of these URLs in parallel,
    // with at most 20 requests in flight at any time
    var allTasks = new List<Task>();
    var throttler = new SemaphoreSlim(initialCount: 20);
    foreach (var url in urls)
    {
        // do an async wait until we can schedule again
        await throttler.WaitAsync();
        // using Task.Run(...) to run the lambda in its own parallel
        // flow on the threadpool
        allTasks.Add(
            Task.Run(async () =>
            {
                try
                {
                    var client = new HttpClient();
                    var html = await client.GetStringAsync(url);
                }
                finally
                {
                    // free the slot so the loop above can start the next request
                    throttler.Release();
                }
            }));
    }
    // won't get here until all urls have been put into tasks
    await Task.WhenAll(allTasks);
    // won't get here until all tasks have completed in some way
    // (either success or exception)
}
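The snippet above depends on the network, so here is a self-contained sketch of the same SemaphoreSlim.WaitAsync pattern that can be run as-is. The names ThrottledRunner and RunThrottledAsync are hypothetical, the caller-supplied `work` delegate stands in for GetStringAsync, and the peak counter exists only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledRunner
{
    // Same structure as MyOuterMethod above, generalized over the item type and
    // the async operation. Returns the highest number of operations observed
    // running at the same time, so the limit can be checked.
    public static async Task<int> RunThrottledAsync<T>(
        IEnumerable<T> items, Func<T, Task> work, int maxConcurrency)
    {
        var throttler = new SemaphoreSlim(maxConcurrency);
        var allTasks = new List<Task>();
        var gate = new object();
        int current = 0, peak = 0;

        foreach (var item in items)
        {
            // Asynchronously wait for a free slot before starting the next operation.
            await throttler.WaitAsync();
            allTasks.Add(Task.Run(async () =>
            {
                try
                {
                    lock (gate) { current++; peak = Math.Max(peak, current); }
                    await work(item);
                }
                finally
                {
                    lock (gate) { current--; }
                    throttler.Release(); // hand the slot to the next item
                }
            }));
        }

        await Task.WhenAll(allTasks);
        return peak;
    }
}
```

With the question's `urls` and a shared HttpClient named `client`, a call could look like `await ThrottledRunner.RunThrottledAsync(urls, url => client.GetStringAsync(url), 20);`.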
Last, but probably worth mentioning, is a solution that uses TPL-based scheduling. You can create delegate-bound tasks on the TPL that have not yet been started, and use a custom task scheduler to limit the concurrency. In fact, there's an MSDN sample for it here:
See also TaskScheduler.
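The MSDN sample itself is not reproduced here. As a sketch of the same idea, the built-in ConcurrentExclusiveSchedulerPair (available since .NET 4.5) exposes a ConcurrentScheduler that runs at most `maxConcurrencyLevel` tasks at once. One caveat: a scheduler only controls when delegates run, so for an async lambda it limits the synchronous segments between awaits, not the number of outstanding I/O operations. The example therefore uses blocking work; SchedulerThrottle and RunOnLimitedScheduler are hypothetical names.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerThrottle
{
    // Runs `count` blocking work items through a scheduler that executes at most
    // `maxConcurrency` of them at once. Returns the observed peak concurrency.
    public static int RunOnLimitedScheduler(int count, int maxConcurrency)
    {
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);
        var gate = new object();
        int current = 0, peak = 0;

        Task[] tasks = Enumerable.Range(0, count)
            .Select(_ => factory.StartNew(() =>
            {
                lock (gate) { current++; peak = Math.Max(peak, current); }
                Thread.Sleep(5); // stand-in for synchronous, blocking work
                lock (gate) { current--; }
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return peak;
    }
}
```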
Answer by scottm
Although 1000 tasks might be queued very quickly, the Task Parallel Library can only run as many tasks concurrently as there are CPU cores in the machine. That means that if you have a four-core machine, only 4 tasks will be executing at a given time (unless you lower the MaxDegreeOfParallelism).
Answer by GregC
Parallel computations should be used to speed up CPU-bound operations. Here we are talking about I/O-bound operations. Your implementation should be purely async, unless you're overwhelming the busy single core on your multi-core CPU.
EDIT: I like usr's suggestion to use an "async semaphore" here.
Answer by Sean U
Use MaxDegreeOfParallelism, which is an option you can specify in Parallel.ForEach():
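var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
Parallel.ForEach(urls, options,
    url =>
    {
        var client = new HttpClient();
        // Block until the download finishes. Without this, the body returns as soon
        // as the request starts and MaxDegreeOfParallelism limits nothing.
        // Note: each in-flight request ties up a thread-pool thread.
        var html = client.GetStringAsync(url).GetAwaiter().GetResult();
        // do stuff with html
    });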
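For comparison, .NET 6 added Parallel.ForEachAsync, which takes the same ParallelOptions but accepts an async body and awaits it, so no thread is blocked while a download is pending. This sketch assumes .NET 6 or later; the AsyncForEach wrapper is a hypothetical convenience, while the Parallel.ForEachAsync call inside it is the framework API.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncForEach
{
    // Requires .NET 6 or later. Parallel.ForEachAsync awaits each body, so
    // MaxDegreeOfParallelism bounds the operations that are actually in flight.
    public static Task RunAsync<T>(
        IEnumerable<T> source,
        int maxDegreeOfParallelism,
        Func<T, CancellationToken, ValueTask> body)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism };
        return Parallel.ForEachAsync(source, options, body);
    }
}
```

With a single shared HttpClient named `client`, a call could look like `await AsyncForEach.RunAsync(urls, 20, async (url, ct) => { var html = await client.GetStringAsync(url, ct); });`.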
Answer by usr
Unfortunately, the .NET Framework is missing most of the important combinators for orchestrating parallel async tasks. There is no such thing built in.
Look at the AsyncSemaphore class built by the highly respected Stephen Toub. What you want is called a semaphore, and you need an async version of it.
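A minimal async semaphore can be sketched as follows. It follows the general shape of Toub's AsyncSemaphore (a count plus a queue of waiting TaskCompletionSource objects) but is a hypothetical re-creation, not his code. It assumes .NET 4.6 or later for Task.CompletedTask and RunContinuationsAsynchronously; on .NET 4.5 and later, SemaphoreSlim.WaitAsync covers the same need.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// WaitAsync completes immediately while permits remain; otherwise the caller
// receives a Task that a later Release() completes, in FIFO order.
public sealed class AsyncSemaphore
{
    private readonly Queue<TaskCompletionSource<bool>> _waiters = new Queue<TaskCompletionSource<bool>>();
    private int _currentCount;

    public AsyncSemaphore(int initialCount)
    {
        if (initialCount < 0) throw new ArgumentOutOfRangeException(nameof(initialCount));
        _currentCount = initialCount;
    }

    public Task WaitAsync()
    {
        lock (_waiters)
        {
            if (_currentCount > 0)
            {
                _currentCount--;
                return Task.CompletedTask;
            }
            // RunContinuationsAsynchronously keeps the waiter's continuation from
            // running synchronously on the thread that calls Release().
            var waiter = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            _waiters.Enqueue(waiter);
            return waiter.Task;
        }
    }

    public void Release()
    {
        TaskCompletionSource<bool> toRelease = null;
        lock (_waiters)
        {
            if (_waiters.Count > 0) toRelease = _waiters.Dequeue();
            else _currentCount++;
        }
        // Hand the permit directly to the oldest waiter, outside the lock.
        toRelease?.SetResult(true);
    }
}
```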
Answer by vitidev
Theo Yaung's example is nice, but here is a variant without a list of waiting tasks.
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
class SomeChecker
{
    private const int ThreadCount = 20;
    private CountdownEvent _countdownEvent;
    private SemaphoreSlim _throttler;
    public Task Check(IList<string> urls)
    {
        _countdownEvent = new CountdownEvent(urls.Count);
        _throttler = new SemaphoreSlim(ThreadCount);
        return Task.Run( // prevent UI thread lock
            async () =>
            {
                foreach (var url in urls)
                {
                    // do an async wait until we can schedule again
                    await _throttler.WaitAsync();
                    // NOT awaited: completion is tracked by _countdownEvent,
                    // so exceptions from ProcessUrl are not observed here.
                    _ = ProcessUrl(url);
                }
                // instead of await Task.WhenAll(allTasks);
                // (blocks this thread-pool thread until every URL has signaled)
                _countdownEvent.Wait();
            });
    }
    private async Task ProcessUrl(string url)
    {
        try
        {
            using (var client = new WebClient())
            {
                var page = await client.DownloadStringTaskAsync(new Uri(url));
                ProcessResult(page);
            }
        }
        finally
        {
            _throttler.Release();
            _countdownEvent.Signal();
        }
    }
    private void ProcessResult(string page) { /*....*/ }
}
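A variation on the same idea, sketched under the assumption that blocking a thread on CountdownEvent.Wait() is undesirable: every started operation holds one semaphore permit, so re-acquiring all `maxConcurrency` permits after the loop succeeds only once every operation has released its permit. This also avoids a task list. Unlike the class above, this hypothetical DrainingThrottle collects failures and rethrows them as an AggregateException instead of leaving them unobserved.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class DrainingThrottle
{
    // Runs `work` for each item with at most maxConcurrency calls in flight,
    // without keeping a list of tasks and without blocking a thread at the end.
    public static async Task RunAsync<T>(IEnumerable<T> items, Func<T, Task> work, int maxConcurrency)
    {
        var throttler = new SemaphoreSlim(maxConcurrency);
        var errors = new ConcurrentQueue<Exception>();

        foreach (var item in items)
        {
            // Take a permit; it is returned when this item's work finishes.
            await throttler.WaitAsync();
            _ = RunOneAsync(item); // deliberately not awaited, as in the answer above
        }

        // Every running call holds one permit, so acquiring all of them
        // succeeds only after every started call has released its permit.
        for (int i = 0; i < maxConcurrency; i++)
            await throttler.WaitAsync();

        if (!errors.IsEmpty)
            throw new AggregateException(errors);

        async Task RunOneAsync(T value)
        {
            try { await work(value); }
            catch (Exception ex) { errors.Enqueue(ex); } // record instead of losing it
            finally { throttler.Release(); }
        }
    }
}
```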
Answer by deadlydog
Essentially, you're going to want to create an Action or Task for each URL that you want to hit, put them in a List, and then process that list, limiting the number that can be processed in parallel.
My blog post shows how to do this with both Tasks and Actions, and provides a sample project you can download and run to see both in action.
With Actions
If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 20 threads in parallel.
var listOfActions = new List<Action>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Action here, but do not run it.
    // CallUrl should block until its request completes; otherwise
    // MaxDegreeOfParallelism does not limit the in-flight requests.
    listOfActions.Add(() => CallUrl(localUrl));
}
var options = new ParallelOptions { MaxDegreeOfParallelism = 20 };
Parallel.Invoke(options, listOfActions.ToArray());
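To check that Parallel.Invoke honors MaxDegreeOfParallelism, here is a self-contained sketch in which Thread.Sleep stands in for a synchronous CallUrl; InvokeThrottle and Run are hypothetical names. If CallUrl were an async method returning a Task, each Action would return as soon as its request started, and the limit would no longer apply to the requests themselves.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class InvokeThrottle
{
    // Builds one blocking Action per item and runs them through Parallel.Invoke
    // with MaxDegreeOfParallelism = maxConcurrency. Returns the observed peak.
    public static int Run(int count, int maxConcurrency)
    {
        var gate = new object();
        int current = 0, peak = 0;

        Action[] actions = Enumerable.Range(0, count)
            .Select(_ => (Action)(() =>
            {
                lock (gate) { current++; peak = Math.Max(peak, current); }
                Thread.Sleep(5); // stand-in for a synchronous CallUrl
                lock (gate) { current--; }
            }))
            .ToArray();

        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };
        Parallel.Invoke(options, actions);
        return peak;
    }
}
```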
With Tasks
With Tasks there is no built-in function. However, you can use the one that I provide on my blog.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();
    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();
        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }
        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}
Then, to create your list of Tasks and run them with, say, a maximum of 20 at a time, you could do this:
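var listOfTasks = new List<Task>();
foreach (var url in urls)
{
    var localUrl = url;
    // Note that we create the Task here, but do not start it.
    // new Task(async () => ...) would compile to an async void delegate whose Task
    // completes at the first await, so the throttle would not cover the request.
    // Blocking inside the delegate keeps the Task running until CallUrl finishes.
    listOfTasks.Add(new Task(() => CallUrl(localUrl).GetAwaiter().GetResult()));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 20);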
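An alternative sketch that sidesteps cold Tasks entirely: pass factories (Func<Task>) and invoke each one only when a slot is free, so async work is awaited for its whole duration without blocking threads. This is a hypothetical variation (ThrottledOperations and RunAllAsync are not from the blog post), and it omits the timeout overload.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottledOperations
{
    // Invokes each factory only when a slot is free and awaits the Task it returns,
    // so an async operation occupies its slot until it has really finished.
    // If cancellation is requested, no new operations start; ones already started
    // keep running and are not awaited here.
    public static async Task RunAllAsync(
        IEnumerable<Func<Task>> operations,
        int maxInParallel,
        CancellationToken cancellationToken = default)
    {
        var throttler = new SemaphoreSlim(maxInParallel);
        var running = new List<Task>();

        foreach (var operation in operations)
        {
            await throttler.WaitAsync(cancellationToken);
            running.Add(RunOneAsync(operation));
        }

        await Task.WhenAll(running);

        async Task RunOneAsync(Func<Task> op)
        {
            try { await op(); }
            finally { throttler.Release(); }
        }
    }
}
```

Assuming CallUrl returns a Task, the loop above would add `() => CallUrl(localUrl)` to a `List<Func<Task>>` and then call `await ThrottledOperations.RunAllAsync(listOfOperations, 20);`.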
Answer by Dogu Arslan
If you have an IEnumerable (i.e. URL strings) and you want to perform an I/O-bound operation on each item concurrently (i.e. make an async HTTP request), and optionally also set the maximum number of concurrent I/O requests at runtime, here is how you can do that. This way you do not use the thread pool et al.; the method uses SemaphoreSlim to control the maximum number of concurrent I/O requests, similar to a sliding-window pattern: one request completes, leaves the semaphore, and the next one gets in.
Usage: await ForEachAsync(urlStrings, YourAsyncFunc, optionalMaxDegreeOfConcurrency);
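public static Task ForEachAsync<TIn>(
    IEnumerable<TIn> inputEnumerable,
    Func<TIn, Task> asyncProcessor,
    int? maxDegreeOfParallelism = null)
{
    // DefaultMaxDegreeOfParallelism is not shown in the answer; it is assumed
    // to be a constant defined elsewhere in the same class.
    int maxAsyncThreadCount = maxDegreeOfParallelism ?? DefaultMaxDegreeOfParallelism;
    SemaphoreSlim throttler = new SemaphoreSlim(maxAsyncThreadCount, maxAsyncThreadCount);
    // Task.WhenAll enumerates this lazy Select immediately, so one Task per input
    // is created up front; each waits on the semaphore before doing its I/O.
    IEnumerable<Task> tasks = inputEnumerable.Select(async input =>
    {
        await throttler.WaitAsync().ConfigureAwait(false);
        try
        {
            await asyncProcessor(input).ConfigureAwait(false);
        }
        finally
        {
            throttler.Release();
        }
    });
    return Task.WhenAll(tasks);
}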
Answer by Serge Semenov
There are a lot of pitfalls, and direct use of a semaphore can be tricky in error cases, so I would suggest using the AsyncEnumerator NuGet package instead of reinventing the wheel:
// let's say there is a list of 1000+ URLs
string[] urls = { "http://google.com", "http://yahoo.com", ... };
// now let's send HTTP requests to each of these URLs in parallel
await urls.ParallelForEachAsync(async (url) => {
    var client = new HttpClient();
    var html = await client.GetStringAsync(url);
}, maxDegreeOfParalellism: 20);
Answer by No Refunds No Returns
Old question, new answer. @vitidev had a block of code that was reused almost intact in a project I reviewed. After discussing it with a few colleagues, one asked, "Why don't you just use the built-in TPL methods?" ActionBlock looks like the winner there: https://msdn.microsoft.com/en-us/library/hh194773(v=vs.110).aspx. We probably won't end up changing any existing code, but will definitely look to adopt this NuGet package and reuse Mr. Softy's best practice for throttled parallelism.
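A sketch of the ActionBlock approach, assuming the TPL Dataflow library is available (depending on the target framework, this may require the System.Threading.Tasks.Dataflow NuGet package). For an async delegate, ActionBlock counts an item as in progress until the returned Task completes, so MaxDegreeOfParallelism caps the number of outstanding operations. The DataflowThrottle name is hypothetical. If a delegate throws, the block faults, stops processing the remaining items, and Completion surfaces the exception.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DataflowThrottle
{
    // Feeds every item to an ActionBlock whose MaxDegreeOfParallelism caps how
    // many async bodies are in progress at once.
    public static async Task RunAsync<T>(IEnumerable<T> items, Func<T, Task> body, int maxConcurrency)
    {
        var block = new ActionBlock<T>(body, new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxConcurrency
        });

        foreach (var item in items)
            block.Post(item);   // default capacity is unbounded, so Post accepts items unless the block has faulted

        block.Complete();       // signal that no more input is coming
        await block.Completion; // completes once every posted item has been processed
    }
}
```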

