Java技術棧
www.javastack.cn
關注閱讀更多優質文章
Disruptor是一個開源架構,研發的初衷是為了解決高并發下隊列鎖的問題,最早由LMAX提出并使用,能夠在無鎖的情況下實作隊列的并發操作,并号稱能夠在一個線程裡每秒處理6百萬筆訂單
官網:http://lmax-exchange.github.io/disruptor/
目前,包括Apache Storm、Camel、Log4j2在内的很多知名項目都應用了Disruptor以擷取高性能
為什麼會産生Disruptor架構
「目前Java内置隊列保證線程安全的方式:」
ArrayBlockingQueue:基于數組形式的隊列,通過加鎖的方式,來保證多線程情況下資料的安全;
LinkedBlockingQueue:基于連結清單形式的隊列,也通過加鎖的方式,來保證多線程情況下資料的安全;
ConcurrentLinkedQueue:基于連結清單形式的隊列,通過CAS的方式
我們知道,在程式設計過程中,加鎖通常會嚴重地影響性能,是以盡量用無鎖方式,就産生了Disruptor這種無鎖高并發架構
基本概念
參考位址:https://github.com/LMAX-Exchange/disruptor/wiki/Introduction#core-concepts
RingBuffer——Disruptor底層資料結構實作,核心類,是線程間交換資料的中轉地;
Sequencer——序号管理器,生産同步的實作者,負責消費者/生産者各自序号、序号栅欄的管理和協調,Sequencer有單生産者,多生産者兩種不同的模式,裡面實作了各種同步的算法;
Sequence——序号,聲明一個序号,用于跟蹤ringbuffer中任務的變化和消費者的消費情況,disruptor裡面大部分的并發代碼都是通過對Sequence的值同步修改實作的,而非鎖,這是disruptor高性能的一個主要原因;
SequenceBarrier——序号栅欄,管理和協調生産者的遊标序号和各個消費者的序号,確定生産者不會覆寫消費者未來得及處理的消息,确儲存在依賴的消費者之間能夠按照正确的順序處理
EventProcessor——事件處理器,監聽RingBuffer的事件,并消費可用事件,從RingBuffer讀取的事件會交由實際的生産者實作類來消費;它會一直偵聽下一個可用的序号,直到該序号對應的事件已經準備好。
EventHandler——業務處理器,是實際消費者的接口,完成具體的業務邏輯實作,第三方實作該接口;代表着消費者。
Producer——生産者接口,第三方線程充當該角色,producer向RingBuffer寫入事件。
Wait Strategy:Wait Strategy決定了一個消費者怎麼等待生産者将事件(Event)放入Disruptor中。
等待政策
源碼位址:https://github.com/LMAX-Exchange/disruptor/blob/master/src/main/java/com/lmax/disruptor/WaitStrategy.java
「BlockingWaitStrategy」
Disruptor的預設政策是BlockingWaitStrategy。在BlockingWaitStrategy内部是使用鎖和condition來控制線程的喚醒。BlockingWaitStrategy是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現。
「SleepingWaitStrategy」
SleepingWaitStrategy 的性能表現跟 BlockingWaitStrategy 差不多,對 CPU 的消耗也類似,但其對生産者線程的影響最小,通過使用
LockSupport.parkNanos(1)
來實作循環等待。
「YieldingWaitStrategy」
YieldingWaitStrategy是可以使用在低延遲系統的政策之一。YieldingWaitStrategy将自旋以等待序列增加到适當的值。在循環體内,将調用
Thread.yield()
以允許其他排隊的線程運作。在要求極高性能且事件處理線數小于 CPU 邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性。
「BusySpinWaitStrategy」
性能最好,适合用于低延遲的系統。在要求極高性能且事件處理線程數小于CPU邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性。
「PhasedBackoffWaitStrategy」
自旋 + yield + 自定義政策,CPU資源緊缺,吞吐量和延遲并不重要的場景。
使用舉例
參考位址:https://github.com/LMAX-Exchange/disruptor/wiki/Getting-Started
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.4</version>
</dependency>
複制
//定義事件event 通過Disruptor 進行交換的資料類型。
public class LongEvent {
private Long value;
public Long getValue() {
return value;
}
public void setValue(Long value) {
this.value = value;
}
}
複制
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
複制
//定義事件消費者
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("消費者:"+event.getValue());
}
}
複制
//定義生産者
public class LongEventProducer {
public final RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 1.ringBuffer 事件隊列 下一個槽
long sequence = ringBuffer.next();
Long data = null;
try {
//2.取出空的事件隊列
LongEvent longEvent = ringBuffer.get(sequence);
data = byteBuffer.getLong(0);
//3.擷取事件隊列傳遞的資料
longEvent.setValue(data);
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
} finally {
System.out.println("生産這準備發送資料");
//4.釋出事件
ringBuffer.publish(sequence);
}
}
}
複制
public class DisruptorMain {
public static void main(String[] args) {
// 1.建立一個可緩存的線程 提供線程來出發Consumer 的事件處理
ExecutorService executor = Executors.newCachedThreadPool();
// 2.建立工廠
EventFactory<LongEvent> eventFactory = new LongEventFactory();
// 3.建立ringBuffer 大小
int ringBufferSize = 1024 * 1024; // ringBufferSize大小一定要是2的N次方
// 4.建立Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(eventFactory, ringBufferSize, executor,
ProducerType.SINGLE, new YieldingWaitStrategy());
// 5.連接配接消費端方法
disruptor.handleEventsWith(new LongEventHandler());
// 6.啟動
disruptor.start();
// 7.建立RingBuffer容器
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
// 8.建立生産者
LongEventProducer producer = new LongEventProducer(ringBuffer);
// 9.指定緩沖區大小
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (int i = 1; i <= 100; i++) {
byteBuffer.putLong(0, i);
producer.onData(byteBuffer);
}
//10.關閉disruptor和executor
disruptor.shutdown();
executor.shutdown();
}
}
複制
核心設計原理
Disruptor通過以下設計來解決隊列速度慢的問題:
「環形數組結構:」
為了避免垃圾回收,采用數組而非連結清單。同時,數組對處理器的緩存機制更加友好
❝原因:CPU緩存是由很多個緩存行組成的。每個緩存行通常是64位元組,并且它有效地引用主記憶體中的一塊兒位址。一個Java的long類型變量是8位元組,是以在一個緩存行中可以存8個long類型的變量。CPU每次從主存中拉取資料時,會把相鄰的資料也存入同一個緩存行。在通路一個long數組的時候,如果數組中的一個值被加載到緩存中,它會自動加載另外7個。是以你能非常快的周遊這個數組。
❞
「元素位置定位:」
數組長度
2^n
,通過位運算,加快定位的速度。下标采取遞增的形式。不用擔心index溢出的問題。index是long類型,即使100萬QPS的處理速度,也需要30萬年才能用完。
「無鎖設計:」
每個生産者或者消費者線程,會先申請可以操作的元素在數組中的位置,申請到之後,直接在該位置寫入或者讀取資料,整個過程通過原子變量CAS,保證操作的線程安全
資料結構
架構使用RingBuffer來作為隊列的資料結構,RingBuffer就是一個可自定義大小的環形數組。
除數組外還有一個序列号(sequence),用以指向下一個可用的元素,供生産者與消費者使用。
原理圖如下所示:
Sequence
mark:Disruptor通過順序遞增的序号來編号管理通過其進行交換的資料(事件),對資料(事件)的處理過程總是沿着序号逐個遞增處理。
「數組+序列号設計的優勢是什麼呢?」
回顧一下HashMap,在知道索引(index)下标的情況下,存與取數組上的元素時間複雜度隻有O(1),而這個index我們可以通過序列号與數組的長度取模來計算得出,
index=sequence % table.length
。當然也可以用位運算來計算效率更高,此時table.length必須是2的幂次方。
寫資料流程
單線程寫資料的流程:
- 申請寫入m個元素;
- 若是有m個元素可以入,則傳回最大的序列号。這兒主要判斷是否會覆寫未讀的元素;
- 若是傳回的正确,則生産者開始寫入元素。
使用場景
經過測試,Disruptor的的延時和吞吐量都比ArrayBlockingQueue優秀很多,是以,當你在使用ArrayBlockingQueue出現性能瓶頸的時候,你就可以考慮采用Disruptor的代替。
參考:https://github.com/LMAX-Exchange/disruptor/wiki/Performance-Results
當然,Disruptor性能高并不是必然的,是以,是否使用還得經過測試。
Disruptor的最常用的場景就是“生産者-消費者”場景,對場景的就是“一個生産者、多個消費者”的場景,并且要求順序處理。
舉個例子,我們從MySQL的BigLog檔案中順序讀取資料,然後寫入到ElasticSearch(搜尋引擎)中。在這種場景下,BigLog要求一個檔案一個生産者,那個是一個生産者。而寫入到ElasticSearch,則嚴格要求順序,否則會出現問題,是以通常意義上的多消費者線程無法解決該問題,如果通過加鎖,則性能大打折扣
參考:
https://tech.meituan.com/2016/11/18/disruptor.html
https://github.com/LMAX-Exchange/disruptor/wiki