如何在C ++中使用任意字符串作为锁?
假设我有一个多线程C ++程序,该程序以对handleRequest(string key)的函数调用的形式处理请求。每个对" handleRequest"的调用都发生在一个单独的线程中,并且" key"有许多可能的值。
我想要以下行为:
- 当它们对
key的值相同时,对handleRequest(key)的同时调用将被序列化。 - 全局序列化被最小化。
handleRequest的主体可能看起来像这样:
void handleRequest(string key) {
KeyLock lock(key);
// Handle the request.
}
问题:如何实现" KeyLock"以获取所需的行为?
一个幼稚的实现可能会像这样开始:
KeyLock::KeyLock(string key) {
global_lock->Lock();
internal_lock_ = global_key_map[key];
if (internal_lock_ == NULL) {
internal_lock_ = new Lock();
global_key_map[key] = internal_lock_;
}
global_lock->Unlock();
internal_lock_->Lock();
}
KeyLock::~KeyLock() {
internal_lock_->Unlock();
// Remove internal_lock_ from global_key_map iff no other threads are waiting for it.
}
...但是这需要在每个请求的开始和结束处使用全局锁定,并为每个请求创建单独的Lock对象。如果在对" handleRequest"的调用之间争用很高,那么这可能不是问题,但是如果争用很低,则可能会带来很多开销。
解决方案
我们可以执行与问题中类似的操作,但不是使用一个global_key_map而是使用几个(可能在数组或者向量中),是由字符串上的一些简单哈希函数确定的。
这样,我们可以将其分散到几个独立的锁中,而不是单个全局锁。
这是内存分配器中经常使用的模式(我不知道该模式是否具有应有的名称)。当请求进入时,由某人确定分配将来自哪个池(通常是请求的大小,但是其他参数也可以考虑),则仅需要锁定该池。如果分配请求来自使用另一个池的另一个线程,则没有锁争用。
这将取决于平台,但是我尝试使用的两种技术是:
- 使用命名的互斥锁/同步对象,其中对象名称=键
- 使用基于文件系统的锁定,在此尝试尝试使用密钥名称创建不可共享的临时文件。如果已经存在(=已锁定),它将失败,并且我们必须轮询才能重试
两种技术都将取决于操作系统的详细信息。进行实验,看看哪个可行。
。
可能是我们想要的std :: map <std :: string,MutexType>,其中MutexType是所需互斥锁的类型。我们可能必须将对映射的访问权包装在另一个互斥锁中,以确保没有其他线程同时插入(并记住在互斥锁锁定之后再次执行检查,以确保另一个线程没有添加该互斥锁。在等待互斥时按下键!)。
相同的原理可以应用于任何其他同步方法,例如关键部分。
提高粒度并锁定整个键范围
这是Mike B答案的一种变体,在这里,我们没有一个可变的锁映射,而是有一个应用于键范围而不是单个键的固定锁阵列。
简化的示例:在启动时创建256个锁的数组,然后使用密钥的第一个字节确定要获取的锁的索引(即,所有以'k'开头的密钥都将由" locks [107]"保护)。
为了维持最佳吞吐量,我们应该分析密钥的分配和争用率。这种方法的好处是零动态分配和简单的清理;我们还可以避免两步锁定。不利的一面是,如果密钥分配随着时间的推移而出现偏差,那么潜在的竞争峰值就会出现。
考虑一下之后,另一种方法可能是这样的:
- 在
handleRequest中,创建一个执行实际工作的Callback。 - 创建一个由互斥锁保护的
multimap <string,Callback *> global_key_map。 - 如果一个线程看到
key已经被处理过,它将Callback *添加到global_key_map中并返回。 - 否则,它将立即调用其回调,然后针对相同的键调用同时显示的回调。
实现了这样的事情:
LockAndCall(string key, Callback* callback) {
global_lock.Lock();
if (global_key_map.contains(key)) {
iterator iter = global_key_map.insert(key, callback);
while (true) {
global_lock.Unlock();
iter->second->Call();
global_lock.Lock();
global_key_map.erase(iter);
iter = global_key_map.find(key);
if (iter == global_key_map.end()) {
global_lock.Unlock();
return;
}
}
} else {
global_key_map.insert(key, callback);
global_lock.Unlock();
}
}
这样做的好处是释放了本来等待键锁定的线程,但除此之外,它与我在问题中发布的天真的解决方案几乎相同。
不过,可以将其与Mike B和Constantin给出的答案结合起来。
/**
* StringLock class for string based locking mechanism
* e.g. usage
* StringLock strLock;
* strLock.Lock("row1");
* strLock.UnLock("row1");
*/
class StringLock {
public:
/**
* Constructor
* Initializes the mutexes
*/
StringLock() {
pthread_mutex_init(&mtxGlobal, NULL);
}
/**
* Lock Function
* The thread will return immediately if the string is not locked
* The thread will wait if the string is locked until it gets a turn
* @param string the string to lock
*/
void Lock(string lockString) {
pthread_mutex_lock(&mtxGlobal);
TListIds *listId = NULL;
TWaiter *wtr = new TWaiter;
wtr->evPtr = NULL;
wtr->threadId = pthread_self();
if (lockMap.find(lockString) == lockMap.end()) {
listId = new TListIds();
listId->insert(listId->end(), wtr);
lockMap[lockString] = listId;
pthread_mutex_unlock(&mtxGlobal);
} else {
wtr->evPtr = new Event(false);
listId = lockMap[lockString];
listId->insert(listId->end(), wtr);
pthread_mutex_unlock(&mtxGlobal);
wtr->evPtr->Wait();
}
}
/**
* UnLock Function
* @param string the string to unlock
*/
void UnLock(string lockString) {
pthread_mutex_lock(&mtxGlobal);
TListIds *listID = NULL;
if (lockMap.find(lockString) != lockMap.end()) {
lockMap[lockString]->pop_front();
listID = lockMap[lockString];
if (!(listID->empty())) {
TWaiter *wtr = listID->front();
Event *thdEvent = wtr->evPtr;
thdEvent->Signal();
} else {
lockMap.erase(lockString);
delete listID;
}
}
pthread_mutex_unlock(&mtxGlobal);
}
protected:
struct TWaiter {
Event *evPtr;
long threadId;
};
StringLock(StringLock &);
void operator=(StringLock&);
typedef list TListIds;
typedef map TMapLockHolders;
typedef map TMapLockWaiters;
private:
pthread_mutex_t mtxGlobal;
TMapLockWaiters lockMap;
};

