亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

Java多線程工具篇BlockingQueue的詳解

2024-07-14 08:43:47
字體:
來源:轉載
供稿:網友

前言:

在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

認識BlockingQueue

阻塞隊列,顧名思義,首先它是一個隊列,而一個隊列在數據結構中通過一個共享的隊列,可以使得數據由隊列的一端輸入,從另外一端輸出;

常用的隊列主要有以下兩種:(當然通過不同的實現方式,還可以延伸出很多不同類型的隊列,DelayQueue就是其中的一種)

  • 先進先出(FIFO):先插入的隊列的元素也最先出隊列,類似于排隊的功能。從某種程度上來說這種隊列也體現了一種公平性。
  • 后進先出(LIFO):后插入隊列的元素最先出隊列,這種隊列優先處理最近發生的事件。

多線程環境中,通過隊列可以很容易實現數據共享,比如經典的“生產者”和“消費者”模型中,通過隊列可以很便利地實現兩者之間的數據共享。假設我們有若干生產者線程,另外又有若干個消費者線程。如果生產者線程需要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就可以很方便地解決他們之間的數據共享問題。但如果生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的情況呢?理想情況下,如果生產者產出數據的速度大于消費者消費的速度,并且當生產出來的數據累積到一定程度的時候,那么生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。然而,在concurrent包發布以前,在多線程環境下,我們每個程序員都必須去自己控制這些細節,尤其還要兼顧效率和線程安全,而這會給我們的程序帶來不小的復雜度。好在此時,強大的concurrent包橫空出世了,而他也給我們帶來了強大的BlockingQueue。(在多線程領域:所謂阻塞,在某些情況下會掛起線程(即阻塞),一旦條件滿足,被掛起的線程又會自動被喚醒)

BlockingQueue的兩個常見阻塞場景:

當隊列中沒有數據的情況下,消費者端的所有線程都會被自動阻塞(掛起),直到有數據放入隊列。

當隊列中填滿數據的情況下,生產者端的所有線程都會被自動阻塞(掛起),直到隊列中有空的位置,線程被自動喚醒。

這也是我們在多線程環境下,為什么需要BlockingQueue的原因。作為BlockingQueue的使用者,我們再也不需要關心什么時候需要阻塞線程,什么時候需要喚醒線程,因為這一切BlockingQueue都給你一手包辦了。既然BlockingQueue如此神通廣大,讓我們一起來見識下它的常用方法:

BlockingQueue的核心方法:

放入數據:

  offer(anObject):表示如果可能的話,將anObject加到BlockingQueue里,即如果BlockingQueue可以容納,
    則返回true,否則返回false.(本方法不阻塞當前執行方法的線程)
  offer(E o, long timeout, TimeUnit unit),可以設定等待的時間,如果在指定的時間內,還不能往隊列中
    加入BlockingQueue,則返回失敗。
  put(anObject):把anObject加到BlockingQueue里,如果BlockQueue沒有空間,則調用此方法的線程被阻斷
    直到BlockingQueue里面有空間再繼續.

獲取數據:

  poll(time):取走BlockingQueue里排在首位的對象,若不能立即取出,則可以等time參數規定的時間,
    取不到時返回null;
  poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的對象,如果在指定時間內,
    隊列一旦有數據可取,則立即返回隊列中的數據。否則知道時間超時還沒有數據可取,返回失敗。
  take():取走BlockingQueue里排在首位的對象,若BlockingQueue為空,阻斷進入等待狀態直到
    BlockingQueue有新的數據被加入; 
  drainTo():一次性從BlockingQueue獲取所有可用的數據對象(還可以指定獲取數據的個數), 
    通過該方法,可以提升獲取數據效率;不需要多次分批加鎖或釋放鎖。

常見BlockingQueue

在了解了BlockingQueue的基本功能后,讓我們來看看BlockingQueue家庭大致有哪些成員? 

BlockingQueue成員詳細介紹

1. ArrayBlockingQueue

基于數組的阻塞隊列實現,在ArrayBlockingQueue內部,維護了一個定長數組,以便緩存隊列中的數據對象,這是一個常用的阻塞隊列,除了一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。

ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue;按照實現原理來分析,ArrayBlockingQueue完全可以采用分離鎖,從而實現生產者和消費者操作的完全并行運行。Doug Lea之所以沒這樣去做,也許是因為ArrayBlockingQueue的數據寫入和獲取操作已經足夠輕巧,以至于引入獨立的鎖機制,除了給代碼帶來額外的復雜性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。這在長時間內需要高效并發地處理大批量數據的系統中,其對于GC的影響還是存在一定的區別。而在創建ArrayBlockingQueue時,我們還可以控制對象的內部鎖是否采用公平鎖,默認采用非公平鎖。

2. LinkedBlockingQueue

基于鏈表的阻塞隊列,同ArrayListBlockingQueue類似,其內部也維持著一個數據緩沖隊列(該隊列由一個鏈表構成),當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區達到最大值緩存容量時(LinkedBlockingQueue可以通過構造函數指定該值),才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。而LinkedBlockingQueue之所以能夠高效的處理并發數據,還因為其對于生產者端和消費者端分別采用了獨立的鎖來控制數據同步,這也意味著在高并發的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發性能。

作為開發者,我們需要注意的是,如果構造一個LinkedBlockingQueue對象,而沒有指定其容量大小,LinkedBlockingQueue會默認一個類似無限大小的容量(Integer.MAX_VALUE),這樣的話,如果生產者的速度一旦大于消費者的速度,也許還沒有等到隊列滿阻塞產生,系統內存就有可能已被消耗殆盡了。

ArrayBlockingQueue和LinkedBlockingQueue是兩個最普通也是最常用的阻塞隊列,一般情況下,在處理多線程間的生產者消費者問題,使用這兩個類足以。

下面的代碼演示了如何使用BlockingQueue:

import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;/** * @author jackyuj */public class BlockingQueueTest {  public static void main(String[] args) throws InterruptedException {    // 聲明一個容量為10的緩存隊列    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);    Producer producer1 = new Producer(queue);    Producer producer2 = new Producer(queue);    Producer producer3 = new Producer(queue);    Consumer consumer = new Consumer(queue);    // 借助Executors    ExecutorService service = Executors.newCachedThreadPool();    // 啟動線程    service.execute(producer1);    service.execute(producer2);    service.execute(producer3);    service.execute(consumer);    // 執行10s    Thread.sleep(10 *1000);    producer1.stop();    producer2.stop();    producer3.stop();    Thread.sleep(2000);    // 退出Executor    service.shutdown();  }}
import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;/** * 消費者線程 * @author jackyuj */public class Consumerimplements Runnable {  public Consumer(BlockingQueue<String> queue) {    this.queue = queue;  }  public void run() {    System.out.println("啟動消費者線程!");    Random r = new Random();    boolean isRunning = true;    try {      while (isRunning) {        System.out.println("正從隊列獲取數據...");        String data = queue.poll(2, TimeUnit.SECONDS);        if (null != data) {          System.out.println("拿到數據:" + data);          System.out.println("正在消費數據:" + data);          Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));        }else {          // 超過2s還沒數據,認為所有生產線程都已經退出,自動退出消費線程。          isRunning = false;        }      }    }catch (InterruptedException e) {      e.printStackTrace();      Thread.currentThread().interrupt();    }finally {      System.out.println("退出消費者線程!");    }  }  private BlockingQueue<String> queue;  private static final int   DEFAULT_RANGE_FOR_SLEEP = 1000;}import java.util.Random;import java.util.concurrent.BlockingQueue;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;/** * 生產者線程 * @author jackyuj */public class Producerimplements Runnable {  public Producer(BlockingQueue queue) {    this.queue = queue;  }  public void run() {    String data = null;    Random r = new Random();    System.out.println("啟動生產者線程!");    try {      while (isRunning) {        System.out.println("正在生產數據...");        Thread.sleep(r.nextInt(DEFAULT_RANGE_FOR_SLEEP));        data = "data:" + count.incrementAndGet();        System.out.println("將數據:" + data + "放入隊列...");        if (!queue.offer(data,2, TimeUnit.SECONDS)) {          System.out.println("放入數據失敗:" + data);        }      }    }catch (InterruptedException e) {      e.printStackTrace();      Thread.currentThread().interrupt();    }finally {      System.out.println("退出生產者線程!");    }  }  public void stop() {    isRunning = false;  }  private volatile boolean   isRunning        = true;  private BlockingQueue queue;  private static AtomicInteger count          = new AtomicInteger();  private static final int   DEFAULT_RANGE_FOR_SLEEP = 1000;}

3. DelayQueue

DelayQueue中的元素只有當其指定的延遲時間到了,才能夠從隊列中獲取到該元素。DelayQueue是一個沒有大小限制的隊列,因此往隊列中插入數據的操作(生產者)永遠不會被阻塞,而只有獲取數據的操作(消費者)才會被阻塞。

使用場景:

DelayQueue使用場景較少,但都相當巧妙,常見的例子比如使用一個DelayQueue來管理一個超時未響應的連接隊列。

4. PriorityBlockingQueue

基于優先級的阻塞隊列(優先級的判斷通過構造函數傳入的Compator對象來決定),但需要注意的是PriorityBlockingQueue并不會阻塞數據生產者,而只會在沒有可消費的數據時,阻塞數據的消費者。因此使用的時候要特別注意,生產者生產數據的速度絕對不能快于消費者消費數據的速度,否則時間一長,會最終耗盡所有的可用堆內存空間。在實現PriorityBlockingQueue時,內部控制線程同步的鎖采用的是公平鎖。

5. SynchronousQueue

一種無緩沖的等待隊列,類似于無中介的直接交易,有點像原始社會中的生產者和消費者,生產者拿著產品去集市銷售給產品的最終消費者,而消費者必須親自去集市找到所要商品的直接生產者,如果一方沒有找到合適的目標,那么對不起,大家都在集市等待。相對于有緩沖的BlockingQueue來說,少了一個中間經銷商的環節(緩沖區),如果有經銷商,生產者直接把產品批發給經銷商,而無需在意經銷商最終會將這些產品賣給那些消費者,由于經銷商可以庫存一部分商品,因此相對于直接交易模式,總體來說采用中間經銷商的模式會吞吐量高一些(可以批量買賣);但另一方面,又因為經銷商的引入,使得產品從生產者到消費者中間增加了額外的交易環節,單個產品的及時響應性能可能會降低。

聲明一個SynchronousQueue有兩種不同的方式,它們之間有著不太一樣的行為。公平模式和非公平模式的區別:

如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;

但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。

小結

BlockingQueue不光實現了一個完整隊列所具有的基本功能,同時在多線程環境下,他還自動管理了多線間的自動等待于喚醒功能,從而使得程序員可以忽略這些細節,關注更高級的功能。 

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作具有一定的參考學習價值,謝謝大家對VeVb武林網的支持。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
亚洲免费视频网站| 亚洲精品少妇网址| 丝袜一区二区三区| 成人免费网站在线观看| 国产欧美一区二区三区在线| 日韩精品欧美国产精品忘忧草| 国产视频综合在线| 成人黄色av免费在线观看| 久久久久久久久久久久久久久久久久av| 欧美丰满少妇xxxxx做受| 26uuu亚洲伊人春色| 久久精品色欧美aⅴ一区二区| 欧洲美女7788成人免费视频| 亚洲一区二区精品| 欧美精品九九久久| 亚洲美女免费精品视频在线观看| 欧美日韩在线观看视频| 97视频色精品| 久久免费少妇高潮久久精品99| 高清一区二区三区四区五区| 亚洲精品videossex少妇| 久久久av亚洲男天堂| 欧美激情极品视频| 国产亚洲精品久久久久动| 日本精品免费一区二区三区| 一区二区三区久久精品| 国产精品丝袜高跟| 国产成人精品日本亚洲专区61| 91wwwcom在线观看| 久久久爽爽爽美女图片| 中文字幕精品—区二区| 日韩高清电影好看的电视剧电影| 国产精品高潮视频| 国产成人精品久久亚洲高清不卡| 亚洲天堂第二页| 国产精品av在线播放| 日韩欧美综合在线视频| 国产精品69精品一区二区三区| 日韩在线免费观看视频| 欧美亚洲午夜视频在线观看| 成人亲热视频网站| 久久久视频在线| 国产成人精品优优av| 精品在线欧美视频| 欧美激情精品久久久久久变态| 国产精品日韩久久久久| 欧美在线视频一二三| 亚洲一区中文字幕在线观看| 精品福利樱桃av导航| 欧美与欧洲交xxxx免费观看| 久久伊人91精品综合网站| 日韩在线观看你懂的| 久热国产精品视频| 亚洲第一精品电影| 欧美一级片久久久久久久| 国产一区二区黑人欧美xxxx| 中文字幕日韩高清| 色综合五月天导航| 国产精品揄拍500视频| 日韩精品电影网| 欧美视频一区二区三区…| 精品亚洲va在线va天堂资源站| 国产精品99久久久久久人| 免费91在线视频| 国产午夜精品麻豆| 国产免费一区二区三区香蕉精| 国产日韩中文字幕在线| 国产成人精品在线| 欧美视频一二三| 欧美激情精品久久久久久蜜臀| 热久久这里只有精品| 欧美性高潮床叫视频| 久久精品国产99国产精品澳门| 日韩在线中文字| 亚洲美女福利视频网站| 国产亚洲精品日韩| 久久夜色精品国产| 国产一区二区丝袜| 久久免费在线观看| 国产精品久久久久高潮| 国产精品视频一区国模私拍| 有码中文亚洲精品| 国产精品极品美女粉嫩高清在线| 日韩中文字幕亚洲| 国产精品久久久久久久7电影| 久久国产精品久久国产精品| 国产一区二区三区在线播放免费观看| 欧美大尺度激情区在线播放| 亚洲精品国产欧美| 影音先锋欧美在线资源| 中文字幕亚洲色图| 在线日韩av观看| 国产精品久久久久久久久久东京| 91九色视频在线| 久久精品国产视频| 国产自产女人91一区在线观看| 久久av资源网站| 91产国在线观看动作片喷水| 久久亚洲国产精品成人av秋霞| 亚洲自拍偷拍区| 欧美高清视频免费观看| 美女久久久久久久久久久| 亚洲一区二区中文字幕| 亚洲激情在线视频| 欧美国产精品人人做人人爱| 亚洲欧美另类中文字幕| 国产视频精品一区二区三区| 欧美大片免费观看在线观看网站推荐| 欧美一级视频一区二区| 日本韩国在线不卡| 成人免费午夜电影| 亚洲欧美国产日韩中文字幕| 欧美另类交人妖| 色噜噜国产精品视频一区二区| 日韩av电影在线网| 成人黄色影片在线| 精品国产一区二区在线| 丰满岳妇乱一区二区三区| 欧美在线视频免费观看| 亚洲精品理论电影| 国产美女扒开尿口久久久| 国产成人涩涩涩视频在线观看| 中文字幕av日韩| 欧美高清电影在线看| 国产午夜一区二区| 久久99精品久久久久久噜噜| 亚洲影院高清在线| 欧美中文字幕在线| 国产69精品久久久久99| 在线成人免费网站| 国产成人精品久久亚洲高清不卡| 国产精品视频一区二区三区四| 欧美日韩国产一区二区三区| 久久久成人精品视频| 欧美性生交xxxxx久久久| 国产亚洲精品久久久| 欧美另类精品xxxx孕妇| 国产ts一区二区| 综合国产在线视频| 亚洲视频欧美视频| 国产精品吹潮在线观看| 国产z一区二区三区| 国产欧美日韩高清| 精品偷拍一区二区三区在线看| 日韩av中文字幕在线播放| 亚洲一区中文字幕在线观看| 中文字幕亚洲在线| 欧美多人乱p欧美4p久久| 国产精自产拍久久久久久| 九九热精品视频在线播放| 欧美高跟鞋交xxxxhd| 亚洲国产日韩精品在线| 日韩中文字幕久久| 一区二区亚洲欧洲国产日韩| 68精品国产免费久久久久久婷婷| 成人黄色短视频在线观看| 日产精品久久久一区二区福利| 国产va免费精品高清在线观看| 国产精品久久久久久久9999| 日韩黄色在线免费观看| 国产一区二中文字幕在线看| 日韩国产高清视频在线| 色综合久久中文字幕综合网小说| 久久综合国产精品台湾中文娱乐网|