Thread pooling in C++11
Warning: this content is provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow
Original question: http://stackoverflow.com/questions/15752659/
Question by Yktula
Relevant questions:
About C++11:
- C++11: std::thread pooled?
- Will async(launch::async) in C++11 make thread pools obsolete for avoiding expensive thread creation?
About Boost:
How do I get a pool of threads to send tasks to, without creating and deleting them over and over again? That is, I want persistent threads that resynchronize without joining.
I have code that looks like this:
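#include <algorithm>
#include <thread>
#include <vector>

namespace {
std::vector<std::thread> workers;
int total = 4;
int arr[4] = {0};

void each_thread_does(int i) {
    arr[i] += 2;
}
}

int main(int argc, char *argv[]) {
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            workers.push_back(std::thread(each_thread_does, j));
        }
        for (std::thread &t : workers) {
            if (t.joinable()) {
                t.join();
            }
        }
        workers.clear(); // the joined threads are finished and cannot be reused
        // arr has only 4 elements, so keep the minimum in a local instead of arr[4]
        int smallest = *std::min_element(arr, arr + 4);
        (void)smallest;
    }
    return 0;
}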
Instead of creating and joining threads each iteration, I'd prefer to send tasks to my worker threads each iteration and only create them once.
Answer by vit-vit
You can use C++ Thread Pool Library, https://github.com/vit-vit/ctpl.
Then the code you wrote can be replaced with the following:
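#include <ctpl.h> // or <ctpl_stl.h> if you do not have the Boost library
#include <algorithm>
#include <future>
#include <vector>

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            // CTPL passes the worker thread's id as the first argument, hence the (int) parameter
            results[j] = p.push([&arr, j](int){ arr[j] += 2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get(); // wait for this iteration's tasks
        }
        // arr has only 4 elements, so keep the minimum in a local instead of arr[4]
        int smallest = *std::min_element(arr, arr + 4);
        (void)smallest;
    }
}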
You will get the desired number of threads and will not create and delete them over and over again on the iterations.
Answer by PhD AP EcE
This is copied from my answer to another, very similar post; I hope it helps:
1) Start with the number of threads the hardware can run concurrently:
int Num_Threads = std::thread::hardware_concurrency(); // only a hint; may be 0 if unknown
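Because std::thread::hardware_concurrency() is only a hint and may return 0 when the value cannot be computed, it can be worth guarding against that case before sizing the pool. A minimal sketch, assuming a hypothetical helper named pool_size and an arbitrary fallback of 2 threads:

```cpp
#include <cassert>
#include <thread>

// Hypothetical helper: returns the pool size to use.
// hardware_concurrency() may return 0 ("not computable"), so fall back
// to an arbitrary default in that case.
unsigned pool_size(unsigned fallback = 2) {
    unsigned n = std::thread::hardware_concurrency();
    return n != 0 ? n : fallback;
}
```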
2) For an efficient thread pool implementation, once the threads have been created according to Num_Threads, it's better not to create new ones or destroy old ones (by joining). Doing so carries a performance penalty and might even make your application slower than the serial version.
Each C++11 thread should run its function in an infinite loop, waiting for new tasks to grab and run.
Here is how to attach such a function to the thread pool:
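// Inside The_Pool (e.g. in its constructor); Pool is a std::vector<std::thread> member,
// so the threads outlive this function and can be joined later in shutdown().
int Num_Threads = std::thread::hardware_concurrency();
for (int ii = 0; ii < Num_Threads; ii++)
{
    // Infinite_loop_function is a member function, so the thread also needs `this`.
    Pool.push_back(std::thread(&The_Pool::Infinite_loop_function, this));
}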
3) The Infinite_loop_function
This is a while(true) loop that waits on the task queue:
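void The_Pool::Infinite_loop_function()
{
    while (true)
    {
        std::function<void()> Job;
        {
            std::unique_lock<std::mutex> lock(Queue_Mutex);
            // [this] is needed because Queue and terminate_pool are members.
            condition.wait(lock, [this]{ return !Queue.empty() || terminate_pool; });
            if (Queue.empty()) // woken by shutdown() with no work left: leave the loop
                return;
            Job = Queue.front();
            Queue.pop();
        } // release the lock before running the job
        Job(); // function<void()> type
    }
}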
4) Make a function to add a job to your queue:
void The_Pool::Add_Job(std::function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(Queue_Mutex);
        Queue.push(New_Job);
    }
    condition.notify_one();
}
5) Bind an arbitrary function to your Queue
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));
Once you integrate these ingredients, you have your own dynamic thread pool. These threads always run, waiting for jobs to do.
I apologize if there are syntax errors; I typed this code from memory, and my memory is not great. Sorry that I cannot provide the complete thread pool code; that would violate my job integrity.
Edit: to terminate the pool, call the shutdown() method:
void The_Pool::shutdown()
{
    {
        // Use the same mutex the workers wait on, so no wake-up is lost.
        std::unique_lock<std::mutex> lock(Queue_Mutex);
        terminate_pool = true; // use this flag in condition.wait
    }
    condition.notify_all(); // wake up all threads.
    // Join all threads.
    for (std::thread &every_thread : Pool)
    {
        every_thread.join();
    }
    Pool.clear();
    stopped = true; // use this flag in destructor, if not set, call shutdown()
}
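Putting steps 1 to 5 and shutdown() together, a minimal self-contained sketch might look like the following. The member names follow the answer; the constructor, the destructor and the fallback thread count are assumptions added so the class compiles on its own, not the author's code. In this version shutdown() lets the workers drain whatever is still queued before they exit.

```cpp
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Sketch assembling the steps above; constructor/destructor are assumptions.
class The_Pool {
public:
    explicit The_Pool(unsigned num_threads = std::thread::hardware_concurrency()) {
        if (num_threads == 0) num_threads = 2;  // hardware_concurrency() may return 0
        for (unsigned ii = 0; ii < num_threads; ii++)
            Pool.push_back(std::thread(&The_Pool::Infinite_loop_function, this));
    }
    ~The_Pool() {
        if (!stopped) shutdown();  // join the workers if the user did not
    }
    void Add_Job(std::function<void()> New_Job) {
        {
            std::unique_lock<std::mutex> lock(Queue_Mutex);
            Queue.push(std::move(New_Job));
        }
        condition.notify_one();
    }
    // Lets the workers finish the queued jobs, then joins them.
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(Queue_Mutex);
            terminate_pool = true;
        }
        condition.notify_all();
        for (std::thread &every_thread : Pool) every_thread.join();
        Pool.clear();
        stopped = true;
    }

private:
    void Infinite_loop_function() {
        while (true) {
            std::function<void()> Job;
            {
                std::unique_lock<std::mutex> lock(Queue_Mutex);
                condition.wait(lock, [this] { return !Queue.empty() || terminate_pool; });
                if (Queue.empty()) return;  // terminating and nothing left to run
                Job = std::move(Queue.front());
                Queue.pop();
            }
            Job();  // run outside the lock so other workers can dequeue
        }
    }

    std::queue<std::function<void()>> Queue;
    std::mutex Queue_Mutex;
    std::condition_variable condition;
    bool terminate_pool = false;
    bool stopped = false;
    std::vector<std::thread> Pool;  // declared last: threads start after the rest is ready
};
```

Jobs are submitted with Add_Job, for example pool.Add_Job([] { /* work */ });, and destroying the pool calls shutdown() if it has not been called already.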
Answer by Kerrek SB
A pool of threads means that all your threads are running, all the time – in other words, the thread function never returns. To give the threads something meaningful to do, you have to design a system of inter-thread communication, both for the purpose of telling the thread that there's something to do, as well as for communicating the actual work data.
Typically this will involve some kind of concurrent data structure, and each thread would presumably sleep on some kind of condition variable, which would be notified when there's work to do. Upon receiving the notification, one or several of the threads wake up, recover a task from the concurrent data structure, process it, and store the result in an analogous fashion.
The thread would then go on to check whether there's even more work to do, and if not go back to sleep.
The upshot is that you have to design all this yourself, since there isn't a natural notion of "work" that's universally applicable. It's quite a bit of work, and there are some subtle issues you have to get right. (You can program in Go if you like a system which takes care of thread management for you behind the scenes.)
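As a rough illustration of the "concurrent data structure plus condition variable" described above, here is a minimal blocking queue sketch. The class name blocking_queue and its close() method are hypothetical; close() is just one common way to tell sleeping workers that no more work will arrive.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>

// Hypothetical sketch: a queue that consumers can sleep on.
template <typename T>
class blocking_queue {
public:
    void push(T item) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push(std::move(item));
        }
        cv_.notify_one();  // wake one sleeping consumer
    }
    // Sleeps until an item is available or close() was called.
    // Returns false only when the queue is closed and empty.
    bool pop(T &out) {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !q_.empty() || closed_; });
        if (q_.empty()) return false;
        out = std::move(q_.front());
        q_.pop();
        return true;
    }
    // Wakes every consumer; they drain remaining items, then pop() returns false.
    void close() {
        {
            std::lock_guard<std::mutex> lock(m_);
            closed_ = true;
        }
        cv_.notify_all();
    }

private:
    std::queue<T> q_;
    std::mutex m_;
    std::condition_variable cv_;
    bool closed_ = false;
};
```

A worker thread would then loop with while (q.pop(task)) task();, sleeping inside pop() until there is work and returning once the queue has been closed and drained.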
Answer by didierc
A threadpool is at core a set of threads all bound to a function working as an event loop. These threads will endlessly wait for a task to be executed, or their own termination.
The thread pool's job is to provide an interface to submit jobs, to define (and perhaps modify) the policy for running these jobs (scheduling rules, thread instantiation, pool size), and to monitor the status of the threads and related resources.
So for a versatile pool, one must start by defining what a task is: how it is launched and interrupted, what its result is (see the notions of promise and future for that question), which events the threads will have to respond to and how they will handle them, and how these events are distinguished from the ones handled by the tasks. As you can see, this can become quite complicated, and the more involved the solution becomes, the more restrictions it imposes on how the threads work.
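One common way to define "what the result is" is to pair each task with a std::promise whose std::future goes back to the submitter. The sketch below uses a hypothetical make_task helper that wraps a computation into the void() shape a pool queue typically stores and forwards either its value or its exception. It assumes a non-void result type.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <future>
#include <memory>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical helper: returns a void() task plus the future for its result.
// R must not be void in this sketch (set_value(work()) needs a value).
template <typename R>
std::pair<std::function<void()>, std::future<R>> make_task(std::function<R()> work) {
    // std::function must be copyable, so the promise is shared.
    auto promise = std::make_shared<std::promise<R>>();
    std::future<R> fut = promise->get_future();
    std::function<void()> task = [work, promise] {
        try {
            promise->set_value(work());
        } catch (...) {
            promise->set_exception(std::current_exception());  // rethrown by get()
        }
    };
    return {std::move(task), std::move(fut)};
}
```

A worker runs the first element like any other job; the submitter keeps the second and calls get() when it needs the value.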
The current tooling for handling events is fairly bare-bones (*): primitives like mutexes and condition variables, and a few abstractions on top of them (locks, barriers). But in some cases these abstractions may turn out to be unfit (see this related question), and one must fall back to using the primitives.
Other problems have to be managed too:
- signal
- i/o
- hardware (processor affinity, heterogeneous setup)
How would these play out in your setting?
This answer to a similar question points to an existing implementation meant for Boost and the STL.
I offered a very crude implementation of a thread pool for another question, which doesn't address many of the problems outlined above. You might want to build on it. You might also want to look at existing frameworks in other languages for inspiration.
(*) I don't see that as a problem, quite to the contrary. I think it's the very spirit of C++ inherited from C.
Answer by Tyler
Edit: This now requires C++17 and concepts. (As of 9/12/16, only g++ 6.0+ is sufficient.)
The template deduction is a lot more accurate because of it, though, so it's worth the effort of getting a newer compiler. I've not yet found a function that requires explicit template arguments.
It also now takes any appropriate callable object (and is still statically typesafe!!!).
It also now includes an optional green-threading priority thread pool using the same API. This class is POSIX-only, though: it uses the ucontext_t API for userspace task switching.
I created a simple library for this. An example of usage is given below. (I'm answering this because it was one of the things I found before I decided it was necessary to write it myself.)
#include <future>
#include <iostream>
#include <list>
// ...plus the header of the thread_pool library linked below
using namespace std;

bool is_prime(int n){
    // Determine if n is prime (simple trial division).
    if (n < 2)
        return false;
    for (int d = 2; d * d <= n; d++)
        if (n % d == 0)
            return false;
    return true;
}

int main(){
    thread_pool pool(8); // 8 threads
    list<future<bool>> results;
    for(int n = 2;n < 10000;n++){
        // Submit a job to the pool.
        results.emplace_back(pool.async(is_prime, n));
    }
    int n = 2;
    for(auto i = results.begin();i != results.end();i++, n++){
        // i is an iterator pointing to a future representing the result of is_prime(n)
        cout << n << " ";
        bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
        if(prime)
            cout << "is prime";
        else
            cout << "is not prime";
        cout << endl;
    }
}
You can pass async any function with any (or void) return value and any (or no) arguments, and it will return a corresponding std::future. To get the result (or just wait until a task has completed), you call get() on the future.
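For readers curious how an async()-style submit can accept any callable plus arguments and hand back a matching std::future, here is a minimal sketch using std::packaged_task. It is not that library's implementation: submit and the global jobs queue are hypothetical, and a real pool would protect the queue with a mutex and notify a worker rather than leave the jobs for the caller to run. The packaged_task is held in a std::shared_ptr because it is move-only, while std::function requires a copyable callable.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>
#include <queue>
#include <utility>

// Hypothetical job queue standing in for a pool's (normally mutex-protected) queue.
std::queue<std::function<void()>> jobs;

// Hypothetical submit: wraps f(args...) into a job and returns its future.
// Works for void and non-void results.
template <typename F, typename... Args>
auto submit(F &&f, Args &&...args) -> std::future<decltype(f(args...))> {
    using R = decltype(f(args...));
    // packaged_task is move-only; share it so the job lambda stays copyable.
    auto task = std::make_shared<std::packaged_task<R()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::future<R> result = task->get_future();
    jobs.push([task] { (*task)(); });
    return result;
}
```

A worker thread would pop a job and call it; the future becomes ready as soon as the job has run.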
Here's the github: https://github.com/Tyler-Hardin/thread_pool.
Answer by rustyx
Something like this might help (taken from a working app).
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
    typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

    thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
        for (int i = 0; i < threads; ++i) {
            auto worker = [this] { return service.run(); };
            grp.add_thread(new boost::thread(worker));
        }
    }

    template<class F>
    void enqueue(F f) {
        service.post(f);
    }

    ~thread_pool() {
        service_worker.reset(); // drop the work object so run() returns once the queue is empty
        grp.join_all();
        service.stop();
    }

private:
    boost::asio::io_service service;
    asio_worker service_worker;
    boost::thread_group grp;
};
You can use it like this:
thread_pool pool(2);
pool.enqueue([] {
std::cout << "Hello from Task 1\n";
});
pool.enqueue([] {
std::cout << "Hello from Task 2\n";
});
Keep in mind that reinventing an efficient asynchronous queuing mechanism is not trivial.
boost::asio::io_service is a very efficient implementation, or rather a collection of platform-specific wrappers (e.g. it wraps I/O completion ports on Windows).
Answer by Halcyon
This is another thread pool implementation that is very simple and easy to understand and use. It uses only the C++11 standard library, and you can study it or modify it for your own uses. It should be a nice starting point if you want to get into using thread pools:
Answer by pio
Following [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece)'s suggestion, I implemented the thread pool:
function_pool.h
#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>
class Function_pool
{
private:
std::queue<std::function<void()>> m_function_queue;
std::mutex m_lock;
std::condition_variable m_data_condition;
std::atomic<bool> m_accept_functions;
public:
Function_pool();
~Function_pool();
void push(std::function<void()> func);
void done();
void infinite_loop_func();
};
function_pool.cpp
#include "function_pool.h"
Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}
Function_pool::~Function_pool()
{
}
void Function_pool::push(std::function<void()> func)
{
std::unique_lock<std::mutex> lock(m_lock);
m_function_queue.push(func);
// when we send the notification immediately, the consumer will try to get the lock, so unlock asap
lock.unlock();
m_data_condition.notify_one();
}
void Function_pool::done()
{
std::unique_lock<std::mutex> lock(m_lock);
m_accept_functions = false;
lock.unlock();
// when we send the notification immediately, the consumer will try to get the lock, so unlock asap
m_data_condition.notify_all();
//notify all waiting threads.
}
void Function_pool::infinite_loop_func()
{
std::function<void()> func;
while (true)
{
{
std::unique_lock<std::mutex> lock(m_lock);
m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
if (!m_accept_functions && m_function_queue.empty())
{
//lock will be released automatically.
//finish the thread loop and let it join in the main thread.
return;
}
func = m_function_queue.front();
m_function_queue.pop();
//release the lock
}
func();
}
}
main.cpp
#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>
Function_pool func_pool;
class quit_worker_exception : public std::exception {};
void example_function()
{
std::cout << "bla" << std::endl;
}
int main()
{
std::cout << "starting operation" << std::endl;
int num_threads = std::thread::hardware_concurrency();
std::cout << "number of threads = " << num_threads << std::endl;
std::vector<std::thread> thread_pool;
for (int i = 0; i < num_threads; i++)
{
thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
}
//here we should send our functions
for (int i = 0; i < 50; i++)
{
func_pool.push(example_function);
}
func_pool.done();
for (unsigned int i = 0; i < thread_pool.size(); i++)
{
thread_pool.at(i).join();
}
}
Answer by Amir Fo
You can use boost::asio::thread_pool from the Boost library:
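#include <boost/asio/post.hpp>
#include <boost/asio/thread_pool.hpp>
#include <thread>

void my_task(){...}

int main(){
    int threadNumbers = std::thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);
    // Submit a function to the pool.
    boost::asio::post(pool, my_task);
    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
        ...
    });
    // Wait for the submitted tasks; the destructor alone stops the pool before joining it.
    pool.join();
}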
You can also use threadpool from the open source community:
void first_task() {...}
void second_task() {...}
int main(){
int threadNumbers = thread::hardware_concurrency();
pool tp(threadNumbers);
// Add some tasks to the pool.
tp.schedule(&first_task);
tp.schedule(&second_task);
}
Answer by cantordust
A thread pool with no dependencies outside the STL is entirely possible. I recently wrote a small header-only thread pool library to address the exact same problem. It supports dynamic pool resizing (changing the number of workers at runtime), waiting, stopping, pausing, resuming and so on. I hope you find it useful.
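For the "waiting" feature mentioned above, one simple approach (a sketch, not how that library does it) is to count outstanding tasks and let a caller block until the count reaches zero. The same idea covers the original question's wish to resynchronize after each iteration without joining any threads. The class pending_counter and its methods are hypothetical; a pool would call task_started() when a job is submitted and task_finished() after the job returns.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical sketch: tracks outstanding tasks so callers can wait for idle.
class pending_counter {
public:
    void task_started() {
        std::lock_guard<std::mutex> lock(m_);
        ++pending_;
    }
    void task_finished() {
        std::lock_guard<std::mutex> lock(m_);
        if (--pending_ == 0) idle_.notify_all();  // last task done: wake waiters
    }
    // Blocks until every started task has finished.
    void wait_idle() {
        std::unique_lock<std::mutex> lock(m_);
        idle_.wait(lock, [this] { return pending_ == 0; });
    }

private:
    std::mutex m_;
    std::condition_variable idle_;
    std::size_t pending_ = 0;
};
```

Calling task_started() on the submitting thread, before the job is queued, matters: otherwise wait_idle() could see a count of zero while a job is still in flight.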