Source: Stack Overflow question, http://stackoverflow.com/questions/8357955/. The content is provided under the CC BY-SA 4.0 license: you are free to use and share it, but you must attribute it to the original authors and distribute it under the same license.

Windows API Thread Pool simple example

Tags: c++, windows, multithreading, threadpool

Asked by nbonneel

[EDIT: thanks to MSalters' answer and Raymond Chen's answer to "InterlockedIncrement vs EnterCriticalSection/counter++/LeaveCriticalSection", the problem is solved and the code below works properly. It should serve as a simple example of thread pool use on Windows.]

I can't find a simple example of the following task. My program needs, for example, to increment every value in a huge std::vector by one, so I want to do that in parallel, and it has to do so many times over the lifetime of the program. I know how to do this by calling CreateThread on each invocation of the routine, but I haven't managed to replace CreateThread with the thread pool.

Here is what I do:

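#include <windows.h>
#include <vector>

class Thread {
public:
    Thread() {}
    virtual ~Thread() {}     // allow deletion through a base-class pointer
    virtual void run() = 0;  // e.g. implemented by IncrementVectorThread
};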
class IncrementVectorThread: public Thread {
public:
   IncrementVectorThread(int threadID, int nbThreads, std::vector<int> &vec) : id(threadID), nb(nbThreads), myvec(vec) {}

   virtual void run() {
        // Process the slice [size*id/nb, size*(id+1)/nb) of the vector.
        for (size_t i=(myvec.size()*id)/nb; i<(myvec.size()*(id+1))/nb; i++)
          myvec[i]++; //and let's assume myvec is properly sized
    }
   int id, nb;
   std::vector<int> &myvec;
};

class ThreadGroup : public std::vector<Thread*> {
public:
    ThreadGroup() { 
         pool = CreateThreadpool(NULL);
         InitializeThreadpoolEnvironment(&cbe);
         cleanupGroup = CreateThreadpoolCleanupGroup();
         SetThreadpoolCallbackPool(&cbe, pool);
         SetThreadpoolCallbackCleanupGroup(&cbe, cleanupGroup, NULL);
         threadCount = 0;
    }
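    ~ThreadGroup() {
         // Release everything acquired in the constructor, the pool last.
         CloseThreadpoolCleanupGroupMembers(cleanupGroup, FALSE, NULL);
         CloseThreadpoolCleanupGroup(cleanupGroup);
         DestroyThreadpoolEnvironment(&cbe);
         CloseThreadpool(pool);
    }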
    PTP_POOL pool;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    volatile long threadCount;
} ;


static VOID CALLBACK runFunc(
                PTP_CALLBACK_INSTANCE Instance,
                PVOID Context,
                PTP_WORK Work) {

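   ThreadGroup &thread = *((ThreadGroup*) Context);
   // InterlockedIncrement returns the incremented value, so ids start at 1.
   long id = InterlockedIncrement(&(thread.threadCount));
   DWORD tid = (id-1)%thread.size();   // map to a 0-based task index
   thread[tid]->run();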
}

void run_threads(ThreadGroup* thread_group) {
    SetThreadpoolThreadMaximum(thread_group->pool, thread_group->size());
    SetThreadpoolThreadMinimum(thread_group->pool, thread_group->size());

    TP_WORK *worker = CreateThreadpoolWork(runFunc, (void*) thread_group, &thread_group->cbe);
    thread_group->threadCount = 0;
    for (int i=0; i<thread_group->size(); i++) {
        SubmitThreadpoolWork(worker);
     }  
     WaitForThreadpoolWorkCallbacks(worker,FALSE);  
     CloseThreadpoolWork(worker);   
}       

int main() {

   ThreadGroup group;
   std::vector<int> vec(10000, 0);
   for (int i=0; i<10; i++)
      group.push_back(new IncrementVectorThread(i, 10, vec));

   run_threads(&group);
   run_threads(&group);
   run_threads(&group);

   // now, vec should be == std::vector<int>(10000, 3);       
}

So, if I understood correctly:
- CreateThreadpool creates a bunch of threads (hence the call to CreateThreadpoolWork is cheap, as it doesn't call CreateThread).
- I can have as many thread pools as I want (if I want one thread pool for my "IncrementVector" threads and one for my "DecrementVector" threads, I can).
- If I need to divide my "increment vector" task into 10 threads, instead of calling CreateThread 10 times, I create a single "worker" and submit it 10 times to the thread pool with the same parameter (hence I need the thread ID in the callback to know which part of my std::vector to increment). Here I couldn't find the thread ID, since GetCurrentThreadId() returns the real ID of the thread (i.e., something like 1528, not something between 0 and nb_launched_threads).

Finally, I am not sure I understood the concept well: do I really need a single worker, and not 10, if I split my std::vector into 10 parts?

Thanks!

Accepted answer by MSalters

You're roughly right up to the last point.

The whole idea about a thread pool is that you don't care how many threads it has. You just throw a lot of work into the thread pool, and let the OS determine how to execute each chunk. So, if you create and submit 10 chunks, the OS may use between 1 and 10 threads from the pool.
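
To make that concrete, here is a minimal portable sketch, not the Windows API. TinyPool and increment_all are hypothetical names, and std::thread stands in for the pool's OS-managed threads. Each task carries its own range, so the result is the same whether 1 worker or 10 workers end up running the chunks.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a thread pool: a locked queue of tasks drained by
// however many workers the caller chooses. This is NOT the Windows API.
class TinyPool {
public:
    void submit(std::function<void()> task) {
        std::lock_guard<std::mutex> lock(mutex_);
        tasks_.push(std::move(task));
    }

    // Runs every queued task on `workers` threads, then returns.
    void run_all(unsigned workers) {
        std::vector<std::thread> threads;
        for (unsigned w = 0; w < workers; ++w)
            threads.emplace_back([this] { drain(); });
        for (std::thread& t : threads)
            t.join();
    }

private:
    // Each worker keeps taking tasks until the queue is empty.
    void drain() {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(mutex_);
                if (tasks_.empty()) return;
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the task outside the lock
        }
    }

    std::mutex mutex_;
    std::queue<std::function<void()>> tasks_;
};

// Splits `vec` into `chunks` tasks. The number of chunks is chosen
// independently of the number of workers that will execute them.
void increment_all(std::vector<int>& vec, std::size_t chunks, unsigned workers) {
    TinyPool pool;
    for (std::size_t c = 0; c < chunks; ++c) {
        const std::size_t begin = vec.size() * c / chunks;
        const std::size_t end = vec.size() * (c + 1) / chunks;
        pool.submit([&vec, begin, end] {
            for (std::size_t i = begin; i < end; ++i) ++vec[i];
        });
    }
    pool.run_all(workers);
}
```

Calling increment_all(vec, 10, 1) and increment_all(vec, 10, 10) both increment every element exactly once; only the scheduling differs.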

You should not care about thread identities. Don't bother with thread IDs, the minimum or maximum number of threads, or anything like that.

If you don't care about thread identities, how do you manage which part of the vector to change? Simple. Before creating the thread pool, initialize a counter to zero. In the callback function, call InterlockedIncrement to retrieve and increment the counter. Each submitted work item will then get a consecutive integer.
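
The counter idea can be sketched portably as follows. This assumes std::atomic in place of InterlockedIncrement and plain std::thread objects in place of pool callbacks; SharedJob, claim_range, callback and run_once are hypothetical names. One difference to keep in mind: fetch_add returns the value before the increment, whereas InterlockedIncrement returns the value after it, which is why the question's code subtracts 1.

```cpp
#include <atomic>
#include <cstddef>
#include <functional>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical shared state, playing the role of the Context pointer.
struct SharedJob {
    SharedJob(std::vector<int>& v, std::size_t n) : vec(v), chunks(n), next(0) {}
    std::vector<int>& vec;
    std::size_t chunks;
    std::atomic<std::size_t> next;  // the counter, zero before any work is submitted
};

// Claims the next chunk index and returns its half-open range [begin, end).
std::pair<std::size_t, std::size_t> claim_range(SharedJob& job) {
    const std::size_t c = job.next.fetch_add(1);  // value before the increment
    const std::size_t n = job.vec.size();
    return std::make_pair(n * c / job.chunks, n * (c + 1) / job.chunks);
}

// What each work item runs: no thread ID is consulted anywhere.
void callback(SharedJob& job) {
    const std::pair<std::size_t, std::size_t> range = claim_range(job);
    for (std::size_t i = range.first; i < range.second; ++i)
        ++job.vec[i];
}

// Stand-in for submitting the same work item `chunks` times and waiting.
void run_once(std::vector<int>& vec, std::size_t chunks) {
    SharedJob job(vec, chunks);
    std::vector<std::thread> threads;
    for (std::size_t c = 0; c < chunks; ++c)
        threads.emplace_back(callback, std::ref(job));
    for (std::thread& t : threads)
        t.join();
}
```

Because every callback claims a distinct index, the ranges tile the vector exactly once regardless of the order in which the callbacks run.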