Event / Task Queue Multithreading C++
Warning: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must do so under the same license, link to the original, and attribute it to the original authors (not me): StackOverflow
Original URL: http://stackoverflow.com/questions/923922/
Asked by Ben Reeves
I would like to create a class whose methods can be called from multiple threads, but instead of executing each method on the thread from which it was called, it should perform them all on its own thread. No result needs to be returned, and it shouldn't block the calling thread.
A first-attempt implementation is included below. The public methods insert a function pointer and data into a job queue, which the worker thread then picks up. However, it isn't particularly nice code, and adding new methods is cumbersome.
Ideally I would like to use this as a base class to which I can easily add methods (with a variable number of arguments) with minimal hassle and code duplication.
What is a better way to do this? Is there any existing code available which does something similar? Thanks
#include <queue>

using namespace std;

class GThreadObject
{
    class event
    {
    public:
        void (GThreadObject::*funcPtr)(void *);
        void * data;
    };

public:
    void functionOne(char * argOne, int argTwo);

private:
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionOneProxy(void * buffer);
    void functionOneInternal(char * argOne, int argTwo);
};
#include <iostream>
#include <cstdlib>   // malloc/free
#include <cstring>   // memcpy
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from the queue.
 * When a new event is received it executes the attached function pointer.
 * It should block on a condition, but the thread code is removed to decrease clutter.
 */
void GThreadObject::workerThread()
{
    //New event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();
    jobQueue.pop();
    //Execute the function pointer with the attached data
    (*this.*receivedEvent->funcPtr)(receivedEvent->data);
    //Clean up the event and its argument buffer
    free(receivedEvent->data);
    delete receivedEvent;
}
/*
 * This is the public interface, can be called from child threads.
 * Instead of executing the event directly it adds it to a job queue,
 * then the workerThread picks it up and executes all tasks on the same thread.
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    //Malloc a buffer the size of the function arguments
    int argumentSize = sizeof(char*) + sizeof(int);
    void * myData = malloc(argumentSize);
    //Copy the arguments into the buffer (assumes argOne and argTwo are laid out contiguously on the stack)
    memcpy(myData, &argOne, argumentSize);
    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = myData;
    myEvent->funcPtr = &GThreadObject::functionOneProxy;
    jobQueue.push(myEvent);
    //This would be a thread condition signal, replaced with a direct call here
    this->workerThread();
}
/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne char*:" << argOne << " int:" << argTwo << endl;
    //Now do the work
}

/*
 * This is the function I would like to remove if possible
 * Split the void * buffer back into arguments for the internal function
 */
void GThreadObject::functionOneProxy(void * buffer)
{
    char * cBuff = (char*)buffer;
    //Relies on the char* and the int having been copied contiguously into the buffer
    functionOneInternal(*(char**)cBuff, *(int*)(cBuff + sizeof(char*)));
};
int main()
{
    GThreadObject myObj;
    myObj.functionOne("My Message", 23);
    return 0;
}
Accepted answer by Nikolai Fetissov
There's a Futures library making its way into Boost and the C++ standard library. There's also something of the same sort in ACE, but I would hate to recommend it to anyone (as @lothar already pointed out, it's the Active Object pattern).
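To give a feel for what the futures approach looks like, here is a minimal illustrative sketch using the C++11 standard library's std::packaged_task and std::future (the Worker class and its submit() method are invented for this example; Boost's futures follow essentially the same shape). Each submitted task runs on the object's own thread, and the returned future lets the caller wait for completion if it cares to:

#include <condition_variable>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <thread>

class Worker
{
public:
    Worker() : done_(false), thread_([this] { run(); }) {}

    ~Worker()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            done_ = true;
        }
        cond_.notify_one();
        thread_.join();
    }

    // Queue any void() callable; the returned future can be waited on or ignored.
    template <typename F>
    std::future<void> submit(F f)
    {
        std::packaged_task<void()> task(std::move(f));
        std::future<void> result = task.get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(task));
        }
        cond_.notify_one();
        return result;
    }

private:
    void run()
    {
        for (;;)
        {
            std::packaged_task<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cond_.wait(lock, [this] { return done_ || !jobs_.empty(); });
                if (done_ && jobs_.empty())
                    return;
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // executes on the worker thread
        }
    }

    bool done_;
    std::queue<std::packaged_task<void()> > jobs_;
    std::mutex mutex_;
    std::condition_variable cond_;
    std::thread thread_;
};

int main()
{
    Worker w;
    std::future<void> f = w.submit([] { std::cout << "ran on the worker thread\n"; });
    f.wait();  // the future is what makes waiting for completion possible
    return 0;
}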
Answered by nhaa123
You can solve this by using Boost's Thread library. Something like this (half pseudo-code):
class GThreadObject
{
    ...

public:
    GThreadObject()
        : _done(false)
        , _newJob(false)
        , _thread(boost::bind(&GThreadObject::workerThread, this))
    {
    }

    ~GThreadObject()
    {
        _done = true;
        _thread.join();
    }

    void functionOne(char *argOne, int argTwo)
    {
        ...
        {
            boost::lock_guard<boost::mutex> l(_mutex);
            _jobQueue.push(myEvent);
            _newJob = true;
        }
        _cond.notify_one();
    }

private:
    void workerThread()
    {
        while (!_done) {
            boost::unique_lock<boost::mutex> l(_mutex);
            while (!_newJob) {
                _cond.wait(l);
            }
            Event *receivedEvent = _jobQueue.front();
            ...
        }
    }

private:
    volatile bool _done;
    volatile bool _newJob;
    boost::thread _thread;
    boost::mutex _mutex;
    boost::condition_variable _cond;
    std::queue<Event*> _jobQueue;
};
Also, please note how RAII allows us to keep this code smaller and easier to manage.
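As a usage illustration (a sketch assuming functionOne() above is fully implemented as outlined, not code from the original answer), several producer threads can post work while the object's own thread executes it:

#include <boost/thread.hpp>
#include <boost/bind.hpp>

void producer(GThreadObject* obj, int id)
{
    for (int i = 0; i < 100; ++i)
        obj->functionOne((char*)"hello", id);   // returns immediately, work runs on obj's thread
}

int main()
{
    GThreadObject obj;
    boost::thread t1(boost::bind(&producer, &obj, 1));
    boost::thread t2(boost::bind(&producer, &obj, 2));
    t1.join();
    t2.join();
    return 0;
}   // ~GThreadObject() sets _done and joins the worker thread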
Answered by nhaa123
Here's a class I wrote for a similar purpose (I use it for event handling but you could of course rename it to ActionQueue -- and rename its methods).
You use it like this:
With the function you want to call: void foo (const int x, const int y) { /*...*/ }
And: EventQueue q;
q.AddEvent (boost::bind (foo, 10, 20));
In the worker thread
q.PlayOutEvents ();
Note: it should be fairly easy to add code that blocks on a condition variable to avoid burning CPU cycles; a sketch of one way to do that follows the class below.
The code (Visual Studio 2003 with boost 1.34.1):
#pragma once

#include <boost/thread/recursive_mutex.hpp>
#include <boost/function.hpp>
#include <boost/signals.hpp>
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
#include <vector>    // std::vector
#include <string>
#include <windows.h> // Sleep

using std::string;

// Records & plays out actions (closures) in a thread-safe manner.
class EventQueue
{
    typedef boost::function <void ()> Event;

public:
    const bool PlayOutEvents ()
    {
        // The copy is there to ensure there are no deadlocks.
        const std::vector<Event> eventsCopy = PopEvents ();
        BOOST_FOREACH (const Event& e, eventsCopy)
        {
            e ();
            Sleep (0);
        }
        return eventsCopy.size () > 0;
    }

    void AddEvent (const Event& event)
    {
        Mutex::scoped_lock lock (myMutex);
        myEvents.push_back (event);
    }

protected:
    const std::vector<Event> PopEvents ()
    {
        Mutex::scoped_lock lock (myMutex);
        const std::vector<Event> eventsCopy = myEvents;
        myEvents.clear ();
        return eventsCopy;
    }

private:
    typedef boost::recursive_mutex Mutex;
    Mutex myMutex;
    std::vector <Event> myEvents;
};
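As mentioned in the note above, the busy polling can be avoided by blocking on a condition variable until work arrives. A minimal sketch of that variation (the BlockingEventQueue name and its members are invented here, assuming Boost.Thread's boost::condition):

#include <boost/thread/mutex.hpp>
#include <boost/thread/condition.hpp>
#include <boost/function.hpp>
#include <vector>

class BlockingEventQueue
{
    typedef boost::function<void ()> Event;

public:
    void AddEvent(const Event& e)
    {
        boost::mutex::scoped_lock lock(myMutex);
        myEvents.push_back(e);
        myCondition.notify_one();          // wake the worker
    }

    // Blocks until at least one event is available, then runs the whole batch.
    void PlayOutEvents()
    {
        std::vector<Event> batch;
        {
            boost::mutex::scoped_lock lock(myMutex);
            while (myEvents.empty())
                myCondition.wait(lock);    // releases the mutex while sleeping
            batch.swap(myEvents);
        }
        for (std::vector<Event>::size_type i = 0; i < batch.size(); ++i)
            batch[i]();
    }

private:
    boost::mutex myMutex;
    boost::condition myCondition;
    std::vector<Event> myEvents;
};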
I hope this helps. :)
Martin Bilski
Answered by Ben Reeves
Below is an implementation which doesn't require a "functionProxy" method. Even though it is easier to add new methods, it's still messy.
Boost::Bind and "Futures" do seem like they would tidy a lot of this up. I guess I'll have a look at the boost code and see how it works. Thanks for your suggestions everyone.
GThreadObject.h
#include <queue>

using namespace std;

class GThreadObject
{
    template <int size>
    class VariableSizeContainer
    {
        char data[size];
    };

    class event
    {
    public:
        void (GThreadObject::*funcPtr)(void *);
        int dataSize;
        char * data;
    };

public:
    void functionOne(char * argOne, int argTwo);
    void functionTwo(int argOne, int argTwo);

private:
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize);
    void workerThread();
    queue<GThreadObject::event*> jobQueue;
    void functionTwoInternal(int argOne, int argTwo);
    void functionOneInternal(char * argOne, int argTwo);
};
GThreadObject.cpp
#include <iostream>
#include <cstdlib>   // malloc/free
#include <cstring>   // memcpy
#include "GThreadObject.h"

using namespace std;

/* On a continuous loop, reading tasks from the queue.
 * When a new event is received it executes the attached function pointer.
 * Thread code removed to decrease clutter.
 */
void GThreadObject::workerThread()
{
    //New event added, process it
    GThreadObject::event * receivedEvent = jobQueue.front();
    jobQueue.pop();
    /* Create an object the size of the argument block the function is expecting,
     * then cast the function pointer to accept this object as an argument.
     * This is the bit I would like to remove.
     * Only supports an 8-byte argument size, e.g. 2 ints OR pointer + int OR myObject8bytesSize.
     * Subsequent data sizes would need to be added with an else if.
     */
    if (receivedEvent->dataSize == 8)
    {
        const int size = 8;
        void (GThreadObject::*newFuncPtr)(VariableSizeContainer<size>);
        newFuncPtr = (void (GThreadObject::*)(VariableSizeContainer<size>))receivedEvent->funcPtr;
        //Execute the function
        (*this.*newFuncPtr)(*((VariableSizeContainer<size>*)receivedEvent->data));
    }
    //Clean up
    free(receivedEvent->data);
    delete receivedEvent;
}

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize)
{
    //Malloc a buffer the size of the function arguments
    void * myData = malloc(argSize);
    //Copy the arguments passed to the public function into the buffer
    memcpy(myData, (char*)argStart, argSize);
    //Create the event and push it on to the queue
    GThreadObject::event * myEvent = new event;
    myEvent->data = (char*)myData;
    myEvent->dataSize = argSize;
    myEvent->funcPtr = funcPtr;
    jobQueue.push(myEvent);
    //This would be a thread condition signal, replaced with a direct call here
    this->workerThread();
}
/*
 * This is the public interface, can be called from child threads.
 * Instead of executing the event directly it adds it to a job queue,
 * then the workerThread picks it up and executes all tasks on the same thread.
 */
void GThreadObject::functionOne(char * argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionOneInternal(char * argOne, int argTwo)
{
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl;
    //Now do the work
}

void GThreadObject::functionTwo(int argOne, int argTwo)
{
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int));
}

/*
 * This handles the actual event
 */
void GThreadObject::functionTwoInternal(int argOne, int argTwo)
{
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl;
}
main.cpp
#include <iostream>
#include "GThreadObject.h"

int main()
{
    GThreadObject myObj;
    myObj.functionOne("My Message", 23);
    myObj.functionTwo(456, 23);
    return 0;
}
Edit: Just for completeness I did an implementation with Boost::bind. Key Differences:
queue<boost::function<void ()> > jobQueue;

void GThreadObjectBoost::functionOne(char * argOne, int argTwo)
{
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo));
    workerThread();
}

void GThreadObjectBoost::workerThread()
{
    boost::function<void ()> func = jobQueue.front();
    jobQueue.pop();
    func();
}
Using the Boost implementation, 10,000,000 iterations of functionOne() took ~19 sec, whereas the non-Boost implementation took only ~6.5 sec, so roughly 3x slower. I'm guessing that finding a good lock-free queue will be the biggest performance bottleneck here, but it's still quite a big difference.
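The benchmark code itself wasn't posted; a rough sketch of the kind of timing loop those numbers could come from (assumed, with a hypothetical GThreadObjectBoost.h header for the Boost-based class above):

#include <ctime>
#include <iostream>
#include "GThreadObjectBoost.h"   // hypothetical header for the class sketched above

int main()
{
    const int iterations = 10000000;
    GThreadObjectBoost obj;

    std::clock_t start = std::clock();
    for (int i = 0; i < iterations; ++i)
        obj.functionOne((char*)"My Message", 23);
    std::clock_t end = std::clock();

    std::cout << "Elapsed: " << double(end - start) / CLOCKS_PER_SEC << " s" << std::endl;
    return 0;
}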
Answered by Ignas Limanauskas
For extensibility and maintainability (and other -bilities) you could define an abstract class (or interface) for the "job" the thread is to perform. Users of your thread pool would then implement this interface and pass a reference to the object to the thread pool. This is very similar to the Symbian Active Object design: every AO subclasses CActive and has to implement methods such as Run() and Cancel().
For simplicity your interface (abstract class) might be as simple as:
class IJob
{
public:
    virtual ~IJob() {}
    virtual void Run() = 0;
};
Then the thread pool, or the single thread accepting requests, would have something like:
class CThread
{
    <...>
public:
    void AddJob(IJob* iTask);
    <...>
};
Naturally you would have multiple task classes with all kinds of extra setters/getters/attributes, whatever you need. However, the only requirement is to implement the Run() method, which performs the lengthy calculation:
class CDumbLoop : public IJob
{
public:
    CDumbLoop(int iCount) : m_Count(iCount) {};
    ~CDumbLoop() {};

    void Run()
    {
        // Do anything you want here
    }

private:
    int m_Count;
};
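The answer leaves CThread's internals open. Purely as an illustration (member names invented here, assuming Boost.Thread and the IJob interface above), the queue-and-dispatch side might look like this:

#include <boost/thread.hpp>
#include <boost/thread/condition.hpp>
#include <boost/bind.hpp>
#include <queue>

class CThread
{
public:
    CThread() : m_done(false), m_thread(boost::bind(&CThread::Loop, this)) {}

    ~CThread()
    {
        {
            boost::mutex::scoped_lock lock(m_mutex);
            m_done = true;
        }
        m_cond.notify_one();
        m_thread.join();
    }

    void AddJob(IJob* iTask)
    {
        boost::mutex::scoped_lock lock(m_mutex);
        m_jobs.push(iTask);
        m_cond.notify_one();
    }

private:
    void Loop()
    {
        for (;;)
        {
            IJob* job = 0;
            {
                boost::mutex::scoped_lock lock(m_mutex);
                while (m_jobs.empty() && !m_done)
                    m_cond.wait(lock);
                if (m_jobs.empty())
                    return;              // done and nothing left to run
                job = m_jobs.front();
                m_jobs.pop();
            }
            job->Run();                  // executes on this worker thread
            // Ownership/deletion of the IJob is left to the surrounding design.
        }
    }

    bool m_done;
    std::queue<IJob*> m_jobs;
    boost::mutex m_mutex;
    boost::condition m_cond;
    boost::thread m_thread;
};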
Answered by lothar
You might be interested in Active Object, one of the ACE Patterns of the ACE framework.
As Nikolai pointed out, futures are planned for standard C++ some time in the future (pun intended).
Answered by Dan
You should take a look at the Boost ASIO library. It is designed to dispatch events asynchronously. It can be paired with the Boost Thread library to build the system that you described.
You would need to instantiate a single boost::asio::io_service object and schedule a series of asynchronous events (boost::asio::io_service::post or boost::asio::io_service::dispatch). Next, you call the run member function from n threads. The io_service object is thread-safe and guarantees that your asynchronous handlers will only be dispatched in a thread from which you called io_service::run.
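A minimal sketch of that scheme (assuming Boost.Asio; doWork and runService are just illustrative names):

#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <iostream>

void doWork(int value)
{
    std::cout << "handled " << value << " on the worker thread" << std::endl;
}

// Plain wrapper so io_service::run() can be handed to boost::thread
// without worrying about its overloads.
void runService(boost::asio::io_service* io)
{
    io->run();
}

int main()
{
    boost::asio::io_service io;

    // post() returns immediately; the handlers are only queued here.
    io.post(boost::bind(&doWork, 1));
    io.post(boost::bind(&doWork, 2));

    // run() executes every queued handler on the thread that calls it.
    boost::thread worker(boost::bind(&runService, &io));
    worker.join();   // run() returns once the queue is drained
    return 0;
}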
The boost::asio::strand object is also useful for simple thread synchronization.
For what it is worth, I think that the ASIO library is a very elegant solution to this problem.