CyclicBarrier和CountDownLatch一樣,都是關于線程的計數器。其實原理都是一樣的只是,CyclicBarrier與CountDownLatch 最大區別在 CyclicBarrier 在運行錯誤可以重新set數值,重新跑線程,而CountDownLatch 只能減一 不能重新設置。
CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。示例代碼如代碼清單8-3所示。
public class CyclicBarrierTest {staticCyclicBarrier c = new CyclicBarrier(2);public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {c.await();} catch (Exception e) {}System.out.PRintln(1);}}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}}
因為主線程和子線程的調度是由CPU決定的,兩個線程都有可能先執行,所以會產生兩種輸出1,2 或者2,1
如果把new CyclicBarrier(2)修改成new CyclicBarrier(3),則主線程和子線程會永遠等待,因為沒有第三個線程執行await方法,即沒有第三個線程到達屏障,所以之前到達屏障的兩個線程都不會繼續執行。
CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrier-Action),用于在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景,如代碼清單8-4所示。
public class CyclicBarrierTest2 {static CyclicBarrier c = new CyclicBarrier(2, new A());public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {try {c.await();} catch (Exception e) {}System.out.println(1);}}).start();try {c.await();} catch (Exception e) {}System.out.println(2);}static class A implements Runnable {@Overridepublic void run() {System.out.println(3);}}}
CyclicBarrier的應用場景
CyclicBarrier可以用于多線程計算數據,最后合并計算結果的場景。例如,用一個Excel保存了用戶所有銀行流水,每個Sheet保存一個賬戶近一年的每筆銀行流水,現在需要統計用戶的日均銀行流水,先用多線程處理每個sheet里的銀行流水,都執行完之后,得到每個sheet的日均銀行流水,最后,再用barrierAction用這些線程的計算結果,計算出整個Excel的日均銀行流水,如代碼清單8-5所示。
public class BankWaterService implements Runnable {/*** 創建4個屏障,處理完之后執行當前類的run方法*/private CyclicBarrier c = new CyclicBarrier(4, this);/*** 假設只有4個sheet,所以只啟動4個線程*/private Executor executor = Executors.newFixedThreadPool(4);/*** 保存每個sheet計算出的銀流結果*/private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<String, Integer>();private void count() {for (int i = 0; i < 4; i++) {executor.execute(new Runnable() {@Overridepublic void run() {// 計算當前sheet的銀流數據,計算代碼省略sheetBankWaterCount.put(Thread.currentThread().getName(), 1);// 銀流計算完成,插入一個屏障try {c.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}});}}@Overridepublic void run() {int result = 0;// 匯總每個sheet計算出的結果for (Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {result += sheet.getValue();}// 將結果輸出sheetBankWaterCount.put("result", result);System.out.println(result);}public static void main(String[] args) {BankWaterService bankWaterCount = new BankWaterService();bankWaterCount.count();}}
新聞熱點
疑難解答