基于参数(名为互斥锁/锁)的 Java 同步

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/12450402/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 08:56:36  来源:igfitidea点击:

Java synchronizing based on a parameter (named mutex/lock)

javamultithreadingsynchronized

提问by Doua Beri

I'm looking for a way to synchronize a method based on the parameter it receives, something like this:

我正在寻找一种方法来根据它接收到的参数来同步方法,如下所示:

public synchronized void doSomething(name){
//some code
}

I want the method doSomethingto be synchronized based on the nameparameter like this:

我希望该方法doSomething基于这样的name参数进行同步:

Thread 1: doSomething("a");

线程 1: doSomething("a");

Thread 2: doSomething("b");

线程 2: doSomething("b");

Thread 3: doSomething("c");

线程 3: doSomething("c");

Thread 4: doSomething("a");

线程 4: doSomething("a");

Thread 1 , Thread 2 and Thread 3 will execute the code without being synchronized , but Thread 4 will wait until Thread 1 has finished the code because it has the same "a" value.

线程 1 、线程 2 和线程 3 将在不同步的情况下执行代码,但线程 4 将等到线程 1 完成代码,因为它具有相同的“a”值。

Thanks

谢谢

UPDATE

更新

Based on Tudor explanation I think I'm facing another problem: here is a sample of the new code:

根据 Tudor 的解释,我想我面临另一个问题:这是新代码的示例:

private HashMap locks=new HashMap();
public void doSomething(String name){
    locks.put(name,new Object());
    synchronized(locks.get(name)) {
        // ...
    }
    locks.remove(name);
}

The reason why I don't populate the locks map is because name can have any value.

我不填充锁映射的原因是因为 name 可以有任何值。

Based on the sample above , the problem can appear when adding / deleting values from the hashmap by multiple threads in the same time, since HashMap is not thread-safe.

根据上面的示例,由于 HashMap 不是线程安全的,因此当多个线程同时从 hashmap 中添加/删除值时会出现问题。

So my question is if I make the HashMapa ConcurrentHashMapwhich is thread safe, will the synchronized block stop other threads from accessing locks.get(name) ??

所以我的问题是,如果我使HashMapa 成为ConcurrentHashMap线程安全的,那么同步块会阻止其他线程访问 locks.get(name) 吗??

采纳答案by Tudor

Use a map to associate strings with lock objects:

使用映射将字符串与锁定对象相关联:

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());
// etc.

then:

然后:

public void doSomething(String name){
    synchronized(locks.get(name)) {
        // ...
    }
}

回答by Timmos

The answer of Tudor is fine, but it's static and not scalable. My solution is dynamic and scalable, but it goes with increased complexity in the implementation. The outside world can use this class just like using a Lock, as this class implements the interface. You get an instance of a parameterized lock by the factory method getCanonicalParameterLock.

Tudor 的答案很好,但它是静态的,不可扩展。我的解决方案是动态的和可扩展的,但它会增加实施的复杂性。外界可以像使用 a 一样使用Lock这个类,因为这个类实现了接口。您可以通过工厂方法获得参数化锁的实例getCanonicalParameterLock

package lock;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public final class ParameterLock implements Lock {

    /** Holds a WeakKeyLockPair for each parameter. The mapping may be deleted upon garbage collection
     * if the canonical key is not strongly referenced anymore (by the threads using the Lock). */
    private static final Map<Object, WeakKeyLockPair> locks = new WeakHashMap<>();

    private final Object key;
    private final Lock lock;

    private ParameterLock (Object key, Lock lock) {
        this.key = key;
        this.lock = lock;
    }

    private static final class WeakKeyLockPair {
        /** The weakly-referenced parameter. If it were strongly referenced, the entries of
         * the lock Map would never be garbage collected, causing a memory leak. */
        private final Reference<Object> param;
        /** The actual lock object on which threads will synchronize. */
        private final Lock lock;

        private WeakKeyLockPair (Object param, Lock lock) {
            this.param = new WeakReference<>(param);
            this.lock = lock;
        }
    }

    public static Lock getCanonicalParameterLock (Object param) {
        Object canonical = null;
        Lock lock = null;

        synchronized (locks) {
            WeakKeyLockPair pair = locks.get(param);            
            if (pair != null) {                
                canonical = pair.param.get(); // could return null!
            }
            if (canonical == null) { // no such entry or the reference was cleared in the meantime                
                canonical = param; // the first thread (the current thread) delivers the new canonical key
                pair = new WeakKeyLockPair(canonical, new ReentrantLock());
                locks.put(canonical, pair);
            }
        }

        // the canonical key is strongly referenced now...
        lock = locks.get(canonical).lock; // ...so this is guaranteed not to return null
        // ... but the key must be kept strongly referenced after this method returns,
        // so wrap it in the Lock implementation, which a thread of course needs
        // to be able to synchronize. This enforces a thread to have a strong reference
        // to the key, while it isn't aware of it (as this method declares to return a 
        // Lock rather than a ParameterLock).
        return new ParameterLock(canonical, lock);               
    }

    @Override
    public void lock() {
        lock.lock();
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        lock.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        return lock.tryLock();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return lock.tryLock(time, unit);
    }

    @Override
    public void unlock() {
        lock.unlock();
    }

    @Override
    public Condition newCondition() {
        return lock.newCondition();
    }
}

Of course you'd need a canonical key for a given parameter, otherwise threads would not be synchronized as they would be using a different Lock. Canonicalization is the equivalent of the internalization of Strings in Tudor's solution. Where String.intern()is itself thread-safe, my 'canonical pool' is not, so I need extra synchronization on the WeakHashMap.

当然,您需要给定参数的规范密钥,否则线程将不会同步,因为它们将使用不同的锁。规范化相当于 Tudor 解决方案中字符串的内部化。哪里String.intern()本身是线程安全的,我的“规范池”不是,所以我需要在 WeakHashMap 上进行额外的同步。

This solution works for any type of Object. However, make sure to implement equalsand hashCodecorrectly in custom classes, because if not, threading issues will arise as multiple threads could be using different Lock objects to synchronize on!

此解决方案适用于任何类型的对象。但是,一定要落实equalshashCode正确的自定义类,因为如果不是,线程问题那么就会产生多个线程可以使用不同的锁定对象进行同步的!

The choice for a WeakHashMap is explained by the ease of memory management it brings. How else could one know that no thread is using a particular Lock anymore? And if this could be known, how could you safely delete the entry out of the Map? You would need to synchronize upon deletion, because you have a race condition between an arriving thread wanting to use the Lock, and the action of deleting the Lock from the Map. All these things are just solved by using weak references, so the VM does the work for you, and this simplifies the implementation a lot. If you inspected the API of WeakReference, you would find that relying on weak references is thread-safe.

选择 WeakHashMap 的原因是它带来的内存管理方便。否则怎么知道没有线程在使用特定的锁呢?如果可以知道这一点,您如何安全地从地图中删除条目?您需要在删除时进行同步,因为在到达线程想要使用锁和从映射中删除锁的操作之间存在竞争条件。所有这些事情都只是通过使用弱引用来解决的,因此 VM 会为您完成工作,这大大简化了实现。如果你查看了 WeakReference 的 API,你会发现依赖弱引用是线程安全的。

Now inspect this test program (you need to run it from inside the ParameterLock class, due to private visibility of some fields):

现在检查这个测试程序(由于某些字段的私有可见性,您需要从 ParameterLock 类内部运行它):

public static void main(String[] args) {
    Runnable run1 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Runnable run2 = new Runnable() {

        @Override
        public void run() {
            sync(new Integer(5));
            System.gc();
        }
    };
    Thread t1 = new Thread(run1);
    Thread t2 = new Thread(run2);

    t1.start();
    t2.start();

    try {
        t1.join();
        t2.join();
        while (locks.size() != 0) {
            System.gc();
            System.out.println(locks);
        }
        System.out.println("FINISHED!");
    } catch (InterruptedException ex) {
        // those threads won't be interrupted
    }
}

private static void sync (Object param) {
    Lock lock = ParameterLock.getCanonicalParameterLock(param);
    lock.lock();
    try {
        System.out.println("Thread="+Thread.currentThread().getName()+", lock=" + ((ParameterLock) lock).lock);
        // do some work while having the lock
    } finally {
        lock.unlock();
    }        
}

Chances are very high that you would see that both threads are using the same lock object, and so they are synchronized. Example output:

您很可能会看到两个线程都使用相同的锁对象,因此它们是同步的。示例输出:

Thread=Thread-0, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-0]
Thread=Thread-1, lock=java.util.concurrent.locks.ReentrantLock@8965fb[Locked by thread Thread-1]
FINISHED!

However, with some chance it might be that the 2 threads do not overlap in execution, and therefore it is not required that they use the same lock. You could easily enforce this behavior in debugging mode by setting breakpoints at the right locations, forcing the first or second thread to stop wherever necessary. You will also notice that after the Garbage Collection on the main thread, the WeakHashMap will be cleared, which is of course correct, as the main thread waited for both worker threads to finish their job by calling Thread.join()before calling the garbage collector. This indeed means that no strong reference to the (Parameter)Lock can exist anymore inside a worker thread, so the reference can be cleared from the weak hashmap. If another thread now wants to synchronize on the same parameter, a new Lock will be created in the synchronized part in getCanonicalParameterLock.

但是,有可能这 2 个线程在执行中不重叠,因此不需要它们使用相同的锁。通过在正确的位置设置断点,您可以轻松地在调试模式下强制执行此行为,强制第一个或第二个线程在必要时停止。您还会注意到,在主线程上的垃圾回收之后,WeakHashMap 将被清除,这当然是正确的,因为主线程通过调用等待两个工作线程完成其工作Thread.join()在调用垃圾收集器之前。这确实意味着在工作线程中不再存在对 (Parameter)Lock 的强引用,因此可以从弱哈希图中清除该引用。如果另一个线程现在想要同步同一个参数,则会在同步部分创建一个新的锁getCanonicalParameterLock

Now repeat the test with any pair that has the same canonical representation (= they are equal, so a.equals(b)), and see that it still works:

现在对任何具有相同规范表示(=它们相等,所以a.equals(b))的对重复测试,并查看它仍然有效:

sync("a");
sync(new String("a"))

sync(new Boolean(true));
sync(new Boolean(true));

etc.

等等。

Basically, this class offers you the following functionality:

基本上,此类为您提供以下功能:

  • Parameterized synchronization
  • Encapsulated memory management
  • The ability to work with any type of object (under the condition that equalsand hashCodeis implemented properly)
  • Implements the Lock interface
  • 参数化同步
  • 封装内存管理
  • (该条件下工作的能力,与任何类型的对象equalshashCode被适当地实现的)
  • 实现 Lock 接口

This Lock implementation has been tested by modifying an ArrayList concurrently with 10 threads iterating 1000 times, doing this: adding 2 items, then deleting the last found list entry by iterating the full list. A lock is requested per iteration, so in total 10*1000 locks will be requested. No ConcurrentModificationException was thrown, and after all worker threads have finished the total amount of items was 10*1000. On every single modification, a lock was requested by calling ParameterLock.getCanonicalParameterLock(new String("a")), so a new parameter object is used to test the correctness of the canonicalization.

这个 Lock 实现已经通过同时修改 ArrayList 和 10 个线程迭代 1000 次来测试,这样做是:添加 2 个项目,然后通过迭代完整列表删除最后找到的列表条目。每次迭代都会请求一个锁,因此总共会请求 10*1000 个锁。没有抛出 ConcurrentModificationException,所有工作线程完成后项目总数为 10*1000。在每次修改时,都会通过调用请求锁定ParameterLock.getCanonicalParameterLock(new String("a")),因此使用新的参数对象来测试规范化的正确性。

Please note that you shouldn't be using String literals and primitive types for parameters. As String literals are automatically interned, they always have a strong reference, and so if the first thread arrives with a String literal for its parameter then the lock pool will never be freed from the entry, which is a memory leak. The same story goes for autoboxing primitives: e.g. Integer has a caching mechanism that will reuse existing Integer objects during the process of autoboxing, also causing a strong reference to exist. Addressing this, however, this is a different story.

请注意,您不应该对参数使用字符串文字和原始类型。由于字符串字面量被自动插入,它们始终具有强引用,因此如果第一个线程以其参数的字符串字面量到达,那么锁池将永远不会从条目中释放,这是内存泄漏。自动装箱原语也是如此:例如,Integer 有一个缓存机制,可以在自动装箱过程中重用现有的 Integer 对象,这也会导致存在强引用。然而,解决这个问题,这是一个不同的故事。

回答by Triet Doan

TL;DR:

特尔;博士:

I use ConcurrentReferenceHashMapfrom the Spring Framework. Please check the code below.

我使用Spring Framework 中的ConcurrentReferenceHashMap。请检查下面的代码。



Although this thread is old, it is still interesting. Therefore, I would like to share my approach with Spring Framework.

这个话题虽然老了,但还是很有趣。因此,我想与 Spring Framework 分享我的方法。

What we are trying to implement is called named mutex/lock. As suggested by Tudor's answer, the idea is to have a Mapto store the lock name and the lock object. The code will look like below (I copy it from his answer):

我们正在尝试实现的称为命名互斥锁/锁。正如Tudor's answer所建议,这个想法是有一个Map来存储锁名称和锁对象。代码如下所示(我从他的回答中复制):

Map<String, Object> locks = new HashMap<String, Object>();
locks.put("a", new Object());
locks.put("b", new Object());

However, this approach has 2 drawbacks:

但是,这种方法有两个缺点:

  1. The OP already pointed out the first one: how to synchronize the access to the lockshash map?
  2. How to remove some locks which are not necessary anymore? Otherwise, the lockshash map will keep growing.
  1. OP已经指出了第一个:如何同步对locks哈希映射的访问?
  2. 如何去除一些不再需要的锁?否则,locks哈希映射将继续增长。

The first problem can be solved by using ConcurrentHashMap. For the second problem, we have 2 options: manually check and remove locks from the map, or somehow let the garbage collector knows which locks are no longer used and the GC will remove them. I will go with the second way.

第一个问题可以通过使用ConcurrentHashMap来解决。对于第二个问题,我们有两个选择:手动检查并从地图中移除锁,或者以某种方式让垃圾收集器知道哪些锁不再使用,GC 将移除它们。我会选择第二种方式。

When we use HashMap, or ConcurrentHashMap, it creates strong references. To implement the solution discussed above, weak references should be used instead (to understand what is a strong/weak reference, please refer to this articleor this post).

当我们使用HashMap, or 时ConcurrentHashMap,它会创建强引用。为了实现上面讨论的解决方案,应该使用弱引用来代替(要了解什么是强/弱引用,请参阅这篇文章这篇文章)。



So, I use ConcurrentReferenceHashMapfrom the Spring Framework. As described in the documentation:

所以,我使用Spring Framework 中的ConcurrentReferenceHashMap。如文档中所述:

A ConcurrentHashMapthat uses soft or weak references for both keys and values.

This class can be used as an alternative to Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>())in order to support better performance when accessed concurrently. This implementation follows the same design constraints as ConcurrentHashMapwith the exception that null values and null keys are supported.

一个ConcurrentHashMap使用的键和值软或弱引用。

此类可用作替代以 Collections.synchronizedMap(new WeakHashMap<K, Reference<V>>())在并发访问时支持更好的性能。ConcurrentHashMap除了支持空值和空键之外,此实现遵循相同的设计约束 。

Here is my code. The MutexFactorymanages all the locks with <K>is the type of the key.

这是我的代码。该MutexFactory负责管理所有的锁用<K>是关键的类型。

@Component
public class MutexFactory<K> {

    private ConcurrentReferenceHashMap<K, Object> map;

    public MutexFactory() {
        this.map = new ConcurrentReferenceHashMap<>();
    }

    public Object getMutex(K key) {
        return this.map.compute(key, (k, v) -> v == null ? new Object() : v);
    }
}

Usage:

用法:

@Autowired
private MutexFactory<String> mutexFactory;

public void doSomething(String name){
    synchronized(mutexFactory.getMutex(name)) {
        // ...
    }
}

Unit test (this test uses the awaitilitylibrary for some methods, e.g. await(), atMost(), until()):

单元测试(此测试使用awaitility库来处理某些方法,例如await(), atMost(), until()):

public class MutexFactoryTests {
    private final int THREAD_COUNT = 16;

    @Test
    public void singleKeyTest() {
        MutexFactory<String> mutexFactory = new MutexFactory<>();
        String id = UUID.randomUUID().toString();
        final int[] count = {0};

        IntStream.range(0, THREAD_COUNT)
                .parallel()
                .forEach(i -> {
                    synchronized (mutexFactory.getMutex(id)) {
                        count[0]++;
                    }
                });
        await().atMost(5, TimeUnit.SECONDS)
                .until(() -> count[0] == THREAD_COUNT);
        Assert.assertEquals(count[0], THREAD_COUNT);
    }
}

回答by Thomas Stubbe

I've found a proper answer through another stackoverflow question: How to acquire a lock by a key

我通过另一个 stackoverflow 问题找到了正确的答案:如何通过钥匙获取锁

I copied the answer here:

我在这里复制了答案:

Guava has something like this being released in 13.0; you can get it out of HEAD if you like.

Guava 有类似的东西在 13.0 中发布;如果你愿意,你可以把它弄出来。

Striped more or less allocates a specific number of locks, and then assigns strings to locks based on their hash code. The API looks more or less like

Striped 或多或少地分配特定数量的锁,然后根据它们的哈希码将字符串分配给锁。API 看起来或多或少像

Striped<Lock> locks = Striped.lock(stripes);
Lock l = locks.get(string);
l.lock();
try {
  // do stuff 
} finally {
  l.unlock();
}

More or less, the controllable number of stripes lets you trade concurrency against memory usage, because allocating a full lock for each string key can get expensive; essentially, you only get lock contention when you get hash collisions, which are (predictably) rare.

或多或少,可控的条带数量使您可以在并发性与内存使用之间进行权衡,因为为每个字符串键分配一个完整的锁可能会变得昂贵;本质上,只有在发生哈希冲突时才会发生锁争用,而这种冲突(可预测)很少见。

回答by R. Oosterholt

I've created a tokenProvider based on the IdMutexProviderof McDowell. The manager uses a WeakHashMapwhich takes care of cleaning up unused locks.

我已经基于McDowell的IdMutexProvider创建了一个 tokenProvider 。管理器使用 aWeakHashMap来清理未使用的锁。

You could find my implementation here.

你可以在这里找到我的实现。

回答by Njax3SmmM2x2a0Zf7Hpd

Check out this framework. Seems you're looking for something like this.

看看这个框架。看来你正在寻找这样的东西。

public class WeatherServiceProxy {
...
private final KeyLockManager lockManager = KeyLockManagers.newManager();

public void updateWeatherData(String cityName, Date samplingTime, float temperature) {
        lockManager.executeLocked(cityName, new LockCallback() {
                public void doInLock() {
                        delegate.updateWeatherData(cityName, samplingTime, temperature);
                }
        });
}

https://code.google.com/p/jkeylockmanager/

https://code.google.com/p/jkeylockmanager/

回答by JamesP

I've used a cache to store lock objects. The my cache will expire objects after a period, which really only needs to be longer that the time it takes the synchronized process to run

我使用缓存来存储锁定对象。我的缓存将在一段时间后使对象过期,这实际上只需要比同步进程运行所需的时间更长

`

`

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

...

private final Cache<String, Object> mediapackageLockCache = CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_EXPIRE, TimeUnit.SECONDS).build();

...

public void doSomething(foo) {
    Object lock = mediapackageLockCache.getIfPresent(foo.toSting());
    if (lock == null) {
        lock = new Object();
        mediapackageLockCache.put(foo.toString(), lock);
    }

    synchronized(lock) {
        // execute code on foo
        ...
    }
}

`

`

回答by Tom Petrillo

I have a much simpler, scalable implementation akin to @timmons post taking advantage of guavas LoadingCachewith weakValues. You will want to read the help files on "equality" to understand the suggestion I have made.

我有一个更简单的,可扩展的实现类似于@timmons邮政服用番石榴的优势LoadingCacheweakValues。您将需要阅读有关“平等”的帮助文件以了解我提出的建议。

Define the following weakValued cache.

定义以下弱值缓存。

private final LoadingCache<String,String> syncStrings = CacheBuilder.newBuilder().weakValues().build(new CacheLoader<String, String>() {
    public String load(String x) throws ExecutionException {
        return new String(x);
    }
});

public void doSomething(String x) {
      x = syncStrings.get(x);
      synchronized(x) {
          ..... // whatever it is you want to do
      }
}

Now! As a result of the JVM, we do not have to worry that the cache is growing too large, it only holds the cached strings as long as necessary and the garbage manager/guava does the heavy lifting.

现在!作为 JVM 的结果,我们不必担心缓存增长过大,它只在必要时保存缓存的字符串,垃圾管理器/番石榴承担了繁重的工作。