在C#中使用多线程加速循环(问题)

时间:2020-03-06 14:25:03  来源:igfitidea点击:

想象一下,我有一个函数可以遍历一百万/十亿个字符串并检查其中的污点。

外汇:

foreach (String item in ListOfStrings)
{
    result.add(CalculateSmth(item));
}

它消耗大量时间,因为CalculateSmth是非常耗时的函数。

我想问:如何在这种过程中集成多线程?

f.ex:我想启动5个线程,每个线程返回一些结果,然后继续进行,直到列表中有项目为止。

也许任何人都可以展示一些示例或者文章。

忘了提及.NET 2.0中的需求

解决方案

我们可以尝试并行扩展(.NET 4.0的一部分)

这些使我们可以编写如下内容:

Parallel.Foreach (ListOfStrings, (item) => 
    result.add(CalculateSmth(item));
);

当然result.add将需要是线程安全的。

并不是我现在在这里有任何好的文章,但是我们想要做的是在Producer-Consumer中使用Threadpool。

生产者循环遍历并创建任务(在这种情况下,可能只是将列表或者堆栈中的项目排队)。例如,消费者是五个线程,它们从堆栈中读取一项,通过计算将其消耗,然后将其存储在其他位置。

这样,多线程只限于这五个线程,并且它们都将完成工作,直到堆栈为空。

要考虑的事情:

  • 在输入和输出列表上放置保护,例如互斥锁。
  • 如果顺序很重要,请确保维持输出顺序。一个示例可能是将它们存储在SortedList或者类似的东西中。
  • 确保CalculateSmth是线程安全的,并且不使用任何全局状态。

我们必须回答的第一个问题是是否应该使用线程

如果函数CalculateSmth()基本上是受CPU限制的,即CPU使用率很高而基本上没有I / O使用率,那么我很难理解使用线程的意义,因为线程将在同一资源上竞争,在这种情况下为CPU。

如果CalculateSmth()同时使用CPU和I / O,则可能是使用线程的重点。

我完全同意我的回答。我做出了一个错误的假设,即我们所谈论的是具有一个核心的单个CPU,但是如今,我们拥有的是多核心CPU,这很糟糕。

Parallel扩展很酷,但这也可以通过使用如下所示的线程池来完成:

using System.Collections.Generic;
using System.Threading;

namespace noocyte.Threading
{
    class CalcState
    {
        public CalcState(ManualResetEvent reset, string input) {
            Reset = reset;
            Input = input;
        }
        public ManualResetEvent Reset { get; private set; }
        public string Input { get; set; }
    }

    class CalculateMT
    {
        List<string> result = new List<string>();
        List<ManualResetEvent> events = new List<ManualResetEvent>();

        private void Calc() {
            List<string> aList = new List<string>();
            aList.Add("test");

            foreach (var item in aList)
            {
                CalcState cs = new CalcState(new ManualResetEvent(false), item);
                events.Add(cs.Reset);
                ThreadPool.QueueUserWorkItem(new WaitCallback(Calculate), cs);
            }
            WaitHandle.WaitAll(events.ToArray());
        }

        private void Calculate(object s)
        {
            CalcState cs = s as CalcState;
            cs.Reset.Set();
            result.Add(cs.Input);
        }
    }
}

请注意,并发并不能神奇地为我们提供更多资源。我们需要确定减慢CalculateSmth速度的原因。

例如,如果它是CPU密集型的(并且我们位于单个内核上),那么无论我们是顺序执行还是并行执行,相同数量的CPU滴答声都会进入代码。另外,管理线程会带来一些开销。同样的论点适用于其他约束条件(例如I / O)

仅当CalculateSmth在执行过程中使资源空闲时,我们才能获得性能提升,而该资源可以由另一个实例使用。这并不少见。例如,如果任务涉及IO,然后是一些CPU,则进程1可能在做CPU,而进程2在做IO。正如mats指出的那样,如果我们拥有基础架构,则由生产者-消费者部门组成的链可以实现这一目标。

我们需要拆分要并行执行的工作。这是一个如何将工作分成两部分的示例:

List<string> work = (some list with lots of strings)

// Split the work in two
List<string> odd = new List<string>();
List<string> even = new List<string>();
for (int i = 0; i < work.Count; i++)
{
    if (i % 2 == 0)
    {
        even.Add(work[i]);
    }
    else
    {
        odd.Add(work[i]);
    }
}

// Set up to worker delegates
List<Foo> oddResult = new List<Foo>();
Action oddWork = delegate { foreach (string item in odd) oddResult.Add(CalculateSmth(item)); };

List<Foo> evenResult = new List<Foo>();
Action evenWork = delegate { foreach (string item in even) evenResult.Add(CalculateSmth(item)); };

// Run two delegates asynchronously
IAsyncResult evenHandle = evenWork.BeginInvoke(null, null);
IAsyncResult oddHandle = oddWork.BeginInvoke(null, null);

// Wait for both to finish
evenWork.EndInvoke(evenHandle);
oddWork.EndInvoke(oddHandle);

// Merge the results from the two jobs
List<Foo> allResults = new List<Foo>();
allResults.AddRange(oddResult);
allResults.AddRange(evenResult);

return allResults;