1、引言
Disruptor是一個開源的java框架,它被設計用于在生產者—消費者(PRoducer-consumer problem,簡稱PCP)問題上獲得盡量高的吞吐量(TPS)和盡量低的延遲。Disruptor是LMAX在線交易平臺的關鍵組成部分,LMAX平臺使用該框架對訂單處理速度能達到600萬TPS,除金融領域之外,其他一般的應用中都可以用到Disruptor,它可以帶來顯著的性能提升。其實Disruptor與其說是一個框架,不如說是一種設計思路,這個設計思路對于存在“并發、緩沖區、生產者—消費者模型、事務處理”這些元素的程序來說,Disruptor提出了一種大幅提升性能(TPS)的方案。
現在有很多人寫過關于Disruptor文章,但是我還是想寫這篇淺析,畢竟不同人的理解是不同的,希望沒接觸過它的人能通過本文對Disruptor有個初步的了解,本文后面給出了一些相關鏈接供參考。
2、什么是Disruptor?為什么速度更快?
簡單的說,Disruptor是一個高性能的Buffer,并提供了使用這個Buffer的框架。為什么說是它性能更好呢?這得從PCP和傳統解決辦法的缺點開始說起。
我們知道,PCP又稱Bounded-Buffer問題,其核心就是保證對一個Buffer的存取操作在多線程環境下不會出錯。使用Java中的ArrayBlockingQueue和LinkedBlockingQueue類能輕松的完成PCP模型,這對于一般程序已經沒問題了,但是對于并發度高、TPS要求較大的系統則不然。
*BlockingQueue使用的是packagejava.util.concurrent.locks中實現的鎖,當多個線程(例如生產者)同時寫入Queue時,鎖的爭搶會導致只有一個生產者可以執行,其他線程都中斷了,也就是線程的狀態從RUNNING切換到BLOCKED,直到某個生產者線程使用完Buffer后釋放鎖,其他線程狀態才從BLOCKED切換到RUNNABLE,然后時間片到其他線程后再進行鎖的爭搶。上述過程中,一般來說生產者存放一個數據到Buffer中所需時間是非常短的,操作系統切換線程上下文的速度也是非常快的,但是當線程數量增多后,OS切換線程所帶來的開銷逐漸增多,鎖的反復申請和釋放成為性能瓶頸。*BlockingQueue除了使用鎖帶來的性能損失外,還可能因為線程爭搶的順序問題造成性能再次損失:實際使用中發現線程的調度順序并不理想,可能出現短時間內OS頻繁調度出生產者或消費者的情況,這樣造成緩沖區可能短時間內被填滿或被清空的極端情況。(理想情況應該是緩沖區長度適中,生產和消費速度基本一致)
對于上面的問題Disruptor的解決方案是:不用鎖。
Disruptor使用一個Ring Buffer存放生產者的“產品”,環形緩沖區實際上還是一段連續內存,之所以稱作環形是因為它對數據存放位置的處理,生產者和消費者各有一個指針(數組下標),消費者的指針指向下一個要讀取的Slot,生產者指針指向下一個要放入的Slot,消費或生產后,各自的指針值p = (p +1) % n,n是緩沖區長度,這樣指針在緩沖區上反復游走,故可以將緩沖區看成環狀。(如右圖)(Ring Buffer并非Disruptor原創,linux內核中就有環形緩沖區的實現。)使用Ring Buffer時:
①當生產者和消費者都只有一個時,由于兩個線程分別操作不同的指針,所以不需要鎖。
②當有多個消費者時,(按Disruptor的設計)每個消費者各自控制自己的指針,依次讀取每個Slot(也就是每個消費者都會讀取到所有的產品),這時只需要保證生產者指針不會超過最慢的消費者(超過最后一個消費者“一圈”)即可,也不需要鎖。
③當有多個生產者時,多個線程共用一個寫指針,此處需要考慮多線程問題,例如兩個生產者線程同時寫數據,當前寫指針=0,運行后其中一個線程應獲得緩沖區0號Slot,另一個應該獲得1號,寫指針=2。對于這種情況,Disruptor使用CAS來保證多線程安全。
CAS(Compare and Swap/Set)是現在CPU普遍支持的一種指令(例如cmpxchg系類指令),CAS操作包含3個操作數:CAS(A,B,C),其功能是:取地址A的值與B比較,如果相同,則將C賦值到地址A。CAS特點是它是由硬件實現的極輕量級指令,同時CPU也保證此操作的原子性。在考慮線程間同步問題時,可以使用Unsafe類的boolean compareAndSwapInt(java.lang.Object arg0, long arg1, int arg2, int arg3);系列方法,對于一個int變量(例如,Ring Buffer的寫指針),使用CAS可以避免多線程訪問帶來的混亂,當compareAndSwap方法true時表明CAS操作成功賦值,返回false則表明地址A處的值并不等于B,此時重新試一遍即可,使用CAS移動寫指針的邏輯如下:
1 //寫指針向后移動n 2 public long next(int n) 3 { 4 //...... 5 long current,next; 6 do 7 { 8 //此處先將寫指針的當前值備份一下 9 current = pointer.get();10 //預計寫指針將要移動到的位置11 next = current + n;12 //......省略:確保從current到current+n的Slot已經被消費者讀完......13 //*原子操作*如果當前寫指針和剛才一樣(說明9-12行的計算有效),那么移動寫指針14 if ( pointer.comapreAndSet(current,next) )15 break; 16 }while ( true )//如果CAS失敗或者還不能移動寫指針,則不斷嘗試17 return next;18 }
OK,我們現在有了一個使用CAS的Ring Buffer,這比用鎖快上不少,但CAS的效率并沒有想象的那么快,根據鏈接[2]pdf中評測:和單一線程無鎖執行某簡單任務相比,使用鎖的時間比無鎖高出2個數量級,CAS也高出了一個數量級。那么Disruptor還有什么提高性能的地方呢?下面列舉一下除了無鎖編程外的其他性能優化點。
①緩存行填充(Cache Line Padding):CPU緩存常以64bytes作為一個緩存行大小,緩存由若干個緩存行組成,緩存寫回主存或主存寫入緩存均是以行為單位,此外每個CPU核心都有自己的緩存(但是若某個核心對某緩存行做出修改,其他擁有同樣緩存的核心需要進行同步),生產者和消費者的指針用long型表示,假設現在只有一個生產者和一個消費者,那么雙方的指針間沒有什么直接聯系,只要不“挨著”,應該可以各改各的指針。OK前面說有點亂,下面問題來了:如果生產者和消費者的指針(加起來共16bytes)出現在同一個緩存行中會怎么樣?例如CPU核心A運行的消費者修改了一下自己的指針值(P1),那么其他核心中所有緩存了P1的緩存行都將失效,并從主存重新調配。這樣做的缺點顯而易見,但是CPU和編譯器并未聰明到避免這個問題,所以需要緩存行填充。雖然問題產生的原因很繞,但是解決方案卻非常簡單:對于一個long型的緩沖區指針,用一個長度為8的long型數組代替。如此一來,一個緩存行被這個數組填充滿,線程對各自指針的修改不會干擾到他人。
②避免GC:寫Java程序的時候,很多人習慣隨手new各種對象,雖然Java的GC會負責回收,但是系統在高壓力情況下頻繁的new必定導致更頻繁的GC,Disruptor避免這個問題的策略是:提前分配。在創建RingBuffer實例時,參數中要求給出緩沖區元素類型的Factory,創建實例時,Ring Buffer會首先將整個緩沖區填滿為Factory所產生的實例,后面生產者生產時,不再用傳統做法(順手new一個實例出來然后add到buffer中),而是獲得之前已經new好的實例,然后設置其中的值。舉個形象的例子就是,若緩沖區是個放很多紙片的地方,紙片上記錄著信息,以前的做法是:每次加入緩沖區時,都從系統那現準備一張紙片,然后再寫好紙片放進緩沖區,消費完就隨手扔掉。現在的做法是:實現準備好所有的紙片,想放入時只需要擦掉原來的信息寫上新的即可。
③成批操作(Batch):Ring Buffer的核心操作是生產和消費,如果能減少這兩個操作的次數,性能必然相應地提高。Disruptor中使用成批操作來減少生產和消費的次數,下面具體說一下Disruptor的生產和消費過程中如何體現Batch的。向RingBuffer生產東西的時候,需要經過2個階段:階段一為申請空間,申請后生產者獲得了一個指針范圍[low,high],然后再對緩沖區中[low,high]這段的所有對象進行setValue(見優化點②),階段2為發布(像這樣ringBuffer.publish(low,high);)。階段1結束后,其他生產者再申請的話,會得到另一段緩沖區。階段2結束后,之前申請的這一段數據就可以被消費者讀到。Disruptor推薦成批生產、成批發布,減少生產時的同步帶來的性能損失。從RingBuffer消費東西的時候也需要兩個階段,階段一為等待生產者的(寫)指針值超過指定值(N,即N之前的數據已經生產過了),階段一執行完后,消費者會得到一個指針值(R),表示Ring Buffer中下標R之前的值是可以讀的。階段2就是具體讀?。裕?。階段一返回值R很有可能大于N,此時消費者應該進行成批讀取操作,將[R,N]范圍內的數據全部處理。
④LMAX架構:(注:指的是LMAX公司在做他們的交易平臺時使用的一些設計思想的集合,嚴格講是LMAX架構包含Disruptor,并非其中的一部分,但是Disruptor的設計中或多或少體現了這些思想,所以在這還是要提一下,關于LMAX架構應該可以寫很多,但限于個人水平,在這只能簡單說說。另外,這個架構是以及極端追求性能的產物,不一定適合大眾。)如下圖所示LMAX架構分為三個部分,輸入/輸出Disruptor,和中間核心的業務邏輯處理器。所有的信息輸入進入InputDisruptor,被業務邏輯處理器讀取后送入OutputDisruptor,最后輸出到其他地方。
對于一般由表現層+業務層+持久層組成的Web系統,LMAX架構指的是業務層,它有如下幾個特點:
a)業務邏輯處理器(簡稱BLP)完全的In-Memory:如上圖,業務邏輯處理器是處理所有業務邏輯的地方,Input Disruptor把輸入(例如訂單數據、用戶操作)以消息的形式(稱作Message或者Event都可以)發到BLP,BLP進行響應。一般系統中我們可能會多線程執行一些業務邏輯代碼,然后這些代碼最終生成一些SQL語句,然后這些語句再去查數據庫,數據庫可能在其他主機,數據庫查詢結果可能直接用了內存中的緩存,最壞情況是數據庫從磁盤中讀取了想要的數據,最后再返回給業務邏輯代碼。這個過程有很多的時間浪費:首先,多線程訪問持久層會涉及到同步問題(鎖,有是這貨)。其次,生成*QL語句、查詢數據庫的耗時也是非常大的。最后,最壞情況下還要進行一大串磁盤IO和內存IO才能取到數據。LMAX對此的解決方案是:把能用到的所有數據全部裝入內存,只有到極少數或周期性需要同步、持久化的時候再訪問數據庫。(這聽起來有點瘋狂,但是仔細想想這些業務真的需要那么大空間嗎?)這么做的好處也是顯而易見,減少了網絡、磁盤的IO后In-Memory系統上的大部分業務邏輯全都變成一些加減乘除運算了。
b)異步-事件驅動:經過a)的修改,如果還存在一些業務邏輯處理過程是需要長時間才能完成的,那么就把它作為一個事件,再拋給其他組件(可能還是Disruptor)等待。業務邏輯處理器需要時刻保持最快速度、最高效率,它不能等待任何事情。
c)每個業務邏輯處理器是單線程的:你沒有聽錯。其實有了a)b)作為前提,會發現多線程所帶來的業務層面同步問題將會極大限制BLP效率、增大BLP的復雜度,和BLP的設計(Keep it simple, stupid.)相悖,如果實在想多線程,可以參照d)。
d)使用多級業務邏輯處理器:有些像管道模式,上圖的3塊結構可以以多種方式組合,一個BLP可以將輸出送往多個Output Disruptor,而這些Disruptor可能是另一些3塊結構的InputDisruptor,即有些BLP是起到分發作用的,另一些是進行具體業務邏輯計算的。每個BLP對應一個線程,整個架構可能比上圖復雜很多。
3、Hello Disruptor
Disruptor最初是由Java實現的,現在也有C/Cpp和.Net版本,Java版最全更新最快,代碼注釋較多比較好懂。說了這么多,本節先給出一個測試例子,展示Disruptor的基本用法,例子中用LinkedBlockingQueue和Disruptor分別實現了單一生產者+單一消費者存取簡單對象的測試,統計了一下雙方消耗的時間,僅供參考。
例子中使用Disruptor 3.2.1。不同版本間的Disruptor一些術語可能有變化,在該版本中,緩沖區里的元素被稱作Event,指針(緩沖區的下標)被稱作Sequence,生產者的指針為RingBuffer.sequencer(private成員),消費者的指針通過ringBufferInstance.newBarrier()得到。//簡單對象:緩沖區中的元素,里面只有一個value,提供setValueprivate class TestObj { public long value; public TestObj(long value) { this.value = value; } public void setValue(long value) { this.value = value; } }public class Test { //待生產的對象個數 final long objCount = 1000000; final long bufSize;//緩沖區大小 { bufSize = getRingBufferSize(objCount); } //獲取RingBuffer的緩沖區大?。?的冪次!加速計算) static long getRingBufferSize(long num) { long s = 2; while ( s < num ) { s <<= 1; } return s; } //使用LinkedBlockingQueue測試 public void testBlocingQueue() throws Exception { final LinkedBlockingQueue<TestObj> queue = new LinkedBlockingQueue<TestObj>(); Thread producer = new Thread(new Runnable() {//生產者 @Override public void run() { try{ for ( long i=1;i<=objCount;i++ ) { queue.put(new TestObj(i));//生產 } }catch ( InterruptedException e ){ } } }); Thread consumer = new Thread(new Runnable() {//消費者 @Override public void run() { try{ TestObj readObj = null; for ( long i=1;i<=objCount;i++ ) { readObj = queue.take();//消費 //DoSomethingAbout(readObj); } }catch ( InterruptedException e ){ } } }); long timeStart = System.currentTimeMillis();//統計時間 producer.start(); consumer.start(); consumer.join(); producer.join(); long timeEnd = System.currentTimeMillis(); DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance(); System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) + " = " + df.format(objCount/(timeEnd - timeStart)*1000) ); } //使用RingBuffer測試 public void testRingBuffer() throws Exception { //創建一個單生產者的RingBuffer,EventFactory是填充緩沖區的對象工廠 // YieldingWaitStrategy等"等待策略"指出消費者等待數據變得可用前的策略 final RingBuffer<TestObj> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<TestObj>() { @Override public TestObj newInstance() { return new TestObj(0); } } , (int)bufSize, new YieldingWaitStrategy()); //創建消費者指針 final SequenceBarrier barrier = ringBuffer.newBarrier(); Thread producer = new Thread(new Runnable() {//生產者 @Override public void run() { for ( long i=1;i<=objCount;i++ ) { long index = ringBuffer.next();//申請下一個緩沖區Slot ringBuffer.get(index).setValue(i);//對申請到的Slot賦值 ringBuffer.publish(index);//發布,然后消費者可以讀到 } } }); Thread consumer = new Thread(new Runnable() {//消費者 @Override public void run() { TestObj readObj = null; int readCount = 0; long readIndex = Sequencer.INITIAL_CURSOR_VALUE; while ( readCount < objCount )//讀取objCount個元素后結束 { try{ long nextIndex = readIndex + 1;//當前讀取到的指針+1,即下一個該讀的位置 long availableIndex = barrier.waitFor(nextIndex);//等待直到上面的位置可讀取 while ( nextIndex <= availableIndex )//從下一個可讀位置到目前能讀到的位置(Batch!) { readObj = ringBuffer.get(nextIndex);//獲得Buffer中的對象 //DoSomethingAbout(readObj); readCount++; nextIndex ++; } readIndex = availableIndex;//刷新當前讀取到的位置 }catch ( Exception ex) { ex.printStackTrace(); } } } }); long timeStart = System.currentTimeMillis();//統計時間 producer.start(); consumer.start(); consumer.join(); producer.join(); long timeEnd = System.currentTimeMillis(); DecimalFormat df = (DecimalFormat) DecimalFormat.getInstance(); System.out.println((timeEnd - timeStart) + "/" + df.format(objCount) + " = " + df.format(objCount/(timeEnd - timeStart)*1000) ); } public static void main(String[] args) throws Exception { Test ins = new Test(); //執行測試 ins.testBlocingQueue(); ins.testRingBuffer(); }}測試代碼
測試結果:
319/1,000,000 = 3,134,000 //使用LinkedBlockingQueue在319毫秒內存取100萬個簡單對象,每秒鐘能執行313萬個
46/1,000,000 = 21,739,000 //使用Disruptor在46毫秒內存取100萬個簡單對象,每秒鐘能執行2173萬個
平均下來使用Disruptor速度能提高7倍。(不同電腦、應用環境下結果可能不一致)
4、隨想:Disruptor、完成端口與Mechanical Sympathy
When pushing performance like this, it starts to become important to take account of the way modern hardware is constructed.
—The LMAX Architecture
“當對性能的追求達到這樣的程度,以致對現代硬件構成的理解變得越來越重要。”這句話恰當地形容了Disruptor/LMAX在對性能方面的追求和失敗。咦,失?。繛槭裁磿@么說呢?Disruptor當然是一個優秀的框架,我說的失敗指的是在開發它的過程中,LMAX曽試圖提高并發程序效率,優化、使用鎖或借助其他模型,但是這些嘗試最終失敗了——然后他們構建了Disruptor。再提問:一個Java程序員在嘗試提高他的程序性能的時候,需要了解很多硬件知識嗎?我想很多人都會回答“不需要”,構建Disruptor的過程中,最初開發人員對這個問題的回答可能也是“不需要”,但是嘗試失敗后他們決定另辟蹊徑??偟目聪翫isruptor的設計:鎖到CAS、緩沖行填充、避免GC等,我感覺這些設計都在刻意“遷就”或者“依賴”硬件設計,這些設計更像是一種“(ugly)hack”(毫無疑問,Disruptor還是目前最優秀的方案之一)。
Disruptor我想到了完成端口,完成端口據說能是Windows上最快的并發網絡“框架”:你只要通過API告訴Windows你想recv哪些socket,然后各個recv操作在內核層面上執行并加入到某個隊列中,最后再使用Worker線程進行處理,大部分工作Windows都為你做好了,不使用鎖也沒有上下文切換和大量線程,是不是和Disruptor異曲同工呢?完成端口和Disruptor在追求性能時,都避免使用并行、鎖、多線程等概念,這些概念的出現自有它們的原因,這里不用多說,但是為了性能(考慮到硬件)卻不能充分使用它們,說明在處理并發、并行問題上,硬件和軟件的發展存在不協調,可能馮氏計算機還是適合單“線程”順序處理信息吧。關于這種不協調,我認為應該是硬件應該會逐步適應軟件,但也有人提出了有意思的Mechanical Sympathy(鏈接[6]),至于未來會如何發展就不是這篇blog能討論的了:) 。
(完)
鏈接:
[1]Disruptor介紹譯文:http://ifeve.com/disruptor/
原文https://code.google.com/p/disruptor/wiki/BlogsAndArticles
[2]Disruptor的GitHub:http://lmax-exchange.github.io/disruptor/
其中http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf這篇PDF對Disruptor作了很好的闡述。
[3]完成端口:http://blog.csdn.net/piggyxp/article/details/6922277
[4]致敬disruptor:CAS實現高效(偽)無鎖阻塞隊列實踐:http://www.majin163.com/2014/03/24/cas_queue/
[5]Disruptor 源碼分析:http://huangyunbin.VEvb.com/blog/1944232
[6]Mechanical Sympathy:http://mechanical-sympathy.blogspot.com/
----------------------------------(我是分割線)----------------------------------
PS1: 轉載請注明作者。
PS2: 下載:Disruptor介紹PPT
PS3:這是我第5個博客(不過前4個都不是技術blog,笑),以后我會盡量貼一些遇到的問題和思考到這里,水平有限,歡迎各位指出不足!
新聞熱點
疑難解答