SynchronousQueue是一種很特別的BlockingQueue,任何一個添加元素的操作都必須等到另外一個線程拿走元素才會結束。也就是SynchronousQueue本身不會存儲任何元素,相當于生產者和消費者手遞手直接交易。
SynchronousQueue有一個fair選項,如果fair為true,稱為fair模式,否則就是unfair模式。
在fair模式下,所有等待的生產者線程或者消費者線程會按照開始等待時間依次排隊,然后按照等待先后順序進行匹配交易。這種情況用隊列實現。
在unfair模式下,則剛好相反,后來先匹配,這種情況用棧實現。
*/ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack(); }
因為添加元素和拿走元素是類似手遞手交易的,所以對于拿走元素和添加元素操作,SynchronousQueue調用的是Transferer同一個方法transfer。
當object為null時表示是拿走元素,用于消費者線程,否則則是添加元素,用于生產者線程。因此transfer方法是分析的重點。
abstract Object transfer(Object e, boolean timed, long nanos);
首先來看用于fair模式的TransferQueue的transfer方法:
看代碼之前,來理一下邏輯:
1. 開始隊列肯定是空。
2. 線程進入隊列,如果隊列是空的,那么就添加該線程進入隊列,然后進行等待(要么有匹配線程出現,要么就是該請求超時取消)
3. 第二個線程進入,如果前面一個線程跟它屬于不同類型,也就是說兩者是可以匹配的,那么就從隊列刪除第一個線程。
如果是相同的線程,那么做法參照2。
理清了基本邏輯,也就是會有兩種情況:
1. 隊列為空或者隊列中的等待線程是相同類型
2. 隊列中的等待線程是匹配的類型
Object transfer(Object e, boolean timed, long nanos) { QNode s = null; // e不是null表示是生成者線程,e就是產品,反之就是消費者線程 boolean isData = (e != null); for (;;) { QNode t = tail; QNode h = head; // tail和head在隊列創建時會被初始化成一個虛擬節點 // 因此發現沒有初始化,重新循環等待直到初始化完成 if (t == null || h == null) continue; // 隊列為空或等待線程類型相同(不同類型才能匹配) // 這兩種情況都要把當前線程加入到等待隊列中 if (h == t || t.isData == isData) { QNode tn = t.next; // tail對象已經被更新,出現不一致讀的現象,重新循環 if (t != tail) continue; // 添加線程到等待隊列時會先更新當前tail的next,然后 // 更新tail本身,因此出現只有next被更新的情況,應該 // 更新tail,然后重新循環 if (tn != null) { advanceTail(t, tn); continue; } // 設定了超時,剩余等待時間耗盡的時候,就無需再等待 if (timed && nanos <= 0) return null; // 首次使用s的時候,新建一個節點保存當前線程和數據來初始化s if (s == null) s = new QNode(e, isData); // 嘗試更新tail的next,把新建節點添加到tail的后面,如果失敗了,就重新循環 if (!t.casNext(null, s)) continue; // 把新建的節點設置為tail advanceTail(t, s); // 等待匹配線程,成功匹配則返回的匹配的值 // 否則返回當前節點,因此s和x相同表示請求被取消 Object x = awaitFulfill(s, e, timed, nanos); if (x == s) { clean(t, s); return null; } // 這個時候已經匹配成功了,s應該是排在第一個的等待線程 // 如果s依然在隊列中,那么需要更新head。 // 更新head的方法是把s這個排在第一位的節點作為新的head // 因此需要重置一些屬性使它變成虛擬節點 if (!s.isOffList()) { advanceHead(t, s); if (x != null) s.item = s; s.waiter = null; } // x不為null表示拿到匹配線程的數據(消費者拿到生產者的數據), // 因此返回該數據,否則返回本身的數據(生成者返回自己的數據) return (x != null) ? x : e; } else { // 線程可以匹配 // 因為是隊列,因此匹配的是第一個節點 QNode m = h.next; // 同樣需要檢查不一致讀的情況 if (t != tail || m == null || h != head) continue; Object x = m.item; // 匹配失敗時,把m從隊列中移走,重新循環 if (isData == (x != null) || // m已經被匹配了 x == m || // m已經被取消了 !m.casItem(x, e)) { // 用CAS設置m的數據為null advanceHead(h, m); continue; } // 匹配成功,更新head advanceHead(h, m); // 解除m的線程等待狀態 LockSupport.unpark(m.waiter); // 返回匹配的數據 return (x != null) ? x : e; } } }
接著來用于Unfair模式的TransferStack的transfer方法
大體邏輯應該是一樣的,不同就是隊列的入隊和出隊操作對應到棧時就是入棧和出棧的操作。
Object transfer(Object e, boolean timed, long nanos) { SNode s = null; int mode = (e == null) ? REQUEST : DATA; for (;;) { SNode h = head; // 棧為空或者節點類型相同的情況 if (h == null || h.mode == mode) { if (timed && nanos <= 0) { // 檢查棧頂節點是否已經取消,如果已經取消,彈出節點 // 重新循環,接著檢查新的棧頂節點 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; // 新建節點,并且嘗試把新節點入棧 } else if (casHead(h, s = snode(s, e, h, mode))) { // 等待匹配,如果發現是被取消的情況,則釋放節點,返回null SNode m = awaitFulfill(s, timed, nanos); if (m == s) { clean(s); return null; } // 如果匹配的成功兩個節點是棧頂的兩個節點 // 把這兩個節點都彈出 if ((h = head) != null && h.next == s) casHead(h, s.next); // help s's fulfiller return (mode == REQUEST) ? m.item : s.item; } } else if (!isFulfilling(h.mode)) { // 棧頂節點沒有和其他線程在匹配,可以匹配 if (h.isCancelled()) // 棧頂節點的請求已經被取消 casHead(h, h.next); // 移除棧頂元素重新循環 // 嘗試把該節點也入棧,該節點設置為正在匹配的狀態 // 也就是isFulfilling返回true else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { for (;;) { // 棧頂節點(當前線程的節點)和它的下一個節點進行匹配,m為null意味著 // 棧里沒有其他節點了,因為前面該節點入棧了,需要彈出這個節點重新循環 SNode m = s.next; if (m == null) { casHead(s, null); s = null; break; } // 這個時候是有節點可以匹配的,嘗試為這兩個節點做匹配 SNode mn = m.next; // m和s匹配成功,彈出這兩個節點,返回數據;匹配失敗,把m移除 if (m.tryMatch(s)) { casHead(s, mn); return (mode == REQUEST) ? m.item : s.item; } else s.casNext(m, mn); } } // 棧頂正在匹配,參見代碼: // else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) { // 做法基本類似,只是這里幫助其他線程匹配,無論成功與否 // 都要重新循環 } else { SNode m = h.next; if (m == null) casHead(h, null); else { SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); else h.casNext(m, mn); } } } }
TransferQueue和TransferStack的算法實現可以參考 這里
新聞熱點
疑難解答