亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 學院 > 開發設計 > 正文

并發容器ConcurrentHashMap深度剖析

2019-11-14 11:16:14
字體:
來源:轉載
供稿:網友

還記得大學快畢業的時候要準備找工作了,然后就看各種面試相關的書籍,還記得很多面試書中都說到:

HashMap是非線程安全的,HashTable是線程安全的。

那個時候沒怎么寫java代碼,所以根本就沒有聽說過ConcurrentHashMap,只知道面試的時候就記住這句話就行了…至于為什么是線程安全的,內部怎么實現的,通通不了解。今天我們將深入剖析一個比HashTable性能更優的線程安全的Map類,它就是ConcurrentHashMap,本文基于Java 7的源碼做剖析。

ConcurrentHashMap的目的

多線程環境下,使用Hashmap進行put操作會引起死循環,導致CPU利用率接近100%,所以在并發情況下不能使用HashMap。雖然已經有一個線程安全的HashTable,但是HashTable容器使用synchronized(他的get和put方法的實現代碼如下)來保證線程安全,在線程競爭激烈的情況下HashTable的效率非常低下。因為當一個線程訪問HashTable的同步方法時,訪問其他同步方法的線程就可能會進入阻塞或者輪訓狀態。如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,并且也不能使用get方法來獲取元素,所以競爭越激烈效率越低。

public synchronized V get(Object key) {    Entry<?,?> tab[] = table;    int hash = key.hashCode();    int index = (hash & 0x7FFFFFFF) % tab.length;    for (Entry<?,?> e = tab[index] ; e != null ; e = e.next) {        if ((e.hash == hash) && e.key.equals(key)) {            return (V)e.value;        }    }    return null;}public synchronized V put(K key, V value) {    // Make sure the value is not null    if (value == null) {        throw new NullPointerException();    }    // Makes sure the key is not already in the hashtable.    Entry<?,?> tab[] = table;    int hash = key.hashCode();    int index = (hash & 0x7FFFFFFF) % tab.length;    @SupPRessWarnings("unchecked")    Entry<K,V> entry = (Entry<K,V>)tab[index];    for(; entry != null ; entry = entry.next) {        if ((entry.hash == hash) && entry.key.equals(key)) {            V old = entry.value;            entry.value = value;            return old;        }    }    addEntry(hash, key, value, index);    return null;}

在這么惡劣的環境下,ConcurrentHashMap應運而生。

實現原理

ConcurrentHashMap使用分段鎖技術,將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問,能夠實現真正的并發訪問。如下圖是ConcurrentHashMap的內部結構圖:從圖中可以看到,ConcurrentHashMap內部分為很多個Segment,每一個Segment擁有一把鎖,然后每個Segment(繼承ReentrantLock)下面包含很多個HashEntry列表數組。對于一個key,需要經過三次(為什么要hash三次下文會詳細講解)hash操作,才能最終定位這個元素的位置,這三次hash分別為:

對于一個key,先進行一次hash操作,得到hash值h1,也即h1 = hash1(key);將得到的h1的高幾位進行第二次hash,得到hash值h2,也即h2 = hash2(h1高幾位),通過h2能夠確定該元素的放在哪個Segment;將得到的h1進行第三次hash,得到hash值h3,也即h3 = hash3(h1),通過h3能夠確定該元素放置在哪個HashEntry。

初始化

先看看ConcurrentHashMap的初始化做了哪些事情,構造函數的源碼如下:

public ConcurrentHashMap(int initialCapacity,                             float loadFactor, int concurrencyLevel) {        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)            throw new IllegalArgumentException();        if (concurrencyLevel > MAX_SEGMENTS)            concurrencyLevel = MAX_SEGMENTS;        // Find power-of-two sizes best matching arguments        int sshift = 0;        int ssize = 1;        while (ssize < concurrencyLevel) {            ++sshift;            ssize <<= 1;        }        this.segmentShift = 32 - sshift;        this.segmentMask = ssize - 1;        if (initialCapacity > MAXIMUM_CAPACITY)            initialCapacity = MAXIMUM_CAPACITY;        int c = initialCapacity / ssize;        if (c * ssize < initialCapacity)            ++c;        int cap = MIN_SEGMENT_TABLE_CAPACITY;        while (cap < c)            cap <<= 1;        // create segments and segments[0]        Segment<K,V> s0 =            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),                             (HashEntry<K,V>[])new HashEntry[cap]);        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]        this.segments = ss;    }

傳入的參數有initialCapacity,loadFactor,concurrencyLevel這三個。

initialCapacity表示新創建的這個ConcurrentHashMap的初始容量,也就是上面的結構圖中的Entry數量。默認值為static final int DEFAULT_INITIAL_CAPACITY = 16;loadFactor表示負載因子,就是當ConcurrentHashMap中的元素個數大于loadFactor * 最大容量時就需要rehash,擴容。默認值為static final float DEFAULT_LOAD_FACTOR = 0.75f;concurrencyLevel表示并發級別,這個值用來確定Segment的個數,Segment的個數是大于等于concurrencyLevel的第一個2的n次方的數。比如,如果concurrencyLevel為12,13,14,15,16這些數,則Segment的數目為16(2的4次方)。默認值為static final int DEFAULT_CONCURRENCY_LEVEL = 16;。理想情況下ConcurrentHashMap的真正的并發訪問量能夠達到concurrencyLevel,因為有concurrencyLevel個Segment,假如有concurrencyLevel個線程需要訪問Map,并且需要訪問的數據都恰好分別落在不同的Segment中,則這些線程能夠無競爭地自由訪問(因為他們不需要競爭同一把鎖),達到同時訪問的效果。這也是為什么這個參數起名為“并發級別”的原因。

初始化的一些動作:

驗證參數的合法性,如果不合法,直接拋出異常。concurrencyLevel也就是Segment的個數不能超過規定的最大Segment的個數,默認值為static final int MAX_SEGMENTS = 1 << 16;,如果超過這個值,設置為這個值。然后使用循環找到大于等于concurrencyLevel的第一個2的n次方的數ssize,這個數就是Segment數組的大小,并記錄一共向左按位移動的次數sshift,并令segmentShift = 32 - sshift,并且segmentMask的值等于ssize - 1,segmentMask的各個二進制位都為1,目的是之后可以通過key的hash值與這個值做&運算確定Segment的索引。檢查給的容量值是否大于允許的最大容量值,如果大于該值,設置為該值。最大容量值為static final int MAXIMUM_CAPACITY = 1 << 30;。然后計算每個Segment平均應該放置多少個元素,這個值c是向上取整的值。比如初始容量為15,Segment個數為4,則每個Segment平均需要放置4個元素。最后創建一個Segment實例,將其當做Segment數組的第一個元素。

put操作

put操作的源碼如下:

public V put(K key, V value) {      Segment<K,V> s;      if (value == null)          throw new NullPointerException();      int hash = hash(key);      int j = (hash >>> segmentShift) & segmentMask;      if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck           (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment          s = ensureSegment(j);      return s.put(key, hash, value, false);  }

操作步驟如下:

判斷value是否為null,如果為null,直接拋出異常。key通過一次hash運算得到一個hash值。(這個hash運算下文詳說)將得到hash值向右按位移動segmentShift位,然后再與segmentMask做&運算得到segment的索引j。在初始化的時候我們說過segmentShift的值等于32-sshift,例如concurrencyLevel等于16,則sshift等于4,則segmentShift為28。hash值是一個32位的整數,將其向右移動28位就變成這個樣子:0000 0000 0000 0000 0000 0000 0000 xxxx,然后再用這個值與segmentMask做&運算,也就是取最后四位的值。這個值確定Segment的索引。使用Unsafe的方式從Segment數組中獲取該索引對應的Segment對象。向這個Segment對象中put值,這個put操作也基本是一樣的步驟(通過&運算獲取HashEntry的索引,然后set)。

get操作

get操作的源碼如下:

public V get(Object key) {        Segment<K,V> s; // manually integrate access methods to reduce overhead        HashEntry<K,V>[] tab;        int h = hash(key);        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&            (tab = s.table) != null) {            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);                 e != null; e = e.next) {                K k;                if ((k = e.key) == key || (e.hash == h && key.equals(k)))                    return e.value;            }        }        return null;    }

操作步驟為:

和put操作一樣,先通過key進行兩次hash確定應該去哪個Segment中取數據。使用Unsafe獲取對應的Segment,然后再進行一次&運算得到HashEntry鏈表的位置,然后從鏈表頭開始遍歷整個鏈表(因為Hash可能會有碰撞,所以用一個鏈表保存),如果找到對應的key,則返回對應的value值,如果鏈表遍歷完都沒有找到對應的key,則說明Map中不包含該key,返回null。

size操作

size操作與put和get操作最大的區別在于,size操作需要遍歷所有的Segment才能算出整個Map的大小,而put和get都只關心一個Segment。假設我們當前遍歷的Segment為SA,那么在遍歷SA過程中其他的Segment比如SB可能會被修改,于是這一次運算出來的size值可能并不是Map當前的真正大小。所以一個比較簡單的辦法就是計算Map大小的時候所有的Segment都Lock住,不能更新(包含put,remove等等)數據,計算完之后再Unlock。這是普通人能夠想到的方案,但是牛逼的作者還有一個更好的Idea:先給3次機會,不lock所有的Segment,遍歷所有Segment,累加各個Segment的大小得到整個Map的大小,如果某相鄰的兩次計算獲取的所有Segment的更新的次數(每個Segment都有一個modCount變量,這個變量在Segment中的Entry被修改時會加一,通過這個值可以得到每個Segment的更新操作的次數)是一樣的,說明計算過程中沒有更新操作,則直接返回這個值。如果這三次不加鎖的計算過程中Map的更新次數有變化,則之后的計算先對所有的Segment加鎖,再遍歷所有Segment計算Map大小,最后再解鎖所有Segment。源代碼如下:

public int size() {        // Try a few times to get accurate count. On failure due to        // continuous async changes in table, resort to locking.        final Segment<K,V>[] segments = this.segments;        int size;        boolean overflow; // true if size overflows 32 bits        long sum;         // sum of modCounts        long last = 0L;   // previous sum        int retries = -1; // first iteration isn't retry        try {            for (;;) {                if (retries++ == RETRIES_BEFORE_LOCK) {                    for (int j = 0; j < segments.length; ++j)                        ensureSegment(j).lock(); // force creation                }                sum = 0L;                size = 0;                overflow = false;                for (int j = 0; j < segments.length; ++j) {                    Segment<K,V> seg = segmentAt(segments, j);                    if (seg != null) {                        sum += seg.modCount;                        int c = seg.count;                        if (c < 0 || (size += c) < 0)                            overflow = true;                    }                }                if (sum == last)                    break;                last = sum;            }        } finally {            if (retries > RETRIES_BEFORE_LOCK) {                for (int j = 0; j < segments.length; ++j)                    segmentAt(segments, j).unlock();            }        }        return overflow ? Integer.MAX_VALUE : size;    }

舉個例子:

一個Map有4個Segment,標記為S1,S2,S3,S4,現在我們要獲取Map的size。計算過程是這樣的:第一次計算,不對S1,S2,S3,S4加鎖,遍歷所有的Segment,假設每個Segment的大小分別為1,2,3,4,更新操作次數分別為:2,2,3,1,則這次計算可以得到Map的總大小為1+2+3+4=10,總共更新操作次數為2+2+3+1=8;第二次計算,不對S1,S2,S3,S4加鎖,遍歷所有Segment,假設這次每個Segment的大小變成了2,2,3,4,更新次數分別為3,2,3,1,因為兩次計算得到的Map更新次數不一致(第一次是8,第二次是9)則可以斷定這段時間Map數據被更新,則此時應該再試一次;第三次計算,不對S1,S2,S3,S4加鎖,遍歷所有Segment,假設每個Segment的更新操作次數還是為3,2,3,1,則因為第二次計算和第三次計算得到的Map的更新操作的次數是一致的,就能說明第二次計算和第三次計算這段時間內Map數據沒有被更新,此時可以直接返回第三次計算得到的Map的大小。最壞的情況:第三次計算得到的數據更新次數和第二次也不一樣,則只能先對所有Segment加鎖再計算最后解鎖。

containsValue操作

containsValue操作采用了和size操作一樣的想法:

public boolean containsValue(Object value) {        // Same idea as size()        if (value == null)            throw new NullPointerException();        final Segment<K,V>[] segments = this.segments;        boolean found = false;        long last = 0;        int retries = -1;        try {            outer: for (;;) {                if (retries++ == RETRIES_BEFORE_LOCK) {                    for (int j = 0; j < segments.length; ++j)                        ensureSegment(j).lock(); // force creation                }                long hashSum = 0L;                int sum = 0;                for (int j = 0; j < segments.length; ++j) {                    HashEntry<K,V>[] tab;                    Segment<K,V> seg = segmentAt(segments, j);                    if (seg != null && (tab = seg.table) != null) {                        for (int i = 0 ; i < tab.length; i++) {                            HashEntry<K,V> e;                            for (e = entryAt(tab, i); e != null; e = e.next) {                                V v = e.value;                                if (v != null && value.equals(v)) {                                    found = true;                                    break outer;                                }                            }                        }                        sum += seg.modCount;                    }                }                if (retries > 0 && sum == last)                    break;                last = sum;            }        } finally {            if (retries > RETRIES_BEFORE_LOCK) {                for (int j = 0; j < segments.length; ++j)                    segmentAt(segments, j).unlock();            }        }        return found;    }

關于hash

大家一定還記得使用一個key定位Segment之前進行過一次hash操作吧?這次hash的作用是什么呢?看看hash的源代碼:

private int hash(Object k) {        int h = hashSeed;        if ((0 != h) && (k instanceof String)) {            return sun.misc.Hashing.stringHash32((String) k);        }        h ^= k.hashCode();        // Spread bits to regularize both segment and index locations,        // using variant of single-Word Wang/Jenkins hash.        h += (h <<  15) ^ 0xffffcd7d;        h ^= (h >>> 10);        h += (h <<   3);        h ^= (h >>>  6);        h += (h <<   2) + (h << 14);        return h ^ (h >>> 16);    }

源碼中的注釋是這樣的:

Applies a supplemental hash function to a given hashCode, which defends against poor quality hash functions. This is critical because ConcurrentHashMap uses power-of-two length hash tables, that otherwise encounter collisions for hashCodes that do not differ in lower or upper bits.

這里用到了Wang/Jenkins hash算法的變種,主要的目的是為了減少哈希沖突,使元素能夠均勻的分布在不同的Segment上,從而提高容器的存取效率。假如哈希的質量差到極點,那么所有的元素都在一個Segment中,不僅存取元素緩慢,分段鎖也會失去意義。

舉個簡單的例子:

System.out.println(Integer.parseInt("0001111", 2) & 15);System.out.println(Integer.parseInt("0011111", 2) & 15);System.out.println(Integer.parseInt("0111111", 2) & 15);System.out.println(Integer.parseInt("1111111", 2) & 15);

這些數字得到的hash值都是一樣的,全是15,所以如果不進行第一次預hash,發生沖突的幾率還是很大的,但是如果我們先把上例中的二進制數字使用hash()函數先進行一次預hash,得到的結果是這樣的:

0100|0111|0110|0111|1101|1010|0100|11101111|0111|0100|0011|0000|0001|1011|10000111|0111|0110|1001|0100|0110|0011|11101000|0011|0000|0000|1100|1000|0001|1010

上面這個例子引用自: InfoQ可以看到每一位的數據都散開了,并且ConcurrentHashMap中是使用預hash值的高位參與運算的。比如之前說的先將hash值向右按位移動28位,再與15做&運算,得到的結果都別為:4,15,7,8,沒有沖突!

注意事項

ConcurrentHashMap中的key和value值都不能為null。ConcurrentHashMap的整個操作過程中大量使用了Unsafe類來獲取Segment/HashEntry,這里Unsafe的主要作用是提供原子操作。Unsafe這個類比較恐怖,破壞力極強,一般場景不建議使用,如果有興趣可以到這里做詳細的了解Java中鮮為人知的特性ConcurrentHashMap是線程安全的類并不能保證使用了ConcurrentHashMap的操作都是線程安全的!

聲明

原創文章,轉載請注明出處,本文鏈接:http://qifuguang.me/2015/09/10/[Java并發包學習八]深度剖析ConcurrentHashMap/


發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
成人黄色大片在线免费观看| 精品国产1区2区| 日本老师69xxx| 久久精品福利视频| 欧美激情亚洲视频| 在线观看国产成人av片| 久久久国产精彩视频美女艺术照福利| 国产一区二区日韩精品欧美精品| 色婷婷av一区二区三区在线观看| 成人黄色大片在线免费观看| 5566日本婷婷色中文字幕97| 黄色一区二区三区| 亚洲国产精品热久久| www.亚洲天堂| 亚洲va久久久噜噜噜| 国产精品高潮视频| 国产精品久久久久久久电影| 欧美中文字幕视频| 久久久在线观看| 日本精品中文字幕| 正在播放欧美视频| 98精品在线视频| xxxx欧美18另类的高清| 国产精品美女www| 日韩欧美中文字幕在线播放| 亚洲人成电影网站色www| 国产精品极品尤物在线观看| 大胆人体色综合| 亚洲精品电影网| 久久影院中文字幕| 欧美在线视频免费观看| 久久久久免费精品国产| 九九热r在线视频精品| 亚洲精品美女久久久久| 亚洲成人激情小说| x99av成人免费| 97avcom| 亚洲激情在线视频| 国产精品情侣自拍| 亚洲欧美中文字幕| 久久亚洲综合国产精品99麻豆精品福利| 亚洲精品动漫久久久久| 8090成年在线看片午夜| 国产成人精品免高潮在线观看| 日韩精品极品视频| 欧美午夜宅男影院在线观看| 日韩美女在线观看| 国产精品亚洲自拍| 美女国内精品自产拍在线播放| 欧美专区第一页| 日本精品免费一区二区三区| 国产成+人+综合+亚洲欧洲| 成人网址在线观看| 亚洲第五色综合网| 国产一区二区三区视频免费| 蜜臀久久99精品久久久久久宅男| 国产人妖伪娘一区91| 久久精品国产视频| 色樱桃影院亚洲精品影院| 国产成人亚洲精品| 国产精品久久久久久久9999| 欧美一区二区三区免费视| 亚洲人成电影网站色xx| 中文字幕欧美专区| 欧美精品午夜视频| 中国china体内裑精亚洲片| 中文字幕精品视频| 91久久精品国产91性色| 国产一区二区在线免费| 91免费综合在线| 国产日产亚洲精品| 久久在线观看视频| 91产国在线观看动作片喷水| 国产精品高潮粉嫩av| 久久久久久999| 国产成人av在线播放| 国产成人精品在线视频| 中文字幕免费精品一区| 永久免费毛片在线播放不卡| 亚洲成年网站在线观看| 欧美午夜宅男影院在线观看| 久久久久女教师免费一区| 亚洲一区二区三区成人在线视频精品| 成人a在线视频| 欧美高清在线视频观看不卡| 久久中文精品视频| 国产成人亚洲综合| 亚洲欧美另类人妖| 国产日韩精品视频| 国内精久久久久久久久久人| 日韩亚洲成人av在线| 九九久久国产精品| 久久色在线播放| 日韩高清免费在线| 欧美日韩电影在线观看| 在线精品视频视频中文字幕| 日韩成人在线视频观看| 国产成人拍精品视频午夜网站| 国产精品久久久久免费a∨| 亚洲精品中文字| 国产97在线观看| 亚洲精品美女在线观看| 午夜精品在线观看| 国产精品视频免费在线观看| 中文字幕欧美精品日韩中文字幕| 俺去了亚洲欧美日韩| 国内精品久久久久| 日韩av电影手机在线| 亚洲乱码一区二区| 国产精品久久久久久久久久久久| 91精品国产综合久久香蕉922| 亚洲国产精品美女| 亚洲电影天堂av| 精品欧美激情精品一区| 亚洲欧美日韩在线高清直播| 日韩在线观看免费高清完整版| 一区二区亚洲欧洲国产日韩| 51色欧美片视频在线观看| 狠狠躁夜夜躁人人爽超碰91| 国产欧美一区二区| 欧美一区二区三区免费观看| 国内成人精品视频| 成人午夜在线影院| 欧美丝袜一区二区| 久久国产精品久久精品| 国产精品av在线播放| 亚洲欧美一区二区精品久久久| 久久91亚洲精品中文字幕| 欧美日韩视频在线| 国产乱肥老妇国产一区二| 国产精品久久久久久久久免费看| 日韩美女中文字幕| 国产精品成人在线| 亚洲黄在线观看| 91精品国产高清久久久久久91| 欧美亚洲伦理www| 在线观看日韩www视频免费| 色综合视频一区中文字幕| 亚洲免费视频一区二区| 成人做爰www免费看视频网站| 在线亚洲午夜片av大片| 日韩av在线免费| 亚洲精品v欧美精品v日韩精品| 国模吧一区二区三区| 欧美另类在线观看| 欧美第一黄色网| 欧美日韩黄色大片| 久久精品久久精品亚洲人| 欧美激情在线观看视频| 久久久久久欧美| 国产高清视频一区三区| 日韩欧美福利视频| 日韩电影在线观看免费| 色综合伊人色综合网站| 久久久久久久久久久免费| 成人情趣片在线观看免费| 一区二区欧美日韩视频| 国产在线观看91精品一区| 欧美一区二区大胆人体摄影专业网站| 中文字幕欧美日韩va免费视频| 亚洲v日韩v综合v精品v| 日韩成人在线视频观看| 精品国产自在精品国产浪潮| 日韩中文字幕在线免费观看|