在C#中实现多线程(代码审查)

时间:2020-03-06 14:50:14  来源:igfitidea点击:

你好。

我正在尝试在应用程序中实现一些多线程代码。此代码的目的是验证数据库为其提供的项目。验证可能要花相当长的时间(几百毫秒到几秒钟),因此,对于每个项目,此过程需要分叉到其自己的线程中。

数据库开始可能每秒提供20或者30个项目,但开始迅速下降,最终在24小时内达到约65K个项目,此时应用程序退出。

我希望有经验的人可以看一下我的代码,看看是否有明显的问题。与我一起工作的人都不知道多线程,因此我真的只是我自己一个人。

这是代码。这有点长,但应该很清楚。如果我们有任何反馈或者建议,请告诉我。谢谢!

public class ItemValidationService
{
    /// <summary>
    /// The object to lock on in this class, for multithreading purposes.
    /// </summary>
    private static object locker = new object();

    /// <summary>Items that have been validated.</summary>
    private HashSet<int> validatedItems;

    /// <summary>Items that are currently being validated.</summary>
    private HashSet<int> validatingItems;

    /// <summary>Remove an item from the index if its links are bad.</summary>
    /// <param name="id">The ID of the item.</param>
    public void ValidateItem(int id)
    {
        lock (locker)
        {
            if
            (
                !this.validatedItems.Contains(id) &&
                !this.validatingItems.Contains(id)
            ){
                ThreadPool.QueueUserWorkItem(sender =>
                {
                    this.Validate(id);
                });
            }
        }

    } // method

    private void Validate(int itemId)
    {
        lock (locker)
        {
            this.validatingItems.Add(itemId);
        }

        // *********************************************
        // Time-consuming routine to validate an item...
        // *********************************************

        lock (locker)
        {
            this.validatingItems.Remove(itemId);
            this.validatedItems.Add(itemId);
        }

    } // method

} // class

解决方案

如果我们对时间不敏感,并且具有轻量的零星处理功能,那么线程池是一个方便的选择。但是,我记得在MSDN上读到的内容不适用于这种性质的大规模处理。

我用它来做类似的事情,对此感到遗憾。我在后续的应用程序中采用了工作线程方法,并且对我拥有的控制级别更加满意。

在工作线程模型中,我最喜欢的模式是创建一个包含任务项队列的主线程。然后派遣一帮工人,从该队列中弹出项目以进行处理。我使用了阻塞队列,以便在没有项目的过程中,工作人员只是阻塞,直到有东西推入队列。在此模型中,主线程从某些源(数据库等)生成工作项,而工作线程则使用这些工作项。

我会担心这里的表现。我们指示数据库可能每秒提供20到30个项目,并且一个项目可能需要花费几秒钟的时间来进行验证。那可能是大量线程-使用指标,最坏的情况下是60-90个线程!我认为我们需要在这里重新考虑设计。迈克尔提到了一个不错的模式。队列的使用确实有助于保持事物的控制和组织。信号量也可以用来控制创建的线程数-即我们可以允许最大数量的线程,但是在较小的负载下,如果最终完成任务的数量较少,则不必创建最大数量-即。我们自己的池大小可以动态设置上限。

当使用线程池时,我还发现在执行工作时监视池中线程的执行更加困难。因此,除非是失火和遗忘,否则我赞成执行更具控制性的执行。我知道我们提到过,应用程序在完成65K个项目后就退出了。我们如何监视线程​​以确定它们是否完成了工作-即所有排队的工作人员都完成了。我们是否正在监视HashSet中所有项目的状态?我认为,通过对项目进行排队并让自己的工作线程消耗掉该队列,我们可以获得更多的控制权。但是,这可能会以更多开销为代价,从而在线程之间发出信号以指示何时所有项目都已排队以允许它们退出。

请注意,QueueUserWorkItem可能会失败

我赞同使用阻塞队列和工作线程的想法。这是我过去使用过的阻塞队列实现,效果很好:
http://www.codeproject.com/KB/recipes/boundedblockingqueue.aspx

验证逻辑涉及什么?如果它主要是CPU约束,那么我在盒子上每个处理器/核心最多创建1个工作线程。这将告诉我们处理器的数量:
Environment.ProcessorCount

如果验证涉及I / O(例如文件访问或者数据库访问),那么我们可以使用的线程数要多于处理器的数量。

随问题一起发布的代码中可能存在逻辑错误,具体取决于" ValidateItem(int id)"中的项目ID来自何处。为什么?因为尽管我们在对工作项进行排队之前已正确锁定了validateatingItems和validatedItems队列,但是我们直到新线程启动后才将其添加到validateatingItems队列中。这意味着可能存在一个时间间隔,其中另一个线程以相同的ID调用" ValidateItem(id)"(除非它在单个主线程上运行)。

我将在排队之前在锁内将项目添加到validatingItems队列中。

编辑:同样,QueueUserWorkItem()返回一个布尔值,因此我们应该使用返回值来确保该项目已排队,然后将其添加到validatingItems队列中。

ThreadPool可能不是一次大量塞入其中的最佳选择。我们可能需要研究其功能的上限和/或者自己动手。

另外,如果我们期望没有重复的验证,则代码中存在竞争条件。致电

this.validatingItems.Add(itemId);

需要在主线程(ValidateItem)中发生,而不是在线程池线程(Validate方法)中发生。该调用应该在将工作项排队到池之前的一行进行。

通过不检查QueueUserWorkItem的返回值发现了更严重的错误。排队可能会失败,为什么它不会引发异常对我们所有人来说都是一个谜。如果返回false,则需要删除添加到validatingItems列表中的项目,并处理错误(可能引发异常)。

我们也可以尝试使用CCR并发和协调运行时。它埋在Microsoft Robotics Studio内,但是提供了出色的API来执行此类操作。

我们只需要创建一个"端口"(本质上是一个队列),挂接一个接收器(将东西张贴到接收器上就可以调用该方法),然后将工作项张贴到该接收器上。 CCR处理队列和工作线程以在其上运行。

这是Channel9上有关CCR的视频。

它具有很高的性能,甚至被用于非机器人类的东西(Myspace.com在幕后将其用于内容交付网络)。