亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb

首頁 > 開發 > Java > 正文

淺談Java中生產者與消費者問題的演變

2024-07-14 08:42:39
字體:
來源:轉載
供稿:網友

想要了解更多關于Java生產者消費者問題的演變嗎?那就看看這篇文章吧,我們分別用舊方法和新方法來處理這個問題。

生產者消費者問題是一個典型的多進程同步問題。

對于大多數人來說,這個問題可能是我們在學校,執行第一次并行算法所遇到的第一個同步問題。

雖然它很簡單,但一直是并行計算中的最大挑戰 - 多個進程共享一個資源。

問題陳述

生產者和消費者兩個程序,共享一個大小有限的公共緩沖區。

假設一個生產者“生產”一份數據并將其存儲在緩沖區中,而一個消費者“消費”這份數據,并將這份數據從緩沖區中刪除。

再假設現在這兩個程序在并發地運行,我們需要確保當緩沖區的數據已滿時,生產者不會放置新數據進來,也要確保當緩沖區的數據為空時,消費者不會試圖刪除數據緩沖區的數據。

解決方案

為了解決上述的并發問題,生產者和消費者將不得不相互通信。

如果緩沖區已滿,生產者將處于睡眠狀態,直到有通知信息喚醒。

在消費者將一些數據從緩沖區刪除后,消費者將通知生產者,隨后生產者將重新開始填充數據到緩沖區中。

如果緩沖區內容為空的化,那么情況是一樣的,只不過,消費者會先等待生產者的通知。

但如果這種溝通做得不恰當,在進程彼此等待的位置可能導致程序死鎖。

經典的方法

首先來看一個典型的Java方案來解決這個問題。

package ProducerConsumer;import java.util.LinkedList;import java.util.Queue;public class ClassicProducerConsumerExample {  public static void main(String[] args) throws InterruptedException {    Buffer buffer = new Buffer(2);    Thread producerThread = new Thread(new Runnable() {      @Override      public void run() {        try {          buffer.produce();        } catch (InterruptedException e) {          e.printStackTrace();        }      }    });    Thread consumerThread = new Thread(new Runnable() {      @Override      public void run() {        try {          buffer.consume();        } catch (InterruptedException e) {          e.printStackTrace();        }      }    });    producerThread.start();    consumerThread.start();    producerThread.join();    consumerThread.join();  }  static class Buffer {    private Queue<Integer> list;    private int size;    public Buffer(int size) {      this.list = new LinkedList<>();      this.size = size;    }    public void produce() throws InterruptedException {      int value = 0;      while (true) {        synchronized (this) {          while (list.size() >= size) {            // wait for the consumer            wait();          }          list.add(value);          System.out.println("Produced " + value);          value++;          // notify the consumer          notify();          Thread.sleep(1000);        }      }    }    public void consume() throws InterruptedException {      while (true) {        synchronized (this) {          while (list.size() == 0) {            // wait for the producer            wait();          }          int value = list.poll();          System.out.println("Consume " + value);          // notify the producer          notify();          Thread.sleep(1000);        }      }    }  }}

這里我們有生產者和消費者兩個線程,它們共享一個公共緩沖區。生產者線程開始產生新的元素并將它們存儲在緩沖區。如果緩沖區已滿,那么生產者線程進入睡眠狀態,直到有通知喚醒。否則,生產者線程將會在緩沖區創建一個新元素然后通知消費者。就像我之前說的,這個過程也適用于消費者。如果緩沖區為空,那么消費者將等待生產者的通知。否則,消費者將從緩沖區刪除一個元素并通知生產者。

正如你所看到的,在之前的例子中,生產者和消費者的工作都是管理緩沖區的對象。這些線程僅僅調用了buffer.produce()和buffer.consume()兩個方法就搞定了一切。

對于緩沖區是否應該負責創建或者刪除元素,一直都是一個有爭議的話題,但在我看來,緩沖區不應該做這種事情。當然,這取決于你想要達到的目的,但在這種情況下,緩沖區應該只是負責以線程安全的形式存儲合并元素,而不是生產新的元素。

所以,讓我們把生產和消費的邏輯從緩沖對象中進行解耦。

package ProducerConsumer;import java.util.LinkedList;import java.util.Queue;public class ProducerConsumerExample2 {  public static void main(String[] args) throws InterruptedException {    Buffer buffer = new Buffer(2);    Thread producerThread = new Thread(() -> {      try {        int value = 0;        while (true) {          buffer.add(value);          System.out.println("Produced " + value);          value ++;          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    Thread consumerThread = new Thread(() -> {      try {        while (true) {          int value = buffer.poll();          System.out.println("Consume " + value);          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    producerThread.start();    consumerThread.start();    producerThread.join();    consumerThread.join();  }  static class Buffer {    private Queue<Integer> list;    private int size;    public Buffer(int size) {      this.list = new LinkedList<>();      this.size = size;    }    public void add(int value) throws InterruptedException {      synchronized (this) {        while (list.size() >= size) {          wait();        }        list.add(value);        notify();      }    }    public int poll() throws InterruptedException {      synchronized (this) {        while (list.size() == 0) {          wait();        }        int value = list.poll();        notify();        return value;      }    }  }}

這樣好多了,至少現在緩沖區僅僅負責以線程安全的形式來存儲和刪除元素。

隊列阻塞(BlockingQueue)

不過,我們還可以進一步改善。

在前面的例子中,我們已經創建了一個緩沖區,每當存儲一個元素之前,緩沖區將等待是否有可用的一個槽以防止沒有足夠的存儲空間,并且,在合并之前,緩沖區也會等待一個新的元素出現,以確保存儲和刪除的操作是線程安全的。

但是,Java本身的庫已經整合了這些操作。它被稱之為BlockingQueue,在這里可以查看它的詳細文檔。

BlockingQueue是一個以線程安全的形式存入和取出實例的隊列。而這就是我們所需要的。

所以,如果我們在示例中使用BlockingQueue,我們就不需要再去實現等待和通知的機制。

接下來,我們來看看具體的代碼。

package ProducerConsumer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingDeque;public class ProducerConsumerWithBlockingQueue {  public static void main(String[] args) throws InterruptedException {    BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);    Thread producerThread = new Thread(() -> {      try {        int value = 0;        while (true) {          blockingQueue.put(value);          System.out.println("Produced " + value);          value++;          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    Thread consumerThread = new Thread(() -> {      try {        while (true) {          int value = blockingQueue.take();          System.out.println("Consume " + value);          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    });    producerThread.start();    consumerThread.start();    producerThread.join();    consumerThread.join();  }}

雖然runnables看起來跟之前一樣,他們按照之前的方式生產和消費元素。

唯一的區別在于,這里我們使用blockingQueue代替緩沖區對象。

關于Blocking Queue的更多細節

這兒有很多種類型的BlockingQueue:

  • ×××隊列
  • 有界隊列

一個×××隊列幾乎可以無限地增加元素,任何添加操作將不會被阻止。

你可以以這種方式去創建一個×××隊列:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>();

在這種情況下,由于添加操作不會被阻塞,生產者添加新元素時可以不用等待。每次當生產者想要添加一個新元素時,會有一個隊列先存儲它。但是,這里面也存在一個異常需要捕獲。如果消費者刪除元素的速度比生產者添加新的元素要慢,那么內存將被填滿,我們將可能得到一個OutOfMemory異常。

與之相反的則是有界隊列,存在一個固定大小。你可以這樣去創建它:

BlockingQueue<String> blockingQueue = new LinkedBlockingDeque<>(10);

兩者最主要的區別在于,使用有界隊列的情況下,如果隊列內存已滿,而生產者仍然試圖往里面塞元素,那么隊列將會被阻塞(具體阻塞方式取決于添加元素的方法)直到有足夠的空間騰出來。

往blocking queue里面添加元素一共有以下四種方式:

  • add() - 如果插入成功返回true,否則拋出IllegalStateException
  • put() - 往隊列中插入元素,并在有必要的情況下等待一個可用的槽(slot)
  • offer() - 如果插入元素成功返回true,否則返回false
  • offer(E e, long timeout, TimeUnit unit) – 在隊列沒有滿的情況下,或者為了一個可用的slot而等待指定的時間后,往隊列中插入一個元素。

所以,如果你使用put()方法插入元素,而隊列內存已滿的情況下,我們的生產者就必須等待,直到有可用的slot出現。

以上就是我們上一個案例的全部,這跟ProducerConsumerExample2的工作原理是一樣的。

使用線程池

還有什么地方我們可以優化的?那首先來分析一下我們干了什么,我們實例化了兩個線程,一個被叫做生產者,專門往隊列里面塞元素,另一個被叫做消費者,負責從隊列里面刪元素。

然而,好的軟件技術表明,手動地去創建和銷毀線程是不好的做法。首先創建線程是一項昂貴的任務,每創建一個線程,意味著要經歷一遍下面的步驟:

  • 首先要分配內存給一個線程堆棧
  • 操作系統要創建一個原生線程對應于Java的線程
  • 跟這個線程相關的描述符被添加到JVM內部的數據結構中

首先別誤會我,我們的案例中用了幾個線程是沒有問題的,而那也是并發工作的方式之一。這里的問題是,我們是手動地去創建線程,這可以說是一次糟糕的實踐。如果我們手動地創建線程,除了創建過程中的消耗外,還有另一個問題,就是我們無法控制同時有多少個線程在運行。舉個例子,如果同時有一百萬次請求線上服務,那么每一次請求都會相應的創建一個線程,那么同時會有一百萬個線程在后臺運行,這將會導致[thread starvation](https://en.wikipedia.org/wiki/Starvation_(computer_science))

所以,我們需要一種全局管理線程的方式,這就用到了線程池。

線程池將基于我們選擇的策略來處理線程的生命周期。它擁有有限數量的空閑線程,并在需要解決任務時啟用它們。通過這種方式,我們不需要為每一個新的請求創建一個新線程,因此,我們可以避免出現線程饑餓的問題。

Java線程池的實現包括:

  • 一個任務隊列
  • 一個工作線程的集合
  • 一個線程工廠
  • 管理線程池狀態的元數據

為了同時運行一些任務,你必須把他們先放到任務隊列里。然后,當一個線程可用的時候,它將接收一個任務并運行它??捎玫木€程越多,并行執行的任務就越多。

除了管理線程生命周期,使用線程池還有另一個好處,當你計劃如何分割任務,以便同時執行時,你能想到更多種方式。并行性的單位不再是線程了,而是任務。你設計一些任務來并發執行,而不是讓一些線程通過共享公共的內存塊來并發運行。按照功能需求來思考的方式可以幫助我們避免一些常見的多線程問題,如死鎖或數據競爭等。沒有什么可以阻止我們再次深入這些問題,但是,由于使用了功能范式,我們沒辦法命令式地同步并行計算(鎖)。這比直接使用線程和共享內存所能碰到的幾率要少的多。在我們的例子中,共享一個阻塞隊列不是想要的情況,但我就是想強調這個優勢。

在這里和這里你可以找到更多有關線程池的內容。

說了那么多,接下來我們看看在案例中如何使用線程池。

package ProducerConsumer;import java.util.concurrent.BlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingDeque;public class ProducerConsumerExecutorService {  public static void main(String[] args) {    BlockingQueue<Integer> blockingQueue = new LinkedBlockingDeque<>(2);    ExecutorService executor = Executors.newFixedThreadPool(2);    Runnable producerTask = () -> {      try {        int value = 0;        while (true) {          blockingQueue.put(value);          System.out.println("Produced " + value);          value++;          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    };    Runnable consumerTask = () -> {      try {        while (true) {          int value = blockingQueue.take();          System.out.println("Consume " + value);          Thread.sleep(1000);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    };    executor.execute(producerTask);    executor.execute(consumerTask);    executor.shutdown();  }}

這里的區別在于,我們不在手動創建或運行消費者和生產者線程。我們建立一個線程池,它將收到兩個任務,生產者和消費者的任務。生產者和消費者的任務,實際上跟之前例子里面使用的runnable是相同的?,F在,執行程序(線程池實現)將接收任務,并安排它的工作線程去執行他們。

在我們簡單的案例下,一切都跟之前一樣運行。就像之前的例子,我們仍然有兩個線程,他們仍然要以同樣的方式生產和消費元素。雖然我們并沒有讓性能得到提升,但是代碼看起來干凈多了。我們不再手動創建線程,而只是具體說明我們想要什么:我們想要并發執行某些任務。

所以,當你使用一個線程池時。你不需要考慮線程是并發執行的單位,相反的,你把一些任務看作并發執行的就好。以上就是你需要知道的,剩下的由執行程序去處理。執行程序會收到一些任務,然后,它會分配工作線程去處理它們。

總結

首先,我們看到了一個“傳統”的消費者-生產者問題的解決方案。我們盡量避免了重復造沒有必要的車輪,恰恰相反,我們重用了已經測試過的解決方案,因此,我們不是寫一個通知等待系統,而是嘗試使用Java已經提供的blocking queue,因為Java為我們提供了一個非常有效的線程池來管理線程生命周期,讓我們可以擺脫手動創建線程。通過這些改進,消費者-生產者問題的解決方案看起來更可靠和更好理解。

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持VeVb武林網。


注:相關教程知識閱讀請移步到JAVA教程頻道。
發表評論 共有條評論
用戶名: 密碼:
驗證碼: 匿名發表
亚洲香蕉成人av网站在线观看_欧美精品成人91久久久久久久_久久久久久久久久久亚洲_热久久视久久精品18亚洲精品_国产精自产拍久久久久久_亚洲色图国产精品_91精品国产网站_中文字幕欧美日韩精品_国产精品久久久久久亚洲调教_国产精品久久一区_性夜试看影院91社区_97在线观看视频国产_68精品久久久久久欧美_欧美精品在线观看_国产精品一区二区久久精品_欧美老女人bb
欧美激情视频给我| 亚洲欧洲免费视频| 国产极品精品在线观看| 成人激情视频小说免费下载| 91av在线网站| 美女av一区二区三区| 国产精品视频自拍| 国产精品亚发布| 亚洲精品v天堂中文字幕| 久久国产精品久久久久久| 日韩电影免费观看中文字幕| 欧美极品第一页| 国产美女直播视频一区| 91精品国产高清自在线看超| 国产免费成人av| 久久av.com| 国产ts人妖一区二区三区| 国产99视频在线观看| 精品国产一区二区三区久久久狼| 亚洲精品成人久久久| 欧美精品免费在线| 亚洲电影在线观看| 亚洲第一色中文字幕| 奇米成人av国产一区二区三区| 日韩在线视频网| 国产精品网站大全| 欧美精品在线网站| 国产精品电影在线观看| 国产精品99久久99久久久二8| 国产精品香蕉国产| 欧美老妇交乱视频| xxxxx成人.com| 国产日韩欧美中文在线播放| 狠狠爱在线视频一区| 色av中文字幕一区| 久久亚洲精品中文字幕冲田杏梨| 欧美日韩亚洲一区二区三区| 欧美性xxxx极品hd欧美风情| 亚洲欧美精品一区二区| 国产网站欧美日韩免费精品在线观看| 欧美激情啊啊啊| 欧美精品激情在线观看| 亚洲天堂网站在线观看视频| 国产日韩欧美日韩大片| 成人疯狂猛交xxx| 亚洲自拍偷拍色片视频| 91网站免费观看| 欧美壮男野外gaytube| 91色p视频在线| 欧美成人黄色小视频| 欧美成人免费小视频| 久久久噜久噜久久综合| 色综合久久中文字幕综合网小说| 亚洲一区二区三区乱码aⅴ蜜桃女| 欧美成人中文字幕| 91久久久久久久久久久| 一区国产精品视频| 日韩精品丝袜在线| 这里只有精品在线观看| 精品毛片三在线观看| 视频一区视频二区国产精品| 国产精品第七十二页| 国产精品中文字幕久久久| 亚洲美腿欧美激情另类| 亚洲欧美制服综合另类| 欧美大片免费看| 伦理中文字幕亚洲| 日韩成人中文电影| 日韩av在线免费观看一区| xxxx欧美18另类的高清| 国产精品久久久久久久电影| 久久影视电视剧免费网站清宫辞电视| 国产成人亚洲综合91精品| 狠狠色狠狠色综合日日小说| 亚洲奶大毛多的老太婆| 亚洲系列中文字幕| 欧美片一区二区三区| 中文字幕成人精品久久不卡| 国产精品9999| 亚洲人成电影在线| 国产精品第100页| 欧美精品亚州精品| 一区二区av在线| 亚洲福利精品在线| 欧美国产亚洲精品久久久8v| 日韩在线观看高清| 亚洲日本欧美日韩高观看| 成人国产亚洲精品a区天堂华泰| 国产成人免费91av在线| 国内久久久精品| 日韩美女中文字幕| 亚洲黄页视频免费观看| 国产成人综合av| 亚洲成人久久网| 中文字幕久久久av一区| 欧美麻豆久久久久久中文| 91精品国产精品| 国产精品免费久久久久影院| 精品国产一区二区在线| 成人欧美一区二区三区在线| 国产精品久久久久久久av电影| 国产激情综合五月久久| 国产专区精品视频| 欧美成aaa人片在线观看蜜臀| 欧美日韩一区二区三区在线免费观看| 在线中文字幕日韩| 欧美激情va永久在线播放| 国产欧美一区二区三区久久| 尤物tv国产一区| 日韩在线观看免费高清| 亚洲欧美在线免费| 91探花福利精品国产自产在线| 欧美小视频在线| 国产精品偷伦视频免费观看国产| 成人激情视频小说免费下载| 欧美大全免费观看电视剧大泉洋| 中文字幕久久久| 永久免费毛片在线播放不卡| 亚洲成人网在线| 欧美黄色免费网站| 1769国产精品| 成人精品久久一区二区三区| 伊人久久久久久久久久| 91九色视频导航| 午夜精品久久久久久久久久久久久| 亚洲成年人在线| 国产精品久久久久久av下载红粉| 久久久精品视频在线观看| 国产精品久久久久久搜索| 国产精品91在线| 色在人av网站天堂精品| 日韩美女视频中文字幕| 啪一啪鲁一鲁2019在线视频| 欧美在线视频导航| 亚洲女同性videos| 亚洲男人7777| 欧美激情精品久久久久久免费印度| 国产精品电影网| 91九色国产视频| 日韩天堂在线视频| 成人国产亚洲精品a区天堂华泰| 92国产精品久久久久首页| 亚洲久久久久久久久久| 国产精品人人做人人爽| 精品亚洲夜色av98在线观看| 欧美黄色小视频| 欧美高清视频一区二区| 国产精品一区二区久久久久| 成人欧美一区二区三区在线| 欧美巨乳美女视频| 亚洲色图综合网| 亚洲国产欧美一区二区三区同亚洲| 国产91网红主播在线观看| 亚洲欧美日韩视频一区| 国色天香2019中文字幕在线观看| 色七七影院综合| 精品福利在线观看| 精品成人69xx.xyz| 欧美一区视频在线| 欧美午夜片欧美片在线观看| 91精品国产综合久久久久久久久| 久久男人资源视频| 91精品国产91久久久久久最新| 欧美一区二区色|