C# MaxDegreeOfParallelism 有什么作用?

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/9538452/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-09 07:49:06  来源:igfitidea点击:

What does MaxDegreeOfParallelism do?

c#.net-4.0task-parallel-libraryparallel-extensionsparallel.foreach

提问by Akash Kava

I am using Parallel.ForEach and I am doing some database updates, now without setting MaxDegreeOfParallelism , a dual core processor machine results in sql client timeouts, where else quad core processor machine somehow does not timeout.

我正在使用 Parallel.ForEach 并且我正在做一些数据库更新,现在没有设置 MaxDegreeOfParallelism ,双核处理器机器会导致 sql 客户端超时,否则四核处理器机器不知何故不会超时。

Now I have no control over what kind of processor cores are available where my code runs, but is there some settings I can change with MaxDegreeOfParallelism that will probably run less operations simultaneously and not result in timeouts?

现在我无法控制在我的代码运行的地方可用的处理器内核类型,但是是否可以使用 MaxDegreeOfParallelism 更改一些设置,这些设置可能会同时运行更少的操作并且不会导致超时?

I can increase timeouts but it isnt a good solution, if on lower CPU I can process less operations simultaneously, that will put less load on cpu.

我可以增加超时,但这不是一个好的解决方案,如果在较低的 CPU 上我可以同时处理更少的操作,那么 CPU 的负载就会减少。

Ok I have read all other posts and MSDN too, but will setting MaxDegreeOfParallelism to lower value make my quad core machines suffer?

好的,我也阅读了所有其他帖子和 MSDN,但是将 MaxDegreeOfParallelism 设置为较低的值会使我的四核机器受到影响吗?

For example, is there anyway to do something like, if CPU has two cores, then use 20, if CPU has four cores then 40?

例如,有没有办法做这样的事情,如果 CPU 有两个内核,那么使用 20,如果 CPU 有四个内核,那么使用 40?

采纳答案by jimrandomh

The answer is that it is the upper limit for the entire parallel operation, irrespective of the number of cores.

答案是它是整个并行操作的上限,与核数无关。

So even if you don't use the CPU because you are waiting on IO, or a lock, no extra tasks will run in parallel, only the maximum that you specifiy.

因此,即使您因为等待 IO 或锁而没有使用 CPU,也不会并行运行额外的任务,只有您指定的最大值。

To find this out, I wrote this piece of test code. There is an artificial lock in there to stimulate the TPL to use more threads. The same will happen when your code is waiting for IO or database.

为了找到这个,我写了这段测试代码。那里有一个人工锁来刺激 TPL 使用更多线程。当您的代码等待 IO 或数据库时,也会发生同样的情况。

class Program
{
    static void Main(string[] args)
    {
        var locker = new Object();
        int count = 0;
        Parallel.For
            (0
             , 1000
             , new ParallelOptions { MaxDegreeOfParallelism = 2 }
             , (i) =>
                   {
                       Interlocked.Increment(ref count);
                       lock (locker)
                       {
                           Console.WriteLine("Number of active threads:" + count);
                           Thread.Sleep(10);
                        }
                        Interlocked.Decrement(ref count);
                    }
            );
    }
}

If I don't specify MaxDegreeOfParallelism, the console logging shows that up to around 8 tasks are running at the same time. Like this:

如果我不指定 MaxDegreeOfParallelism,控制台日志会显示多达大约 8 个任务同时运行。像这样:

Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:6
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7
Number of active threads:7

It starts lower, increases over time and at the end it is trying to run 8 at the same time.

它开始较低,随着时间的推移增加,最后它试图同时运行 8。

If I limit it to some arbitrary value (say 2), I get

如果我将其限制为某个任意值(比如 2),我会得到

Number of active threads:2
Number of active threads:1
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2
Number of active threads:2

Oh, and this is on a quadcore machine.

哦,这是在四核机器上。

回答by SolidSnake

it sets number of threads to run in parallel...

它设置并行运行的线程数...

回答by jimrandomh

It sounds like the code that you're running in parallel is deadlocking, which means that unless you can find and fix the issue that's causing that, you shouldn't parallelize it at all.

听起来您并行运行的代码是死锁的,这意味着除非您能找到并修复导致该问题的问题,否则根本不应对其进行并行化。

回答by bugged87

For example, is there anyway to do something like, if CPU has two cores, then use 20, if CPU has four cores then 40?

例如,有没有办法做这样的事情,如果 CPU 有两个内核,那么使用 20,如果 CPU 有四个内核,那么使用 40?

You can do this to make parallelism dependent on the number of CPU cores:

您可以这样做以使并行性取决于 CPU 内核的数量:

var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount * 10 };
Parallel.ForEach(sourceCollection, options, sourceItem =>
{
    // do something
});

However, newer CPU's tend to use hyper-threading to simulate extra cores. So if you have a quad-core processor, then Environment.ProcessorCountwill probably report this as 8 cores. I've found that if you set the parallelism to account for the simulated cores then it actually slows down other threads such as UI threads.

然而,较新的 CPU 倾向于使用超线程来模拟额外的内核。因此,如果您有一个四核处理器,那么Environment.ProcessorCount可能会将其报告为 8 核。我发现,如果您将并行性设置为考虑模拟核心,那么它实际上会减慢其他线程(例如 UI 线程)的速度。

So although the operation will finish a bit faster, an application UI may experience significant lag during this time. Dividing the `Environment.ProcessorCount' by 2 seems to achieve the same processing speeds while still keeping the CPU available for UI threads.

因此,尽管操作完成得更快一些,但在此期间应用程序 UI 可能会出现明显的延迟。将“Environment.ProcessorCount”除以 2 似乎可以实现相同的处理速度,同时仍保持 CPU 可用于 UI 线程。

回答by grep65535

Something else to consider, especially for those finding this many years later, is depending on your situation it's usually best to collect all data in a DataTable and then use SqlBulkCopy toward the end of each major task.

其他需要考虑的事情,特别是对于多年后发现这一点的人来说,取决于您的情况,通常最好收集 DataTable 中的所有数据,然后在每个主要任务结束时使用 SqlBulkCopy。

For example I have a process that I made that runs through millions of files and I ran into the same errors when each file transaction made a DB query to insert the record. I instead moved to storing it all in a DataTable in memory for each share I iterated through, dumping the DataTable into my SQL Server and clearing it between each separate share. The bulk insert takes a split second and has the benefit of not opening thousands of connections at once.

例如,我制作了一个运行数百万个文件的进程,当每个文件事务进行数据库查询以插入记录时,我遇到了相同的错误。我转而将其全部存储在内存中的 DataTable 中,用于我迭代的每个共享,将 DataTable 转储到我的 SQL Server 中并在每个单独的共享之间清除它。批量插入需要一瞬间,并且具有不会同时打开数千个连接的好处。

EDIT: Here's a quick & dirty working example The SQLBulkCopy method:

编辑:这是一个快速而肮脏的工作示例 SQLBulkCopy 方法:

private static void updateDatabase(DataTable targetTable)
    {
        try
        {
            DataSet ds = new DataSet("FileFolderAttribute");
            ds.Tables.Add(targetTable);
            writeToLog(targetTable.TableName + " - Rows: " + targetTable.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Opening SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Opening SQL connection");
            SqlConnection sqlConnection = new SqlConnection(sqlConnectionString);
            sqlConnection.Open();
            SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConnection, SqlBulkCopyOptions.TableLock | SqlBulkCopyOptions.FireTriggers | SqlBulkCopyOptions.UseInternalTransaction, null);
            bulkCopy.DestinationTableName = "FileFolderAttribute";
            writeToLog(@"Copying data to SQL Server table", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Copying data to SQL Server table");
            foreach (var table in ds.Tables)
            {
                writeToLog(table.ToString(), logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                Console.WriteLine(table.ToString());
            }
            bulkCopy.WriteToServer(ds.Tables[0]);

            sqlConnection.Close();
            sqlConnection.Dispose();
            writeToLog(@"Closing SQL connection", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            writeToLog(@"Clearing local DataTable...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
            Console.WriteLine(@"Closing SQL connection");
            Console.WriteLine(@"Clearing local DataTable...");
            targetTable.Clear();
            ds.Tables.Remove(targetTable);
            ds.Clear();
            ds.Dispose();
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

...and for dumping it into the datatable:

...并将其转储到数据表中:

private static void writeToDataTable(string ServerHostname, string RootDirectory, string RecordType, string Path, string PathDirectory, string PathFileName, string PathFileExtension, decimal SizeBytes, decimal SizeMB, DateTime DateCreated, DateTime DateModified, DateTime DateLastAccessed, string Owner, int PathLength, DateTime RecordWriteDateTime)
    {
        try
        {
            if (tableToggle)
            {
                DataRow toInsert = results_1.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_1.Rows.Add(toInsert);
            }
            else
            {
                DataRow toInsert = results_2.NewRow();
                toInsert[0] = ServerHostname;
                toInsert[1] = RootDirectory;
                toInsert[2] = RecordType;
                toInsert[3] = Path;
                toInsert[4] = PathDirectory;
                toInsert[5] = PathFileName;
                toInsert[6] = PathFileExtension;
                toInsert[7] = SizeBytes;
                toInsert[8] = SizeMB;
                toInsert[9] = DateCreated;
                toInsert[10] = DateModified;
                toInsert[11] = DateLastAccessed;
                toInsert[12] = Owner;
                toInsert[13] = PathLength;
                toInsert[14] = RecordWriteDateTime;

                results_2.Rows.Add(toInsert);
            }


        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logFile);
        }
    }

...and here's the context, the looping piece itself:

...这是上下文,循环片段本身:

private static void processTargetDirectory(DirectoryInfo rootDirectory, string targetPathRoot)
    {
        DateTime StartTime = DateTime.Now;
        int directoryCount = 0;
        int fileCount = 0;
        try
        {                
            manageDataTables();

            Console.WriteLine(rootDirectory.FullName);
            writeToLog(@"Working in Directory: " + rootDirectory.FullName, logFile, getLineNumber(), getCurrentMethod(), true);

            applicationsDirectoryCount++;

            // REPORT DIRECTORY INFO //
            string directoryOwner = "";
            try
            {
                directoryOwner = File.GetAccessControl(rootDirectory.FullName).GetOwner(typeof(System.Security.Principal.NTAccount)).ToString();
            }
            catch (Exception error)
            {
                //writeToLog("\t" + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                writeToLog("[" + error.Message + "] - " + rootDirectory.FullName, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
                errorLogging(error, getCurrentMethod(), logFile);
                directoryOwner = "SeparatedUser";
            }

            writeToRawLog(serverHostname + "," + targetPathRoot + "," + "Directory" + "," + rootDirectory.Name + "," + rootDirectory.Extension + "," + 0 + "," + 0 + "," + rootDirectory.CreationTime + "," + rootDirectory.LastWriteTime + "," + rootDirectory.LastAccessTime + "," + directoryOwner + "," + rootDirectory.FullName.Length + "," + DateTime.Now + "," + rootDirectory.FullName + "," + "", logResultsFile, true, logFile);
            //writeToDBLog(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);
            writeToDataTable(serverHostname, targetPathRoot, "Directory", rootDirectory.FullName, "", rootDirectory.Name, rootDirectory.Extension, 0, 0, rootDirectory.CreationTime, rootDirectory.LastWriteTime, rootDirectory.LastAccessTime, directoryOwner, rootDirectory.FullName.Length, DateTime.Now);

            if (rootDirectory.GetDirectories().Length > 0)
            {
                Parallel.ForEach(rootDirectory.GetDirectories(), new ParallelOptions { MaxDegreeOfParallelism = directoryDegreeOfParallelism }, dir =>
                {
                    directoryCount++;
                    Interlocked.Increment(ref threadCount);
                    processTargetDirectory(dir, targetPathRoot);
                });

            }

            // REPORT FILE INFO //
            Parallel.ForEach(rootDirectory.GetFiles(), new ParallelOptions { MaxDegreeOfParallelism = fileDegreeOfParallelism }, file =>
            {
                applicationsFileCount++;
                fileCount++;
                Interlocked.Increment(ref threadCount);
                processTargetFile(file, targetPathRoot);
            });

        }
        catch (Exception error)
        {
            writeToLog(error.Message, logExceptionsFile, getLineNumber(), getCurrentMethod(), true);
            errorLogging(error, getCurrentMethod(), logFile);
        }
        finally
        {
            Interlocked.Decrement(ref threadCount);
        }

        DateTime EndTime = DateTime.Now;
        writeToLog(@"Run time for " + rootDirectory.FullName + @" is: " + (EndTime - StartTime).ToString() + @" | File Count: " + fileCount + @", Directory Count: " + directoryCount, logTimingFile, getLineNumber(), getCurrentMethod(), true);
    }

Like noted above, this is quick & dirty, but works very well.

如上所述,这既快速又脏,但效果很好。

For memory-related issues I ran into once I got to around 2,000,000 records, I had to create a second DataTable and alternate between the 2, dumping the records to SQL server between alternation. So my SQL connections consist of 1 every 100,000 records.

对于我遇到大约 2,000,000 条记录后遇到的与内存相关的问题,我不得不创建第二个 DataTable 并在这两个数据表之间交替,在交替之间将记录转储到 SQL 服务器。所以我的 SQL 连接包含每 100,000 条记录 1 个。

I managed that like this:

我是这样管理的:

private static void manageDataTables()
    {
        try
        {
            Console.WriteLine(@"[Checking datatable size] toggleValue: " + tableToggle + " | " + @"r1: " + results_1.Rows.Count + " - " + @"r2: " + results_2.Rows.Count);
            if (tableToggle)
            {
                int rowCount = 0;
                if (results_1.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_1 row count > 100000 @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_1.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_1.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_1 row count increased, @ " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_1.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_1 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_1);
                    results_1.Clear();
                    writeToLog(@"results_1 cleared, count: " + results_1.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }

            }
            else
            {
                int rowCount = 0;
                if (results_2.Rows.Count > datatableRecordCountThreshhold)
                {
                    tableToggle ^= true;
                    writeToLog(@"results_2 row count > 100000 @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    rowCount = results_2.Rows.Count;
                    logResultsFile = "FileServerReport_Results_" + DateTime.Now.ToString("yyyyMMdd-HHmmss") + ".txt";
                    Thread.Sleep(5000);
                    if (results_2.Rows.Count != rowCount)
                    {
                        writeToLog(@"results_2 row count increased, @ " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                        rowCount = results_2.Rows.Count;
                        Thread.Sleep(15000);
                    }
                    writeToLog(@"results_2 row count stopped increasing, updating database...", logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                    updateDatabase(results_2);
                    results_2.Clear();
                    writeToLog(@"results_2 cleared, count: " + results_2.Rows.Count, logDatabaseFile, getLineNumber(), getCurrentMethod(), true);
                }
            }
        }
        catch (Exception error)
        {
            errorLogging(error, getCurrentMethod(), logDatabaseFile);
        }
    }

Where "datatableRecordCountThreshhold = 100000"

其中“datatableRecordCountThreshhold = 100000”