多線程的難點主要就是多線程通信協作這一塊了,前面筆記二中提到了常見的同步方法,這里主要是進行實例學習了,今天總結了一下3個實例:
1、銀行存款與提款多線程實現,使用Lock鎖和條件Condition。 附加 : 用監視器進行線程間通信
2、生產者消費者實現,使用LinkedList自寫緩沖區。
3、多線程之阻塞隊列學習,用阻塞隊列快速實現生產者消費者模型。 附加:用布爾變量關閉線程
在三種線程同步方法中,我們這里的實例用Lock鎖來實現變量同步,因為它比較靈活直觀。
實現了變量的同步,我們還要讓多個線程之間進行“通話”,就是一個線程完成了某個條件之后,告訴其他線程我完成了這個條件,你們可以行動了。下面就是java提供的條件接口Condition定義的同步方法:
很方便的是,java的Lock鎖里面提供了newConditon()方法可以,該方法返回:一個綁定了lock鎖的Condition實例,有點抽象,其實把它看作一個可以發信息的鎖就可以了,看后面的代碼,應該就能理解了。
1、銀行存款與提款多線程實現。我們模擬ATM機器存款與提款,創建一個賬戶類Account(),該類包含同步方法:
存款方法:deposit()
提款方法:withdraw()
以及一個普通的查詢余額的方法getbalance().
我們創建兩個任務線程,分別調用兩個同步方法,進行模擬操作,看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;public class ThreadCoOperation {PRivate static Account account = new Account(); public static void main(String[] args){//創建線程池ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new DepositTask());executor.execute(new WithdrawTask());}//存錢public static class DepositTask implements Runnable{@Overridepublic void run() {try {while(true){account.deposit((int)(Math.random()*1000)+1);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class WithdrawTask implements Runnable{@Overridepublic void run() {try{while(true){account.withdraw((int)(Math.random()*1000)+1);Thread.sleep(500);}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Account{//一個鎖是一個Lock接口的實例 它定義了加鎖和釋放鎖的方法 ReentrantLock是為創建相互排斥的鎖的Lock的具體實現private static Lock lock = new ReentrantLock();//創建一個condition,具有發通知功能的鎖,前提是要實現了lock接口private static Condition newDeposit = lock.newCondition();private int balance = 0;public int getBalance(){return balance;}public void withdraw(int amount){lock.lock();try {while(balance < amount){System.out.println("/t/t錢不夠,等待存錢");newDeposit.await();}balance -= amount;System.out.println("/t/t取出"+amount+"塊錢/t剩余"+getBalance());} catch (InterruptedException e) {e.printStackTrace();}finally{lock.unlock();}}public void deposit(int amount){lock.lock();try{balance+=amount;System.out.println("存入"+amount+"塊錢");newDeposit.signalAll(); //發信息喚醒所有的線程}finally{lock.unlock();}}}}
運行截圖
分析:
1、程序中需要注意的:創建一個condition,具有發通知功能的鎖,前提是要實現了lock接口。
2、while(balance < amount)不能改用if判斷,用if會使得線程不安全,使用if會不會進行循環驗證,而while會,我們經??吹絯hile(true),但是不會經常看到if(true).
3、調用了await方法后,要記得使用signalAll()或者signal()將線程喚醒,否則線程永久等待。
最后再來分析一下這個類的結構,有3個類,兩個靜態任務類實現了Runnable接口,是線程類,而另外一個類則是普通的任務類,包含了線程類所用到的方法。我們的主類在main方法前面就實例化一個Account類,以供線程類調用該類里面的同步方法。
這種構造方式是多線程常用到的一種構造方式吧。不難發現后面要手寫的生產者消費者模型也是這樣子構造的。這相當于是一個多線程模板。也是我們學習這個例子最重要的收獲吧。
用監視器進行線程之間的通信
還有一點,接口Lock與Condition都是在java5之后出現的,在這之前,線程通信是通過內置的監視器(monitor)實現的。
監視器是一個相互排斥且具有同步能力的對象,任意對象都有可能成為一個monitor。監視器是通過synchronized關鍵字來對自己加鎖(加鎖解鎖是解決線程同步最基本的思想),使用wait()方法時線程暫停并 等待條件發生,發通知則是通過notify()和notifyAll()方法。大體的模板是這樣子的:
不難看出await()、signal()、signally()是wait()、notify()、notifyAll()的進化形態,所以不建議使用監視器。
2、生產者消費者實現,使用LinkedList自寫緩沖區
這個模型一直很經典,學操作系統的時候還學過,記得linux還用PV操作去實現它,不過這東西是跨學科的。
考慮緩存區buffer的使用者,生產者和消費者,他們都能識別緩沖區是否滿的,且兩種各只能發出一種信號:
生產者:它能發出notEmpty()信號,即緩沖區非空信號,當它看到緩沖區滿的時候,它就調用await等待。
消費者:它能發出notFull()信號,即緩沖區未滿的信號,當它看到緩沖區空的時候,它也調用await等待。
看代碼:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.Lock;import java.util.concurrent.locks.ReentrantLock;//生產者消費者public class ConsumerProducer {private static Buffer buffer= new Buffer();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new ProducerTask());executor.execute(new ConsumerTask());executor.shutdown();}public static class ProducerTask implements Runnable{@Overridepublic void run() {int i=1;try {while(true){System.out.println("生產者寫入數據"+i);buffer.write(i++);Thread.sleep((int)(Math.random()*80));} }catch (InterruptedException e) {e.printStackTrace();}}}public static class ConsumerTask implements Runnable{public void run() {try {while(true){System.out.println("/t/t消費讀出數據"+buffer.read());Thread.sleep((int)(Math.random()*100));}} catch (InterruptedException e) {e.printStackTrace();}}}public static class Buffer{private static final int CAPACTIY = 4; //緩沖區容量private java.util.LinkedList<Integer> queue = new java.util.LinkedList<Integer>();private static Lock lock = new ReentrantLock();private static Condition notEmpty = lock.newCondition();private static Condition notFull = lock.newCondition();public void write(int value){lock.lock();try{while(queue.size()==CAPACTIY){System.out.println("緩沖區爆滿");notFull.await();}queue.offer(value);notEmpty.signalAll(); //通知所有的緩沖區未空的情況}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();}}@SuppressWarnings("finally")public int read(){int value = 0;lock.lock();try{while(queue.isEmpty()){System.out.println("/t/t緩沖區是空的,等待緩沖區非空的情況");notEmpty.await();}value = queue.remove();notFull.signal();}catch(InterruptedException ex){ex.printStackTrace();}finally{lock.unlock();return value;}}}}
運行截圖
程序運行正常,不過稍微延長一下讀取時間,就會出現這樣的情況
程序里面設置的容量是4,可是這里卻可以存入最多5個數據,而且更合理的情況應該是初始緩沖區是空的,后面找了下這個小bug,原來是調用offer()函數應該放在檢測語句之前,如果希望一開始就調用ConsumerTask,在main方法里面調換兩者的順序即可。
3、用阻塞隊列快速實現生產者消費者模型java的強大之處是它有著豐富的類庫,我們學習java在某種程度上就是學習這些類庫。
阻塞隊列是這樣的一種隊列:當試圖向一個滿隊列里添加元素 或者 從空隊列里刪除元素時,隊列會讓線程自動阻塞,且當隊列滿時,隊列會繼續存儲元素,供喚醒后的線程使用。這應該說是專門為消費者生產者模型而設計的一種隊列吧,它實現了Queue接口,主要方法是put()和take()方法。
java支持三個具體的阻塞隊列ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue。都在java.util.concurrent包中。
簡單描述上面三個阻塞隊列:
ArrayBlockingQueue: 該阻塞用數組實現,按照FIFO,即先進先出的原則對數據進行排序,和數組的使用有點相似,它事先需要指定一個容量,不過即便隊列超出這個容量,也是不會報錯滴。
LinkeddBlockingQueue:用鏈表實現,默認隊列大小是Integer.MAX_VALUE,也是按照先進先出的方法對數據排序,性能可能比ArrayBlockingQueue,有待研究。
PriorityBlockingQueue:用優先隊列實現的阻塞隊列,會對元素按照大小進行排序,也可以創建不受限制的隊列,put方法永不阻塞。
ok,看代碼:
import java.util.concurrent.ArrayBlockingQueue;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ConsumerProducerUsingBlockQueue {private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2);public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());try {Thread.sleep(100);executor.shutdownNow(); //暴力關閉,會報錯,不推薦} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}public static class Consumer implements Runnable{@Overridepublic void run() {try{int i=1;while(true){System.out.println("生成者寫入:"+i);buffer.put(i++);Thread.sleep((int)(Math.random())*1000);}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{@Overridepublic void run() {try{while(true){System.out.println("/t/t消費者取出"+buffer.take());Thread.sleep((int)(Math.random())*10000);}}catch(InterruptedException ex){ex.printStackTrace();}}}}
運行截圖:
沒啥大的問題,就是在關閉線程的時候太過暴力了,會報錯,線程里面的每一個函數都似乎值得研究,之前想通過Interrupt暫停,不過失敗了,就直接使用線程池執行器的shoutdownNow方法來的。后面自己又用了另外一種關閉線程的方法,見下面代碼
使用LinkedBlockingQueue實現消費者生產者且使用布爾變量控制線程關閉
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.LinkedBlockingQueue;public class A_Control_stop {private static LinkedBlockingQueue<String> buffer = new LinkedBlockingQueue<String>();public static void main(String[] args){ExecutorService executor = Executors.newFixedThreadPool(2);executor.execute(new Consumer());executor.execute(new Producer());executor.shutdown();while(!executor.isTerminated()){}System.out.println("所有的的線程都正常結束");}public static class Consumer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;String[] str ={"as","d","sd","ew","sdfg","esfr"};while(!exit){System.out.println("生成者寫入:"+str[i]);buffer.put(str[i++]);Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}public static class Producer implements Runnable{private volatile boolean exit = false;@Overridepublic void run() {try{int i=0;while(!exit){System.out.println("/t/t消費者取出"+buffer.take());i++;Thread.sleep((int)(Math.random())*10);if(5==i){exit=true;}}}catch(InterruptedException ex){ex.printStackTrace();}}}}
截圖
關于阻塞隊列,覺得這篇文章講的不錯,推薦大家看看 聊聊并發----Java中的阻塞隊列
用了幾天,多線程算是學了點皮毛,附注一下:這幾天文章主要是參考了《java程序語言設計進階篇第8版》,老外寫的書講的真心不錯,只不過現在java都已經更新到java8了。在其他一些網站上看到自己的文章,沒有說明轉載什么的,估計是直接“被采集”過去了。
本文出自于博客園蘭幽,轉載請說明出處。
新聞熱點
疑難解答