傳統的HashMap
不是線程安全的, 所以多線程進行put()
和get()
操作的時候可能會引發問題. 還有一個叫做HashTable
的數據結構, 它使用的是synchronized
來保證線程安全, 但是效率很低, 因為不能并發讀.
ConcurrentHashMap
采用的是鎖分段技術, 將數據分成一段一段存儲, 然后給每一段數據配一把鎖, 當一個線程對某段數據進行寫操作的時候, 其他段的數據也能被其他線程安全訪問(讀寫).
ConcurrentHashMap
包含一個Segment
數組;一個Segment
里面包含一個HashEntry
鏈表.Segment
是一種可重入鎖, 在ConcurrentHashMap
中扮演鎖的角色. HashEntry
則是實際存儲鍵值對數據的地方. 當對HashEntry
的數據進行修改時, 必須首先獲得與它對應的Segment
鎖.
初始化segments數組
if (concurrencyLevel > MAX_SEGMENTS) { // 如果自定義的concurrencyLevel超過允許最大的并發數 concurrencyLevel = MAX_SEGMENTS; // 設定為允許的最大的并發數}int sshift = 0;int ssize = 1;while (ssize < concurrencyLevel) { ++sshift; // 為了能夠使用按位與的散列算法來定位segments數組的索引, 必須保證segments數組的長度是2的N次方 // 例如concurrencyLevel是14, 15或者16, 那么鎖的個數也總是16 ssize <<= 1;}segmentShift = 32 -sshift; // 用于定位參與散列運算的位數長度segmentMask = ssize - 1; // 散列運算的掩碼, 各個二進制位都是1this.segments = Segment.newArray(ssize); // 創建Segment數組初始化單個segment
if (initialCapacity > MAXIMUM_CAPACITY) { // 如果自定義的Capacity超過了單個segment允許的最大的Capacity initialCapacity = MAXIMUM_CAPACITY; // 設定為單個segment允許的最大的Capacity}int c = initialCapacity / ssize; // 計算單個segment下的HashEntry數組的長度基值if (c * ssize < initialCapacity) { // 確保數組能夠容納所有鍵值對 ++c;}int cap = 1;while (cap < c) { cap <<= 1; // 將HashEntry的長度向2的N次方對齊}for (int i = 0; i < this.segments.length; ++i) { // 初始化各個segment元素, 實際上是創建cap長度的HashEntry數組, loadFactor指定了HashEntry數組總體上允許的最大負載百分比 this.segments[i] = new Segment<K, V>(cap, loadFactor)}定位Segment 在插入和獲取元素的時候, 必須先通過散列算法定位到Segment
, ConcurrentHashMap
首先使用Wang/Jenkins hash的變種算法對元素的hashCode
進行一次再散列.
變種算法的代碼如下:
// 本段代碼無需理解, 只需要知道是一個對要插入ConcurrentHashMap元素的原始hash值的再散列(hash)的過程即可PRivate static int hash(int h) { h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16);}/* 用再散列后的元素的的hash值來定位segment數組索引 */final Segment<K, V> segmentFor(int hash) { // hash 是再散列后的元素的hash值 // 使用hash值的高四位進行Segment數組索引的計算 // 這里假設segmentMask為1111, segmentShift為28, 也就是Segment數組的長度為16 return segments[(hash >>> segmetnShift) & segmentMask]; }之所以進行再散列, 目的是減少散列沖突, 使元素能夠均勻分布在不同的Segment
上. 如若負載不均, 那么分段鎖就會失去意義.
主要介紹三種操作: get
, put
和size
操作
步驟如下:
對元素的原始hash值先進行再散列;通過這個散列值定位到segment
數組索引;通過散列算法定位到具體的HashEntry
數組中的元素;public V get(Object key) { int hash = hash(key.hashCode()); // 再散列 return segmentFor(hash).get(key, hash); // 點調用segmentFor返回具體的segment, 然后通過hash值獲取元素}需要注意的是, 定位Segment
參與計算是再散列后的hash值的高位, 而定位HashEntry
參與計算的是再散列后的has值的低位, 這是為了避免兩次計算使用的hash值相同, 導致元素在Segment
中散開了, 但是沒有在HashEntry
中散開.
get的高效之處在于整個get過程不需要加鎖, 因為value
和count
都是volatile
類型的值. 所以可以多線程并發讀, 但是寫入仍然是獨占的(對每個數據段獨占).
put操作是需要加鎖的. 需要完成以下操作:
通過再散列的hash值定位到需要插入的Segment
;判斷是否需要對Segment
的HashEntry
數組進行擴容(通過判斷是否超過 負載因子*總容量 來判斷);定位添加元素的位置;在擴容的時候, 首先會在當前需要擴容的Segment
中創建一個容量是之前兩倍的數組, 然后將原數組里的元素進行再散列后插入到新的數組里. 為了高效, ConcurrentHashMap
不會對整個容器進行擴容, 僅會對某個segment進行擴容.
size
操作獲取count
的累加值的具體步驟是:
segment
數組的來累加count
;如果統計的過程中, 容器的count
出現了變化, 則再采用加鎖的方式統計Segment
數組中所有元素的個數.上述的判斷容器的count
是否發生變化是通過一個modCount
變量來統計的, 任何會造成count變化的操作, 比如put
, remove
, clean
方法操作元素都會將modCount
加1, 所以在進行size操作前后比較modCount
元素是否發生變化, 就可以得知容器的大小是否發生變化.
是一個線程安全的基于鏈接節點的非阻塞FIFO無界隊列.
對于阻塞算法, 有兩種實現思路:
使用一個鎖, 也就是入隊出隊用同一個鎖使用兩個鎖, 入隊出隊用兩個鎖;對于非阻塞算法, 可以借助循環CAS的方式來實現(注: 實際上是通過加輕量級鎖的方式, 循環CAS會消耗較多的時間片, 只有確認同步塊運行速度較快的情況才能使用輕量級鎖).
以下是ConcurrentLinkedQueue
的結構
阻塞隊列是支持阻塞的插入和阻塞的移除的隊列.
阻塞的插入 意思是當隊列滿時, 隊列會阻塞插入元素的線程, 直到隊列不滿; 阻塞的移除 意思是在隊列為空時, 獲取元素的線程會等待隊列變為非空;
常見的應用場景是生產者和消費者的場景, 作為二者獲取元素的容器.
ArrayBlockingQueue
一個由數組結構組成的有界阻塞隊列LinkedBlockingQueue
一個由鏈表結構組成的有界(初始化和最大值均為Integer.MAX_VALUE
大小, 可以看做無界隊列)阻塞隊列PriorityBlockingQueue
一個支持優先級排序的無界阻塞隊列DelayQueue
一個使用優先級隊列實現的無界阻塞隊列, 在創建元素的時候可以指定多久才能從隊列中獲取當前元素, 只有在延遲期滿才能從隊列中提取元素.SynchronousQueue
一個不存儲元素的阻塞隊列, 也就是容量是0, 直接將生產者線程的數據傳遞給消費者. 每一個put
操作必須等待一個take
操作, 否則會被阻塞LinkedTransferQueue
一個由鏈表結構組成的無界阻塞隊列, 如果有消費者在等待數據, 就直接將生產者的數據傳遞給消費者, 如果沒有消費者在等待獲取數據, 就將數據放置在隊列尾部.LinkedBlockingDeque
一個由鏈表結構組成的雙向阻塞隊列. 由于兩端都可以入隊和出隊, 比前述隊列減少了一半的數據競爭, 可以用在”工作竊取”模式中未完待續
新聞熱點
疑難解答