在上篇博客(【java并發編程實戰】-----“J.U.C”:Semaphore)中,LZ介紹了Semaphore,下面LZ介紹CyclicBarrier。在JDK API中是這么介紹的:
一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
CyclicBarrier 支持一個可選的 Runnable 命令,在一組線程中的最后一個線程到達之后(但在釋放所有線程之前),該命令只在每個屏障點運行一次。若在繼續所有參與線程之前更新共享狀態,此屏障操作 很有用。
對于失敗的同步嘗試,CyclicBarrier 使用了一種要么全部要么全不 (all-or-none) 的破壞模式:如果因為中斷、失敗或者超時等原因,導致線程過早地離開了屏障點,那么在該屏障點等待的其他所有線程也將通過 BrokenBarrierException(如果它們幾乎同時被中斷,則用 InterruptedException)以反常的方式離開。
CyclicBarrier結構如下:
從上圖可以看到CyclicBarrier內部使用ReentrantLock獨占鎖實現的。其構造函數如下:
CyclicBarrier(int parties):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態時啟動,但它不會在啟動 barrier 時執行預定義的操作。
CyclicBarrier(int parties, Runnable barrierAction):創建一個新的 CyclicBarrier,它將在給定數量的參與者(線程)處于等待狀態時啟動,并在啟動 barrier 時執行給定的屏障操作,該操作由最后一個進入 barrier 的線程執行。
public CyclicBarrier(int parties) { this(parties, null); } public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
在CyclicBarrier中,最重要的方法就是await(),在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。其源代碼如下:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen; } }
await內部調用dowait():
PRivate int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { //獨占鎖 final ReentrantLock lock = this.lock; //獲取獨占鎖 lock.lock(); try { //保存當前"Generation" final Generation g = generation; //當前generation“已損壞”,拋出BrokenBarrierException異常 //拋出該異常一般都是某個線程在等待某個處于“斷開”狀態的CyclicBarrier if (g.broken) throw new BrokenBarrierException(); //當前線程中斷,通過breakBarrier終止終止CyclicBarrier if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } //計數器-1 int index = --count; //如果計數器 == 0 //表示所有線程都已經到位,觸發動作(是否執行某項任務) if (index == 0) { // tripped boolean ranAction = false; try { //barrierCommand線程要執行的任務 final Runnable command = barrierCommand; //執行的任務!=null,執行任務 if (command != null) command.run(); ranAction = true; //喚醒所有等待線程,并更新generation。 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } //循環一直執行,直到下面三個if一個條件滿足才會退出循環 for (;;) { try { //如果不是超時等待,則調用await等待 if (!timed) trip.await(); //調用awaitNanos等待 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { Thread.currentThread().interrupt(); } } //當前generation“已損壞”,拋出BrokenBarrierException異常 //拋出該異常一般都是某個線程在等待某個處于“斷開”狀態的CyclicBarrier if (g.broken) throw new BrokenBarrierException(); //generation已經更新,返回index if (g != generation) return index; //“超時等待”,并且時間已到,則通過breakBarrier()終止CyclicBarrier if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { //釋放獨占鎖 lock.unlock(); } }
在dowait方法中其實處理邏輯還是比較簡單的:
1、首先判斷該barrier是否已經斷開了,如果斷開則拋出BrokenBarrierException異常;
2、判斷計算器index是否等于0,如果等于0,則表示所有的線程準備就緒,已經到達某個公共屏障點了,barrier可以進行后續工作了(是否執行某項任務(構造函數決定));然后調用nextGeneration方法進行更新換代工作(其中會喚醒所有等待的線程);
3、通過for循環(for(;;))使線程一直處于等待狀態。直到“有parties個線程到達barrier” 或 “當前線程被中斷” 或 “超時”這3者之一發生。
在dowait中有Generation這樣一個對象。該對象是CyclicBarrier的一個成員變量:
private static class Generation { boolean broken = false; }
Generation描述著CyclicBarrier的更顯換代。在CyclicBarrier中,同一批線程屬于同一代。當有parties個線程到達barrier,generation就會被更新換代。其中broken標識該當前CyclicBarrier是否已經處于中斷狀態。
對于中斷,CyclicBarrier是通過breakBarrier()實現的:
private void breakBarrier() { generation.broken = true; count = parties; trip.signalAll(); }
在breakBarrier()中除了將broken設置為true,還會調用signalAll將在CyclicBarrier處于等待狀態的線程全部喚醒。
在超時的判斷中,CyclicBarrier根據timed的值來執行不同的wait。await、awaitNanos都是Condition中的方法。
當index = --count等于0時,標識“有parties個線程到達barrier”,臨界條件到達,則執行相應的動作。執行完動作后,則調用nextGeneration進行更新換代:
private void nextGeneration() { //喚醒所有處于等待狀態的線程 trip.signalAll(); //初始化計數器 count = parties; //產生新的Generation對象 generation = new Generation(); }
1、線程等待到一定條件后才會繼續進行。
public class CyclicBarrierTest_1 { private static CyclicBarrier barrier; static class threadTest1 extends Thread{ public void run() { System.out.println(Thread.currentThread().getName() + "達到..."); try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "執行完成..."); } } public static void main(String[] args) { barrier = new CyclicBarrier(5); for(int i = 1 ; i <= 5 ; i++){ new threadTest1().start(); } }}
------執行結果:
Thread-0達到...Thread-1達到...Thread-3達到...Thread-2達到...Thread-4達到...Thread-4執行完成...Thread-0執行完成...Thread-1執行完成...Thread-2執行完成...Thread-3執行完成...
2、線程等待到一定條件后,執行某項任務。比如說我們等車,只有當車坐滿后,汽車才會發動。
這個只需要對上面的代碼進行小動作的改動即可:
public class CyclicBarrierTest_2 { private static CyclicBarrier barrier; static class threadTest1 extends Thread{ public void run() { System.out.println(Thread.currentThread().getName() + "達到..."); try { barrier.await(); } catch (Exception e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "執行完成..."); } } public static void main(String[] args) { barrier = new CyclicBarrier(5,new Runnable() { @Override public void run() { System.out.println("執行CyclicBarrier中的任務....."); } }); for(int i = 1 ; i <= 5 ; i++){ new threadTest1().start(); } }}
-------執行結果:
Thread-0達到...Thread-1達到...Thread-3達到...Thread-4達到...Thread-2達到...執行CyclicBarrier中的任務.....Thread-2執行完成...Thread-0執行完成...Thread-3執行完成...Thread-1執行完成...Thread-4執行完成...
參考文獻:
1、Java多線程系列--“JUC鎖”10之 CyclicBarrier原理和示例
新聞熱點
疑難解答