C++ 使用 boost asio 的线程池

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/12215395/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-08-27 16:02:24  来源:igfitidea点击:

Thread pool using boost asio

c++threadpoolboost-asio

提问by vivek

I am trying to create a limited thread pool class using boost::asio. But I am stuck at one point can some one help me.

我正在尝试使用 boost::asio 创建一个有限的线程池类。但我被困在某一点有人可以帮助我。

The only problem is the place where I should decrease counter?

唯一的问题是我应该减少计数器的地方?

code does not work as expected.

代码没有按预期工作。

the problem is I don't know when my thread will finish execution and how I will come to know that it has return to pool

问题是我不知道我的线程何时完成执行以及我如何知道它已返回池

#include <boost/asio.hpp>
#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>
#include <stack>

using namespace std;
using namespace boost;

class ThreadPool
{
    static int count;
    int NoOfThread;
    thread_group grp;
    mutex mutex_;
    asio::io_service io_service;
    int counter;
    stack<thread*> thStk ;

public:
    ThreadPool(int num)
    {   
        NoOfThread = num;
        counter = 0;
        mutex::scoped_lock lock(mutex_);

        if(count == 0)
            count++;
        else
            return;

        for(int i=0 ; i<num ; ++i)
        {
            thStk.push(grp.create_thread(boost::bind(&asio::io_service::run, &io_service)));
        }
    }
    ~ThreadPool()
    {
        io_service.stop();
        grp.join_all();
    }

    thread* getThread()
    {
        if(counter > NoOfThread)
        {
            cout<<"run out of threads \n";
            return NULL;
        }

        counter++;
        thread* ptr = thStk.top();
        thStk.pop();
        return ptr;
    }
};
int ThreadPool::count = 0;


struct callable
{
    void operator()()
    {
        cout<<"some task for thread \n";
    }
};

int main( int argc, char * argv[] )
{

    callable x;
    ThreadPool pool(10);
    thread* p = pool.getThread();
    cout<<p->get_id();

    //how i can assign some function to thread pointer ?
    //how i can return thread pointer after work done so i can add 
//it back to stack?


    return 0;
}

回答by Tanner Sansbury

In short, you need to wrap the user's provided task with another function that will:

简而言之,您需要使用另一个函数包装用户提供的任务,该函数将:

  • Invoke the user function or callable object.
  • Lock the mutex and decrement the counter.
  • 调用用户函数或可调用对象。
  • 锁定互斥锁并递减计数器。


I may not be understanding all the requirements for this thread pool. Thus, for clarity, here is an explicit list as to what I believe are the requirements:

我可能不了解这个线程池的所有要求。因此,为了清楚起见,这里有一份关于我认为的要求的明确清单:

  • The pool manages the lifetime of the threads. The user should not be able to delete threads that reside within the pool.
  • The user can assign a task to the pool in a non-intrusive way.
  • When a task is being assigned, if all threads in the pool are currently running other tasks, then the task is discarded.
  • 池管理线程的生命周期。用户不应能够删除驻留在池中的线​​程。
  • 用户可以以非侵入性的方式将任务分配给池。
  • 在分配任务时,如果池中的所有线程当前都在运行其他任务,则该任务将被丢弃。

Before I provide an implementation, there are a few key points I would like to stress:

在我提供一个实现之前,我想强调几个关键点:

  • Once a thread has been launched, it will run until completion, cancellation, or termination. The function the thread is executing cannot be reassigned. To allow for a single thread to execute multiple functions over the course of its life, the thread will want to launch with a function that will read from a queue, such as io_service::run(), and callable types are posted into the event queue, such as from io_service::post().
  • io_service::run()returns if there is no work pending in the io_service, the io_serviceis stopped, or an exception is thrown from a handler that the thread was running. To prevent io_serivce::run()from returning when there is no unfinished work, the io_service::workclass can be used.
  • Defining the task's type requirements (i.e. the task's type must be callable by object()syntax) instead of requiring a type (i.e. task must inherit from process), provides more flexibility to the user. It allows the user to supply a task as a function pointer or a type providing a nullary operator().
  • 线程启动后,它将一直运行到完成、取消或终止。线程正在执行的函数不能重新分配。为了允许单个线程在其生命周期内执行多个函数,该线程将希望使用从队列中读取的函数启动,例如io_service::run(),并且可调用类型被发布到事件队列中,例如 from io_service::post()
  • io_service::run()如果 中没有待处理的工作io_serviceio_service停止或从线程正在运行的处理程序抛出异常,则返回。为了防止io_serivce::run()在没有未完成的工作时返回,io_service::work可以使用该类。
  • 定义任务的类型要求(即任务的类型必须可通过object()语法调用)而不是要求类型(即任务必须从 继承process),为用户提供了更大的灵活性。它允许用户将任务作为函数指针或提供 nullary 的类型提供operator()

Implementation using boost::asio:

实施使用boost::asio

#include <boost/asio.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  boost::asio::io_service io_service_;
  boost::asio::io_service::work work_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : work_( io_service_ ),
      available_( pool_size )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &boost::asio::io_service::run,
                                           &io_service_ ) );
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Force all threads to return from io_service::run().
    io_service_.stop();

    // Suppress all exceptions.
    try
    {
      threads_.join_all();
    }
    catch ( const std::exception& ) {}
  }

  /// @brief Adds a task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Post a wrapped task into the queue.
    io_service_.post( boost::bind( &thread_pool::wrap_task, this,
                                   boost::function< void() >( task ) ) );
  }

private:
  /// @brief Wrap a task so that the available count can be increased once
  ///        the user provided task has completed.
  void wrap_task( boost::function< void() > task )
  {
    // Run the user supplied task.
    try
    {
      task();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}

    // Task has finished, so increment count of available threads.
    boost::unique_lock< boost::mutex > lock( mutex_ );
    ++available_;
  }
};

A few comments about the implementation:

关于实现的几点意见:

  • Exception handling needs to occur around the user's task. If the user's function or callable object throws an exception that is not of type boost::thread_interrupted, then std::terminate()is called. This is the the result of Boost.Thread's exceptions in thread functionsbehavior. It is also worth reading Boost.Asio's effect of exceptions thrown from handlers.
  • If the user provides the taskvia boost::bind, then the nested boost::bindwill fail to compile. One of the following options is required:
    • Not support taskcreated by boost::bind.
    • Meta-programming to perform compile-time branching based on whether or not the user's type if the result of boost::bindso that boost::protectcould be used, as boost::protectonly functions properly on certain function objects.
    • Use another type to pass the taskobject indirectly. I opted to use boost::functionfor readability at the cost of losing the exact type. boost::tuple, while slightly less readable, could also be used to preserve the exact type, as seen in the Boost.Asio's serializationexample.
  • 异常处理需要围绕用户的任务进行。如果用户的函数或可调用对象抛出非类型的异常boost::thread_interrupted,则std::terminate()调用。这是 Boost.Thread在线程函数行为中出现异常的结果。Boost.Asio's effect of exceptions throws from handlers也值得一读。
  • 如果用户提供taskvia boost::bind,则嵌套boost::bind将无法编译。需要以下选项之一:
    • 不支持taskboost::bind.
    • 元编程进行编译时根据if的结果是否在用户的类型分支boost::bind,这样boost::protect可以使用,因为boost::protect只有正常运行某些功能的对象。
    • 使用另一种类型task间接传递对象。我选择以boost::function丢失确切类型为代价来提高可读性。 boost::tuple,虽然可读性稍差,但也可用于保留确切类型,如 Boost.Asio 的序列化示例所示。

Application code can now use the thread_pooltype non-intrusively:

应用程序代码现在可以thread_pool非侵入性地使用该类型:

void work() {};

struct worker
{
  void operator()() {};
};

void more_work( int ) {};

int main()
{ 
  thread_pool pool( 2 );
  pool.run_task( work );                        // Function pointer.
  pool.run_task( worker() );                    // Callable object.
  pool.run_task( boost::bind( more_work, 5 ) ); // Callable object.
}

The thread_poolcould be created without Boost.Asio, and may be slightly easier for maintainers, as they no longer need to know about Boost.Asiobehaviors, such as when does io_service::run()return, and what is the io_service::workobject:

thread_pool可以在没有 Boost.Asio的情况下创建,并且对于维护者来说可能稍微容易一些,因为他们不再需要了解Boost.Asio行为,例如何时io_service::run()返回,以及io_service::work对象是什么:

#include <queue>
#include <boost/bind.hpp>
#include <boost/thread.hpp>

class thread_pool
{
private:
  std::queue< boost::function< void() > > tasks_;
  boost::thread_group threads_;
  std::size_t available_;
  boost::mutex mutex_;
  boost::condition_variable condition_;
  bool running_;
public:

  /// @brief Constructor.
  thread_pool( std::size_t pool_size )
    : available_( pool_size ),
      running_( true )
  {
    for ( std::size_t i = 0; i < pool_size; ++i )
    {
      threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ;
    }
  }

  /// @brief Destructor.
  ~thread_pool()
  {
    // Set running flag to false then notify all threads.
    {
      boost::unique_lock< boost::mutex > lock( mutex_ );
      running_ = false;
      condition_.notify_all();
    }

    try
    {
      threads_.join_all();
    }
    // Suppress all exceptions.
    catch ( const std::exception& ) {}
  }

  /// @brief Add task to the thread pool if a thread is currently available.
  template < typename Task >
  void run_task( Task task )
  {
    boost::unique_lock< boost::mutex > lock( mutex_ );

    // If no threads are available, then return.
    if ( 0 == available_ ) return;

    // Decrement count, indicating thread is no longer available.
    --available_;

    // Set task and signal condition variable so that a worker thread will
    // wake up andl use the task.
    tasks_.push( boost::function< void() >( task ) );
    condition_.notify_one();
  }

private:
  /// @brief Entry point for pool threads.
  void pool_main()
  {
    while( running_ )
    {
      // Wait on condition variable while the task is empty and the pool is
      // still running.
      boost::unique_lock< boost::mutex > lock( mutex_ );
      while ( tasks_.empty() && running_ )
      {
        condition_.wait( lock );
      }
      // If pool is no longer running, break out.
      if ( !running_ ) break;

      // Copy task locally and remove from the queue.  This is done within
      // its own scope so that the task object is destructed immediately
      // after running the task.  This is useful in the event that the
      // function contains shared_ptr arguments bound via bind.
      {
        boost::function< void() > task = tasks_.front();
        tasks_.pop();

        lock.unlock();

        // Run the task.
        try
        {
          task();
        }
        // Suppress all exceptions.
        catch ( const std::exception& ) {}
      }

      // Task has finished, so increment count of available threads.
      lock.lock();
      ++available_;
    } // while running_
  }
};