ConcurrentHashMap是Java5中支持高并发、高吞吐量的线程安全HashMap实现。
ConcurrentHashMap允许多个修改操作并发进行,其关键字在于使用了锁分离技术。它使用了多个锁来控制对Hash表的不同部分进行修改。ConcurrentHashMap内部使用段(segment)来表示这些不同的部分,每个段其实就是一个小的 Hash table,他们有自己的锁。只要多个修改操作发生在不同的段上,他们就可以并发进行。
有些方法需要跨段,比如size()和containsValue(),他们可能需要锁定整个表而不仅仅是某个段,这需要按顺序锁定所有的段,操作完毕之后又按顺序释放所有的段,这里的顺序很重要,否则可能出现死锁,在ConcurrentHashMap内部,段数组是final的,并且其成员变量实际上也是final的,但是,仅仅是将数组声明为final的并不保证数组成员也是final的,这需要实现上的保证,这可以确保不会出现死锁,因为获得锁的顺序是固定的。不变性是多线程编程占有很重要的地位。
段数组的定义如下:
/** * The segments, each of which is a specialized hash table */ final Segment<K,V>[] segments;
不变(Immutable)和易变(Volatile)
ConcurrentHashMap完全允许多个读操作并发进行,读操作并不需要加锁。但如果使用传统的技术,如HashMap中的实现,如果允许可以在hash链的中间添加或删除元素,则读操作将得到不一致的数据。ConcurrentHashMap实现技术是保证HashEntry几乎是不可变的。HashEntry代表每个Hash链中的一个节点,其结构如下:
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
从代码中可以看出,除了value不是final的,其它值都是final的,这意味着不能从hash链的中间或尾部添加或删除结点,因为这需要修改next引用值,所有的节点的修改只能从头部开始。对于put操作,可以一律添加到hash链的头部。但是对于remove操作,可能需要从中间删除一个节点,这就需要将删除节点前面所有节点复制一遍,最后一个节点指向要删除节点的下一个节点,为了确保读操作能够看到最新的值,将value设置成为volatile即易变的。
测试案例:模拟1000个并发,每个测试1000次操作,循环测试10轮,分别测试put和get操作。
public class ConcurrentHashMapTest { // 定义1000个线程 private static final int threads = 1000; // 定义一个常量 public static final int NUMBER = 1000; public static void main(String[] args) throws InterruptedException { // 创建一个线程同步的HashMap Map<String, Integer> hashMapSync = Collections.synchronizedMap(new HashMap<String, Integer>()); // 创建一个非线程同步的HashMap Map<String, Integer> hashMap = new HashMap<String, Integer>(); // 创建一个线程同步的ConcurrentHashMap对象 Map<String, Integer> concurrentHashMap = new ConcurrentHashMap<String, Integer>(); // 创建哈希表 Map<String, Integer> hashTable = new Hashtable<String, Integer>(); // 定义三个变量 long totalA = 0; long totalB = 0; long totalC = 0; // 遍历往集合中存数据 for (int i = 0; i <= 10; i++) { totalA += testPut(hashMapSync); totalB += testPut(concurrentHashMap); totalC += testPut(hashTable); } // 打印存数据的效率 System.out.println("存值的时间对比:"); System.out.println("Put time HashMapSync=" + totalA + "ms."); System.out.println("Put time ConcurrentHashMap=" + totalB + "ms."); System.out.println("Put time Hashtable=" + totalC + "ms."); // 把变量置为0,测试取值的时间 System.out.println("取值的时间对比:"); totalA = 0; totalB = 0; totalC = 0; for (int i = 0; i <= 10; i++) { totalA += testGet(hashMapSync); totalB += testGet(concurrentHashMap); totalC += testGet(hashTable); } System.out.println("Get time HashMapSync=" + totalA + "ms."); System.out.println("Get time ConcurrentHashMap=" + totalB + "ms."); System.out.println("Get time Hashtable=" + totalC + "ms."); } /** * 测试存储 * * @param map * @return * @throws InterruptedException */ private static long testPut(Map<String, Integer> map) throws InterruptedException { // 获取程序开始的时间 long start = System.currentTimeMillis(); // 循环1000个线程 for (int i = 0; i < threads; i++) { new MapPutThread(map).start(); } while (MapPutThread.counter > 0) { Thread.sleep(1); } // 返回程序执行的时间 return System.currentTimeMillis() - start; } /** * 测试取值方法 * * @param map * @return * @throws InterruptedException */ private static long testGet(Map<String, Integer> map) throws InterruptedException { // 获取程序开始时间 long start = System.currentTimeMillis(); // 循环1000个程序 for (int i = 0; i < threads; i++) { new MapGetThread(map).start(); } while (MapGetThread.counter > 0) { Thread.sleep(1); } // 返回方法执行的时间 return System.currentTimeMillis() - start; } } /** * 存值的线程 * * @author Liao */ class MapPutThread extends Thread { // 定义一个静态变量 static int counter = 0; static Object lock = new Object(); private Map<String, Integer> map; private String key = this.getId() + ""; MapPutThread(Map<String, Integer> map) { synchronized (lock) { counter++; } this.map = map; } @Override public void run() { for (int i = 1; i <= ConcurrentHashMapTest.NUMBER; i++) { map.put(key, i); } synchronized (lock) { counter--; } } } /** * 取值的线程 * * @author Liao */ class MapGetThread extends Thread { static int counter = 0; static Object lock = new Object(); private Map<String, Integer> map; private String key = this.getId() + ""; public MapGetThread(Map<String, Integer> map) { synchronized (lock) { counter++; } this.map = map; } @Override public void run() { for (int i = 1; i <= ConcurrentHashMapTest.NUMBER; i++) { map.get(key); } synchronized (lock) { counter--; } } }
测试结果:
从图中可以看出ConcurrentHashMap的效率是最高的。