C#: Best way to limit the number of active Tasks running via the Parallel Task Library

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow. Original: http://stackoverflow.com/questions/11138927/

Date: 2020-08-09 16:39:34  Source: igfitidea

Best way to limit the number of active Tasks running via the Parallel Task Library

c# .net task-parallel-library

Asked by Ryan

Consider a queue holding a lot of jobs that need processing. The queue's limitation is that it can only pop one job at a time, and there is no way of knowing how many jobs there are. The jobs take 10s to complete and involve a lot of waiting for responses from web services, so they are not CPU bound.


If I use something like this


while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

Then it will furiously pop jobs from the queue much faster than it can complete them, run out of memory and fall on its ass. >.<


I can't use (I don't think) ParallelOptions.MaxDegreeOfParallelism because I can't use Parallel.Invoke or Parallel.ForEach.


3 alternatives I've found


  1. Replace Task.Factory.StartNew with

    Task task = new Task(job.Execute, TaskCreationOptions.LongRunning);
    task.Start();
    

    Which seems to somewhat solve the problem, but I am not clear exactly what this is doing and if this is the best method.

  2. Create a custom task scheduler that limits the degree of concurrency

  3. Use something like BlockingCollection to add jobs to the collection when started and remove them when finished, to limit the number that can be running.


With #1 I've got to trust that the right decision is made automatically; with #2/#3 I've got to work out the maximum number of tasks that can be running myself.


Have I understood this correctly - which is the better way, or is there another way?


EDIT - This is what I've come up with from the answers below: a producer-consumer pattern.


As well as overall throughput, the aim was not to dequeue jobs faster than they could be processed, and not to have multiple threads polling the queue (not shown here, but that's a non-blocking op and will lead to huge transaction costs if polled at high frequency from multiple places).


// BlockingCollection<>(1) will block if we try to add more than 1 job to the
// queue (no point in being greedy!), or if it is empty on take.
BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Set up a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically; with a 4-core CPU and 50% of the
// time in a job spent blocked waiting on IO, it will likely be 8.
for (int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      // GetConsumingEnumerable() exits cleanly once CompleteAdding() has been
      // called and the collection drains, unlike a bare Take() which throws.
      foreach (var job in jobs.GetConsumingEnumerable())
         job.Execute();
   });
   consumer.Start();
}

// Producer: take items off the queue and put them in the blocking collection
// ready for processing.
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompleteAdding();
       // May need to wait for running jobs to finish
       break;
    }
}

Accepted answer by usr

I just gave an answer which is very applicable to this question.


Basically, the TPL Task class is made to schedule CPU-bound work. It is not made for blocking work.


You are working with a resource that is not CPU: waiting for service replies. This means the TPL will mismanage your resource because it assumes CPU-boundedness to a certain degree.


Manage the resources yourself: Start a fixed number of threads or LongRunning tasks (which is basically the same). Decide on the number of threads empirically.


You can't put unreliable systems into production. For that reason, I recommend #1, but throttled. Don't create as many threads as there are work items. Create as many threads as are needed to saturate the remote service. Write yourself a helper function which spawns N threads and uses them to process M work items. You get totally predictable and reliable results that way.

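A minimal sketch of such a helper, assuming a bounded BlockingCollection so the producer cannot race ahead of the workers (the class name, signature, and capacity of 1 are my own illustration, not from the answer):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

static class Throttled
{
    // Spawn N threads and use them to process the M work items in `jobs`.
    public static void Run(IEnumerable<Action> jobs, int threadCount)
    {
        using (var queue = new BlockingCollection<Action>(boundedCapacity: 1))
        {
            var threads = Enumerable.Range(0, threadCount)
                .Select(_ => new Thread(() =>
                {
                    // Exits once CompleteAdding() is called and the queue drains.
                    foreach (var work in queue.GetConsumingEnumerable())
                        work();
                }))
                .ToList();
            threads.ForEach(t => t.Start());

            foreach (var job in jobs)
                queue.Add(job);    // blocks while all N threads are busy
            queue.CompleteAdding();
            threads.ForEach(t => t.Join());
        }
    }
}
```

Because the collection is bounded, memory stays flat no matter how many work items the enumerable yields.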

Answer by svick

The problem here doesn't seem to be too many running Tasks, it's too many scheduled Tasks. Your code will try to schedule as many Tasks as it can, no matter how fast they are executed. And if you have too many jobs, this means you will get OOM.


Because of this, none of your proposed solutions will actually solve your problem. If it seems that simply specifying LongRunning solves your problem, then that's most likely because creating a new Thread (which is what LongRunning does) takes some time, which effectively throttles getting new jobs. So, this solution only works by accident, and will most likely lead to other problems later on.


Regarding the solution, I mostly agree with usr: the simplest solution that works reasonably well is to create a fixed number of LongRunning tasks and have one loop that calls Queue.PopJob() (protected by a lock if that method is not thread-safe) and Execute()s the job.

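A sketch of that approach, assuming a worker count of 8 picked empirically and that PopJob() is not thread-safe (both assumptions are mine):

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

object queueLock = new object();

Task[] workers = Enumerable.Range(0, 8).Select(_ =>
    Task.Factory.StartNew(() =>
    {
        while (true)
        {
            Job job;
            lock (queueLock)          // PopJob() assumed not thread-safe
                job = Queue.PopJob();
            if (job == null)
                return;               // queue drained: let this worker exit
            job.Execute();
        }
    }, TaskCreationOptions.LongRunning)).ToArray();

Task.WaitAll(workers);                // block until every job has finished
```

Concurrency is capped at exactly 8 jobs in flight, and the queue is only drained as fast as the workers can execute.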

UPDATE: After some more thinking, I realized the following attempt will most likely behave terribly. Use it only if you're really sure it will work well for you.




But the TPL tries to figure out the best degree of parallelism, even for IO-bound Tasks. So, you might try to use that to your advantage. LongRunning Tasks won't work here, because from the point of view of the TPL, it seems like no work is done and it will start new Tasks over and over. What you can do instead is to start a new Task at the end of each Task. This way, the TPL will know what's going on and its algorithm may work well. Also, to let the TPL decide the degree of parallelism, at the start of a Task that is first in its line, start another line of Tasks.


This algorithm may work well. But it's also possible that the TPL will make a bad decision regarding the degree of parallelism; I haven't actually tried anything like this.


In code, it would look like this:


void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJobs(false));
}

And start it with


Task.Factory.StartNew(() => ProcessJobs(true));

Answer by Maciej

TaskCreationOptions.LongRunning is useful for blocking tasks, and using it here is legitimate. What it does is suggest to the scheduler that it dedicate a thread to the task. The scheduler itself tries to keep the number of threads at the same level as the number of CPU cores to avoid excessive context switching.


It is well described in Threading in C# by Joseph Albahari


Answer by Alon Catz

Microsoft has a very cool library called DataFlow which does exactly what you want (and much more). Details here.


You should use the ActionBlock class and set the MaxDegreeOfParallelism of the ExecutionDataflowBlockOptions object. ActionBlock plays nicely with async/await, so even when your external calls are awaited, no new jobs will begin processing.


ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess);

Answer by MoonStom

Potential flow splits and continuations caused by await, later on in your code or in a 3rd-party library, won't play nicely with long-running tasks (or threads), so don't bother using long-running tasks. In the async/await world, they're useless. More details here.


You can call ThreadPool.SetMaxThreads, but before you make this call, make sure you set the minimum number of threads with ThreadPool.SetMinThreads, using values below or equal to the max ones. And by the way, the MSDN documentation is wrong. You CAN go below the number of cores on your machine with those method calls, at least in .NET 4.5 and 4.6, where I used this technique to reduce the processing power of a memory-limited 32-bit service.

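For example (the specific limits here are illustrative; what matters is the ordering, min before max, with min <= max):

```csharp
using System.Threading;

// Shrink the pool below the core count: set the minimum first, then the
// maximum. Both worker and IO-completion-port threads are capped.
ThreadPool.SetMinThreads(workerThreads: 2, completionPortThreads: 2);
ThreadPool.SetMaxThreads(workerThreads: 4, completionPortThreads: 4);
```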

If, however, you don't wish to restrict the whole app but just the processing part of it, a custom task scheduler will do the job. A long time ago, MS released samples with several custom task schedulers, including a LimitedConcurrencyLevelTaskScheduler. Spawn the main processing task manually with Task.Factory.StartNew, providing the custom task scheduler, and every other task spawned by it will use it, including async/await and even Task.Yield, used for achieving asynchrony early on in an async method.

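Wiring that up might look like this (LimitedConcurrencyLevelTaskScheduler comes from the ParallelExtensionsExtras samples, not the BCL, and the limit of 4 is illustrative):

```csharp
using System.Threading.Tasks;

// Scheduler from the MS samples; caps concurrently executing tasks at 4.
var scheduler = new LimitedConcurrencyLevelTaskScheduler(4);
var factory = new TaskFactory(scheduler);

// The main processing task, and every task it spawns in turn,
// will run on the limited scheduler.
factory.StartNew(() => ProcessAllJobs());
```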

But for your particular case, both solutions won't stop your queue of jobs from being exhausted before the jobs are completed. That might not be desirable, depending on the implementation and purpose of that queue of yours. They are more like "fire a bunch of tasks and let the scheduler find the time to execute them" types of solution. So perhaps something a bit more appropriate here could be a stricter method of control over the execution of the jobs via semaphores. The code would look like this:


var semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 var job = Queue.PopJob();
 semaphore.Wait();          // blocks once max_concurrent_jobs are in flight
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();        // return to the caller; run the rest asynchronously
 try {
    ... Process the job here...
 }
 finally {
    semaphore.Release();    // release even if the job throws
 }
}

There's more than one way to skin a cat. Use what you believe is appropriate.


Answer by long2know

I use a message queue/mailbox mechanism to achieve this. It's akin to the actor model. I have a class that has a MailBox. I call this class my "worker." It can receive messages. Those messages are queued and they, essentially, define tasks that I want the worker to run. The worker will use Task.Wait() for its Task to finish before dequeueing the next message and starting the next task.


By limiting the number of workers I have, I am able to limit the number of concurrent threads/tasks that are being run.

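A rough sketch of that mailbox idea (the names and shape here are my own illustration, not code from the linked post):

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

class Worker
{
    private readonly BlockingCollection<Action> mailbox = new BlockingCollection<Action>();

    public Worker()
    {
        new Thread(() =>
        {
            foreach (var message in mailbox.GetConsumingEnumerable())
            {
                // Wait for this message's Task to finish before dequeueing
                // the next message, so each worker is strictly serial.
                Task.Run(message).Wait();
            }
        }) { IsBackground = true }.Start();
    }

    public void Post(Action message) => mailbox.Add(message);
}
```

Spinning up a fixed number of such workers caps concurrency at that number.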

This is outlined, with source code, in my blog post on a distributed compute engine. If you look at the code for IActor and the WorkerNode, I hope it makes sense.


https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/
