java ConcurrentHashMap 的线程安全

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/12115727/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-10-31 07:39:02  来源:igfitidea点击:

Thread safety with ConcurrentHashMap

javamultithreadingconcurrencythread-safety

提问by blueSky

I have the following class. I use ConcurrentHashMap. I have many threads writing to the maps and a Timer that saves the data in the map every 5 minutes. I manage to achieve thread safety by using putIfAbsent() when I write entries in the map. However, when I read from it and then remove all entries by clear() method, I want no other thread writes to map while I'm in the process of reading the map contents and then removing them. Obviously my code is not threadsafe even with synchronized(lock){}, b/c the thread that owns the lock in saveEntries(), is not necessarily the same thread that writes into my maps in log() method! Unless I lock the whole code in log() with the same lock object!

我有以下课程。我使用 ConcurrentHashMap。我有很多线程写入地图和一个计时器,每 5 分钟保存一次地图中的数据。当我在映射中写入条目时,我设法通过使用 putIfAbsent() 来实现线程安全。但是,当我从中读取然后通过 clear() 方法删除所有条目时,我不希望在读取地图内容然后删除它们的过程中没有其他线程写入映射。显然,即使使用 synchronized(lock){},我的代码也不是线程安全的,b/c 在 saveEntries() 中拥有锁的线程不一定是在 log() 方法中写入我的映射的同一个线程!除非我使用相同的锁定对象将 log() 中的整个代码锁定!

I was wondering is there any other way to achieve thread safety w/o enforcing synchronizing by an external lock? Any help is greatly appreciated.

我想知道有没有其他方法可以在不通过外部锁强制同步的情况下实现线程安全?任何帮助是极大的赞赏。

public class Logging {

private static Logging instance;    
private static final String vendor1 = "vendor1";
private static final String vendor2 = "vendor2";    
private static long delay = 5 * 60 * 1000;

private ConcurrentMap<String, Event> vendor1Calls = new ConcurrentHashMap<String, Event>();
private ConcurrentMap<String, Event> vendor2Calls = new ConcurrentHashMap<String, Event>();

private Timer timer;    
private final Object lock = new Object();

private Logging(){
    timer = new Timer();                
    timer.schedule(new TimerTask() {
        public void run() {
            try {
                saveEntries();
            } catch (Throwable t) {
                timer.cancel();
                timer.purge();
            }
        }       
    }, 0, delay);
}

public static synchronized Logging getInstance(){     
    if (instance == null){
        instance = new Logging();
    }
    return instance;
 }

public void log(){      
    ConcurrentMap<String, Event> map;
    String key = "";        

    if (vendor1.equalsIgnoreCase(engine)){
        map = vendor1Calls;
    }else if(vendor2.equalsIgnoreCase(engine)){  
        map = vendor2Calls;
    }else{
        return;
    }       


    key = service + "." + method;
// It would be the code if I use a regular HashMap instead of ConcurrentHashMap
    /*Event event = map.get(key);       

    // Map does not contain this service.method, create an Event for the first     time.
    if(event == null){
        event = new Event();            
        map.put(key, event);

        // Map already contains this key, just adjust the numbers.
    }else{
        // Modify the object fields
    }*/
    //}

    // Make it thread-safe using CHM
    Event newEvent = new Event();
    Event existingEvent= map.putIfAbsent(key, newEvent); 

    if(existingEvent!=null && existingEvent!=newEvent){
        // Modify the object fields
}       

private void saveEntries(){

    Map<String, List<Event>> engineCalls = null;
    try {           

        engineCalls = new HashMap<String, List<Event>>();
        List<Event> events = null;

// How can I achieve therad safety here w/o applying any lock?
        //synchronized(lock){
            if(!vendor1Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor1Calls.values());
                engineCalls.put(vendor1, events);
                vendor1Calls.clear();
            }
            if(!vendor2Calls.isEmpty()){
                events = new ArrayList<Event>();
                events.addAll(vendor2Calls.values());
                engineCalls.put(vendor2, events);
                vendor2Calls.clear();
            }
        //}

// logICalls() saves the events in the DB.          
        DBHandle.logCalls(engineCalls);
    } catch (Throwable t) {         
    } finally {
        if(engineCalls!=null){
            engineCalls.clear();
        }                       
    }   
}       

}

}

采纳答案by Kevin Jin

However, when I read from it and then remove all entries by clear() method, I want no other thread writes to map while I'm in the process of reading the map contents and then removing them.

但是,当我从中读取然后通过 clear() 方法删除所有条目时,我不希望在读取地图内容然后删除它们的过程中没有其他线程写入映射。

I think what you're trying to say is that you don't really care about strictly locking the maps. Instead, you only really care about the loss of any log entries between the vender1Calls.values() and vendor1Calls.clear(), correct?

我认为您想说的是您并不真正关心严格锁定地图。相反,您只关心在 vender1Calls.values() 和 vendor1Calls.clear() 之间丢失任何日志条目,对吗?

In that is the case, I can imagine that you can replace

在这种情况下,我可以想象你可以替换

events.addAll(vendor1Calls.values());
vendor1Calls.clear();

with this in saveEntries:

在 saveEntries 中使用这个:

for (Iterator<Event> iter = vendor1Calls.values().iterator(); iter.hasNext(); ) {
    Event e = iter.next();
    events.add(e);
    iter.remove();
}

That way, you only remove the Events that you added to the events List already. You can still write to the vendor1Calls maps while saveEntries() is still executing, but the iterator skips the values added.

这样,您只需删除已添加到事件列表中的事件。当 saveEntries() 仍在执行时,您仍然可以写入 vendor1Calls 映射,但迭代器会跳过添加的值。

回答by John Vint

Without any external synchronization you cannot achieve this with a CHM. The Iterator views returned are weakly consistent which means the contents of the Map can change while you are actually iterating over it.

如果没有任何外部同步,您将无法通过 CHM 实现这一目标。返回的 Iterator 视图是弱一致的,这意味着 Map 的内容在您实际对其进行迭代时可能会发生变化。

It appears you would need to use a Collections.synchronizedMapto get the functionality you are looking for.

看来您需要使用 aCollections.synchronizedMap来获得您正在寻找的功能。

Edit to make my point more clear:

编辑以使我的观点更清楚:

To achieve this with a synchronizedMapYou would first have to synchronizeon the Map and then you can iterate or copy the contents into another map an then clear.

要实现这一点,synchronizedMap你首先必须synchronize在地图上,然后你可以迭代或复制内容到另一个地图,然后清除。

Map map = Collections.synchronizedMap(new HashMap());

public void work(){
  Map local = new HashMap();
  synchronized(map){
     local.putAll(map);
     map.clear();
  }
  //do work on local instance 
}

Instead of the localinstance, as I mentioned, you can iterate + removesimilar to @Kevin Jin's answer.

local正如我所提到的,您可以iterate + remove类似于@Kevin Jin 的回答,而不是实例。

回答by jtahlborn

An atomic version of this example is shown in this thread(using only the features in ConcurrentMap).

此线程中显示了此示例的原子版本(仅使用 ConcurrentMap 中的功能)。

回答by OldCurmudgeon

My best suggestion would be to use a ReadWriteLockbut as you specifically state that you don't want to use any locks (btw ConcurrentHashMapwill probably use them internally) you could try the following.

我最好的建议是使用ReadWriteLock,但是当您明确声明不想使用任何锁(顺便说一句,ConcurrentHashMap可能会在内部使用它们)时,您可以尝试以下操作。

Use an AtomicReferenceto each of your maps and when the time comes to log their contents use getAndSetto replace the old map with a brand new empty one.

对每个地图使用AtomicReference,当需要记录其内容时,请使用getAndSet将旧地图替换为全新的空地图。

You now have a Map with exclusive use that you can iterate across and clear as much as you like. Unfortunately there is one small problem (that using a lock will get rid of) and that is if another thread is in the process of adding to the map at the time you swap it out with an empty one. You could perhaps add a delay at this point in the hope of waiting long enough for the other thread to finish what it was doing. Perhaps there is some internal functionality of a ConcurrentHashMapyou can use to wait until everyone has finished with it.

您现在拥有一个专属使用的 Map,您可以随心所欲地遍历和清除它。不幸的是,有一个小问题(使用锁可以解决问题),那就是当您用空线程将其换出时,另一个线程是否正在添加到映射中。您或许可以在此时添加延迟,希望等待足够长的时间让另一个线程完成它正在执行的操作。也许 a 的一些内部功能ConcurrentHashMap可以用来等待每个人都完成它。

回答by Binil Thomas

The following code uses a persistent mapfrom the functional javaproject. It uses more memory, but is (AFAIK :) safe to use by multiple threads. The only mutable value in the AtomicReferenceand it is updated with a compare-and-set. The map and event are immutable, and hence thread-safe. Also, instead of clearing the map, I replace the reference to it.

以下代码使用来自功能性 java项目的持久映射。它使用更多内存,但(AFAIK :) 可以安全地由多个线程使用。中唯一的可变值,它用compare-and-set更新。地图和事件是不可变的,因此是线程安全的。此外,我没有清除地图,而是替换了对它的引用。AtomicReference

import fj.F;
import fj.Ord;
import fj.data.TreeMap;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

public class Logging
{
    // Event is immutable
    private static class Event
    {
        // updates are done by creating new values
        Event update(String key)
        {
            return new Event();
        }
    }

    // Refactored code pertaining to one vendor into a separate class.
    private static class EngineLogger
    {
        public final String vendor;
        private final AtomicReference<TreeMap<String, Event>> mapRef =
                new AtomicReference<TreeMap<String, Event>>(TreeMap.<String, Event>empty(Ord.stringOrd));

        private EngineLogger(String vendor)
        {
            this.vendor = vendor;
        }

        public void log(String service, String method)
        {
            final String key = service + "." + method;
            boolean updated = false;
            while (! updated)
            {
                // get the current value of the map
                TreeMap<String, Event> currMap = mapRef.get();

                // create an updated value of the map, which is the current map plus info about the new key
                TreeMap<String, Event> newMap = currMap.update(key, new F<Event, Event>()
                {
                    @Override
                    public Event f(Event event)
                    {
                        // Modify the object fields of event, if the map contains the key
                        return event.update(key);
                    }
                    // create a new event if the map does not contain the key
                }, new Event());

                // compare-and-set the new value in .. repeat until this succeeds
                updated = mapRef.compareAndSet(currMap, newMap);
            }
        }

        public List<Event> reset()
        {
            /* replace the reference with a new map */
            TreeMap<String, Event> oldMap = mapRef.getAndSet(TreeMap.<String, Event>empty(Ord.stringOrd));

            /* use the old map to generate the list */
            return new ArrayList<Event>(oldMap.toMutableMap().values());
        }
    }

    private static Logging instance;
    private static long delay = 5 * 60 * 1000;
    private final Timer timer;

    private final EngineLogger vendor1 = new EngineLogger("vendor1");
    private final EngineLogger vendor2 = new EngineLogger("vendor2");

    private Logging()
    {
        timer = new Timer();
        timer.schedule(new TimerTask()
        {
            public void run()
            {
                try
                {
                    saveEntries();
                }
                catch (Throwable t)
                {
                    timer.cancel();
                    timer.purge();
                }
            }
        }, 0, delay);
    }

    public static synchronized Logging getInstance()
    {
        if (instance == null)
        {
            instance = new Logging();
        }
        return instance;
    }

    public void log(String engine, String service, String method)
    {
        if (vendor1.vendor.equals(engine))
        {
            vendor1.log(service, method);
        }
        else if (vendor2.vendor.equals(engine))
        {
            vendor2.log(service, method);
        }
    }

    private void saveEntries()
    {
        Map<String, List<Event>> engineCalls = new HashMap<String, List<Event>>();
        engineCalls.put(vendor1.vendor, vendor1.reset());
        engineCalls.put(vendor2.vendor, vendor2.reset());
        DBHandle.logCalls(engineCalls);
    }
}