Note: this page reproduces a popular Stack Overflow question and its answers, provided under the CC BY-SA 4.0 license. You are free to use and share it, but you must keep the same license and attribute it to the original authors (not me): Stack Overflow
Original question: http://stackoverflow.com/questions/8853907/
Limit the number of parallel threads in C#
Asked by ca9163d9
I am writing a C# program to generate and upload half a million files via FTP. I want to process 4 files in parallel, since the machine has 4 cores and file generation takes much longer. Is it possible to convert the following PowerShell example to C#? Or is there a better framework, such as an Actor framework in C# (like F#'s MailboxProcessor)?
$maxConcurrentJobs = 3;
# Read the input and queue it up
$jobInput = get-content .\input.txt
$queue = [System.Collections.Queue]::Synchronized( (New-Object System.Collections.Queue) )
foreach($item in $jobInput)
{
    $queue.Enqueue($item)
}
# Function that pops input off the queue and starts a job with it
function RunJobFromQueue
{
    if( $queue.Count -gt 0)
    {
        $j = Start-Job -ScriptBlock {param($x); Get-WinEvent -LogName $x} -ArgumentList $queue.Dequeue()
        Register-ObjectEvent -InputObject $j -EventName StateChanged -Action { RunJobFromQueue; Unregister-Event $eventsubscriber.SourceIdentifier; Remove-Job $eventsubscriber.SourceIdentifier } | Out-Null
    }
}
# Start up to the max number of concurrent jobs
# Each job will take care of running the rest
for( $i = 0; $i -lt $maxConcurrentJobs; $i++ )
{
    RunJobFromQueue
}
Update:
The connection to remote FTP server can be slow so I want to limit the FTP uploading processing.
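One way to read the question together with its update is as two stages with separate limits: CPU-bound generation on 4 workers and a smaller number of concurrent uploads. The following is only a sketch of that idea using a bounded BlockingCollection as the hand-off queue. The class name PipelineSketch, the generate and upload delegates, and the queue capacity of 100 are all assumptions made for illustration, and error handling is left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PipelineSketch
{
    // generate and upload are hypothetical callbacks supplied by the caller.
    // Returns the number of files that were handed to upload.
    public static int Run(int fileCount, int generators, int uploaders,
                          Func<int, string> generate, Action<string> upload)
    {
        int uploaded = 0;

        // Bounded queue: generators block when 100 files are waiting,
        // so a slow FTP link cannot pile up unbounded work.
        using (var ready = new BlockingCollection<string>(100))
        {
            // Stage 2: a fixed number of uploader tasks drain the queue.
            Task[] uploadTasks = Enumerable.Range(0, uploaders)
                .Select(n => Task.Factory.StartNew(() =>
                {
                    foreach (string file in ready.GetConsumingEnumerable())
                    {
                        upload(file);
                        Interlocked.Increment(ref uploaded);
                    }
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            try
            {
                // Stage 1: generate files with a capped degree of parallelism.
                var options = new ParallelOptions { MaxDegreeOfParallelism = generators };
                Parallel.For(0, fileCount, options, i => ready.Add(generate(i)));
            }
            finally
            {
                // Signal "no more files" so the uploaders finish and exit.
                ready.CompleteAdding();
                Task.WaitAll(uploadTasks);
            }
        }

        return uploaded;
    }
}
```

A call such as PipelineSketch.Run(500000, 4, 2, GenerateFile, UploadFile) would then generate on 4 workers and upload on 2, where GenerateFile and UploadFile stand for your own methods.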
Accepted answer by Austin Salonen
Assuming you're building this with the TPL, you can set ParallelOptions.MaxDegreeOfParallelism to whatever you want it to be.
See Parallel.For for a code example.
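As a minimal sketch of what the accepted answer describes, the snippet below passes a ParallelOptions with MaxDegreeOfParallelism to Parallel.ForEach. ProcessFile is a hypothetical stand-in for generating and uploading one file, and the peak counter exists only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachSketch
{
    // Hypothetical placeholder for "generate one file and upload it".
    private static void ProcessFile(string fileName)
    {
        Thread.Sleep(20); // simulate some work
    }

    // Processes every file with at most maxParallel bodies running at once.
    // Returns the highest number of simultaneous workers that was observed.
    public static int ProcessAll(IEnumerable<string> fileNames, int maxParallel)
    {
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxParallel };
        object gate = new object();
        int running = 0;
        int peak = 0;

        Parallel.ForEach(fileNames, options, fileName =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            try
            {
                ProcessFile(fileName);
            }
            finally
            {
                lock (gate) { running--; }
            }
        });

        return peak;
    }
}
```

Note that Parallel.ForEach blocks the calling thread until every item has been processed.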
Answer by Giorgio Minardi
If you are using .NET 4.0, you can use the Parallel library.
Supposing you're iterating through the half million files, you can parallelize the iteration using Parallel.ForEach, for instance, or you can have a look at PLINQ. Here is a comparison between the two.
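For the PLINQ route, the equivalent knob is WithDegreeOfParallelism. This is a rough sketch only: GenerateContent is a hypothetical stand-in for the real file generation, and AsOrdered is included just so the results come back in input order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class PlinqSketch
{
    // Hypothetical placeholder for generating the content of one file.
    private static string GenerateContent(int index)
    {
        return "content of file " + index;
    }

    // Generates 'count' items using at most maxParallel concurrent workers.
    public static List<string> GenerateAll(int count, int maxParallel)
    {
        return Enumerable.Range(0, count)
            .AsParallel()
            .AsOrdered()                          // keep results in input order
            .WithDegreeOfParallelism(maxParallel) // cap the number of workers
            .Select(i => GenerateContent(i))
            .ToList();
    }
}
```

PLINQ fits best when each item produces a value to collect; for work done purely for its side effects, such as uploading, Parallel.ForEach is usually the more natural choice.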
Answer by Jeb
The Task Parallel Library is your friend here. See this link, which describes what's available to you. Basically, it ships with .NET Framework 4 and runs work on thread-pool (background) threads, tuning the level of concurrency to the number of processors on the running machine.
Perhaps something along the lines of:
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 4;
Then in your loop something like:
// Each lambda is one action; the options cap how many run at the same time.
Parallel.Invoke(options,
    () => new WebClient().UploadFile("http://www.linqpad.net", "lp.html"),
    () => new WebClient().UploadFile("http://www.jaoo.dk", "jaoo.html"));
Answer by deadlydog
Essentially you're going to want to create an Action or Task for each file to upload, put them in a List, and then process that list, limiting the number that can be processed in parallel.
My blog post shows how to do this both with Tasks and with Actions, and provides a sample project you can download and run to see both in action.
With Actions
If using Actions, you can use the built-in .NET Parallel.Invoke function. Here we limit it to running at most 4 threads in parallel.
var listOfActions = new List<Action>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we create the Action here, but do not run it yet.
    listOfActions.Add(() => UploadFile(localFile));
}
var options = new ParallelOptions {MaxDegreeOfParallelism = 4};
Parallel.Invoke(options, listOfActions.ToArray());
This option doesn't support async though, and I'm assuming your FileUpload function will be async, so you might want to use the Task example below.
With Tasks
With Tasks there is no built-in function. However, you can use the one that I provide on my blog.
/// <summary>
/// Starts the given tasks and waits for them to complete. This will run, at most, the specified number of tasks in parallel.
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, CancellationToken cancellationToken = new CancellationToken())
{
    await StartAndWaitAllThrottledAsync(tasksToRun, maxTasksToRunInParallel, -1, cancellationToken);
}

/// <summary>
/// Starts the given tasks and waits for them to complete. This will run the specified number of tasks in parallel.
/// <para>NOTE: If a timeout is reached before the Task completes, another Task may be started, potentially running more than the specified maximum allowed.</para>
/// <para>NOTE: If one of the given tasks has already been started, an exception will be thrown.</para>
/// </summary>
/// <param name="tasksToRun">The tasks to run.</param>
/// <param name="maxTasksToRunInParallel">The maximum number of tasks to run in parallel.</param>
/// <param name="timeoutInMilliseconds">The maximum milliseconds we should allow the max tasks to run in parallel before allowing another task to start. Specify -1 to wait indefinitely.</param>
/// <param name="cancellationToken">The cancellation token.</param>
public static async Task StartAndWaitAllThrottledAsync(IEnumerable<Task> tasksToRun, int maxTasksToRunInParallel, int timeoutInMilliseconds, CancellationToken cancellationToken = new CancellationToken())
{
    // Convert to a list of tasks so that we don't enumerate over it multiple times needlessly.
    var tasks = tasksToRun.ToList();
    using (var throttler = new SemaphoreSlim(maxTasksToRunInParallel))
    {
        var postTaskTasks = new List<Task>();
        // Have each task notify the throttler when it completes so that it decrements the number of tasks currently running.
        tasks.ForEach(t => postTaskTasks.Add(t.ContinueWith(tsk => throttler.Release())));
        // Start running each task.
        foreach (var task in tasks)
        {
            // Increment the number of tasks currently running and wait if too many are running.
            await throttler.WaitAsync(timeoutInMilliseconds, cancellationToken);
            cancellationToken.ThrowIfCancellationRequested();
            task.Start();
        }
        // Wait for all of the provided tasks to complete.
        // We wait on the list of "post" tasks instead of the original tasks, otherwise there is a potential race condition where the throttler's using block is exited before some Tasks have had their "post" action completed, which references the throttler, resulting in an exception due to accessing a disposed object.
        await Task.WhenAll(postTaskTasks.ToArray());
    }
}
Then, to create your list of Tasks and call the function to run them, with say a maximum of 4 running at a time, you could do this:
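var listOfTasks = new List<Task>();
foreach (var file in files)
{
    var localFile = file;
    // Note that we create the Task here, but do not start it.
    // The task blocks until the upload finishes. An async lambda passed to new Task(...)
    // becomes async void, so the Task would complete at its first await and free its
    // throttler slot while the upload is still running.
    listOfTasks.Add(new Task(() => UploadFile(localFile).GetAwaiter().GetResult()));
}
await Tasks.StartAndWaitAllThrottledAsync(listOfTasks, 4);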
Also, because this method supports async, it will not block the UI thread the way Parallel.Invoke or Parallel.ForEach would.
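The same throttling idea can also be sketched without unstarted Task objects, by passing a delegate that returns a Task and guarding each call with a SemaphoreSlim. This is an alternative illustration rather than the code from the blog post; the names AsyncThrottleSketch, ForEachThrottledAsync, and RunOneAsync are made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottleSketch
{
    // Runs one async operation per item, never more than maxParallel at a time.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> items, int maxParallel, Func<T, Task> operation)
    {
        using (var throttler = new SemaphoreSlim(maxParallel))
        {
            var running = new List<Task>();
            foreach (T item in items)
            {
                // Wait (without blocking a thread) until a slot is free.
                await throttler.WaitAsync();
                running.Add(RunOneAsync(item, operation, throttler));
            }
            // Every slot is released before the semaphore is disposed.
            await Task.WhenAll(running);
        }
    }

    private static async Task RunOneAsync<T>(
        T item, Func<T, Task> operation, SemaphoreSlim throttler)
    {
        try
        {
            await operation(item);
        }
        finally
        {
            // Free the slot even if the operation failed.
            throttler.Release();
        }
    }
}
```

Usage might look like await AsyncThrottleSketch.ForEachThrottledAsync(files, 4, file => UploadFile(file)); assuming UploadFile returns a Task. Because the delegate's Task is awaited, a slot stays occupied until the asynchronous upload has actually finished.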
Answer by mca
I have coded the technique below, in which I use a BlockingCollection as a thread-count manager. It is quite simple to implement and does the job. It accepts Task objects and adds an integer value to the blocking collection, increasing the running thread count by 1. When a thread finishes, it takes an item back out of the collection, which unblocks the pending Add operation for upcoming tasks.
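using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class BlockingTaskQueue
{
    // Bounded collection used as a counter: one item per running task.
    private readonly BlockingCollection<int> threadManager;

    public bool IsWorking
    {
        get { return threadManager.Count > 0; }
    }

    public BlockingTaskQueue(int maxThread)
    {
        threadManager = new BlockingCollection<int>(maxThread);
    }

    // Queues an unstarted task and returns immediately (fire and forget).
    public Task AddTask(Task task)
    {
        Task.Run(() => Run(task));
        return Task.CompletedTask;
    }

    private bool Run(Task task)
    {
        // Blocks here while maxThread tasks are already running.
        threadManager.Add(1);
        try
        {
            task.Start();
            task.Wait();
            return true;
        }
        catch (Exception)
        {
            return false;
        }
        finally
        {
            // Free the slot so that a waiting Add can proceed.
            threadManager.Take();
        }
    }
}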

