Disruptor并發架構簡介
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲産生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,它能夠在一個線程裡每秒處理6百萬訂單。業務邏輯處理器完全是運作在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。
Disruptor它是一個開源的并發架構,并獲得2011 Duke’s 程式架構創新獎,能夠在無鎖的情況下實作網絡的Queue并發操作。
Disruptor是一個高性能的異步處理架構,或者可以認為是最快的消息架構(輕量的JMS),也可以認為是一個觀察者模式的實作,或者事件監聽模式的實作。
目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本性能更加的優秀,提供更多的API使用方式。
下載下傳disruptor-3.3.2.jar引入我們的項目既可以開始disruptor之旅。
在使用之前,首先說明disruptor主要功能加以說明,你可以了解為他是一種高效的”生産者-消費者”模型。也就性能遠遠高于傳統的BlockingQueue容器。
官方學習網站:http://ifeve.com/disruptor-getting-started/
(1)使用Disruptor
- 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,是以再Disruptor中是以事件綁定資料進行傳遞的
- 第二:建立一個工廠Event類,用于建立Event類執行個體對象
- 第三:需要有一個監聽事件類,用于處理資料(Event類)
- 第四:我們需要進行測試代碼編寫。執行個體化Disruptor執行個體,配置一系列參數。然後我們對Disruptor執行個體綁定監聽事件類,接受并處理資料。
- 第五:在Disruptor中,真正存儲資料的核心叫做RingBuffer,我們通過Disruptor執行個體拿到它,然後把資料生産出來,把資料加入到RingBuffer的執行個體對象中即可。
package com.zyh.study.disruptor.helloworld;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class Test {
public static void main(String[] args) throws Exception {
//建立緩沖池
ExecutorService executor = Executors.newCachedThreadPool();
//建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
int ringBufferSize = * ; //
/**
//BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的性能是最好的,适合用于低延遲的系統。在要求極高性能且事件處理線數小于CPU邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
//建立disruptor
Disruptor<Student> disruptor = new Disruptor<Student>(
new EventFactory<Student>() {
@Override
public Student newInstance() {
return new Student();
}
}, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 連接配接消費事件方法
disruptor.handleEventsWith(new EventHandler<Student>(){
@Override
public void onEvent(Student student, long l, boolean b) throws Exception {
System.out.println(student.toString());
}
});
// 啟動
disruptor.start();
//Disruptor 的事件釋出過程是一個兩階段送出的過程:
//釋出事件
RingBuffer<Student> ringBuffer = disruptor.getRingBuffer();
List<Student> stus = Arrays.asList(
new Student("xiaohuihui",,"男"),
new Student("mingming",,"男"),
new Student("fangfang",,"女"),
new Student("ganggang",,"男"),
new Student("chunchun",,"女")
);
for (Student stu : stus) {
//生産者釋出資料
ringBuffer.publishEvent(new EventTranslator<Student>() {
@Override
public void translateTo(Student student, long l) {
student.setAge(stu.getAge());
student.setName(stu.getName());
student.setSex(stu.getSex());
}
});
}
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
}
}
(2)Disruptor術語
- RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責存儲和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。
- Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序号。和Disruptor一樣,每一個消費者(EventProcessor)都維持着一個Sequence。大部分的并發代碼依賴這些Sequence值得運轉,是以Sequence支援多種目前為AtomicLong類的特性。
- Sequencer:這是Disruptor真正的核心。實作了這個接口的兩種生産者(單生産者和多生産者)均實作了所有的并發算法,為了在生産者和消費者之間進行準确快速的資料傳遞。
- SequenceBarrier:由Sequencer生成,并且包含了已經釋出的Sequence的引用,這些Sequence源于Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面擷取事件時的處理政策。(例如:當生産者太慢,消費者太快,會導緻消費者擷取不到新的事件會根據該政策進行處理,預設會堵塞)
- WaitStrategy:決定一個消費者将如何等待生産者将Event置入Disruptor的政策。用來權衡當生産者無法将新的事件放進RingBuffer時的處理政策。(例如:當生産者太快,消費者太慢,會導緻生成者擷取不到新的事件槽來插入新事件,則會根據該政策進行處理,預設會堵塞)
- Event:從生産者到消費者過程中所處理的資料單元。Disruptor中沒有代碼表示Event,因為它完全是由使用者定義的。
- EventProcessor:主要事件循環,處理Disruptor中的Event,并且擁有消費者的Sequence。它有一個實作類是BatchEventProcessor,包含了event loop有效的實作,并且将回調到一個EventHandler接口的實作對象。
- EventHandler:由使用者實作并且代表了Disruptor中的一個消費者的接口。
- Producer:由使用者實作,它調用RingBuffer來插入事件(Event),在Disruptor中沒有相應的實作代碼,由使用者實作。
- WorkProcessor:確定每個sequence隻被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
- WorkerPool:一個WorkProcessor池,其中WorkProcessor将消費Sequence,是以任務可以在實作WorkHandler接口的worker之間移交
- LifecycleAware:當BatchEventProcessor啟動和停止時,于實作這個接口用于接收通知。
(3)Disruptor應用
Disruptor實際上是對RingBuffer的封裝,是以我們也可以直接使用RingBuffer類
API提供的生産者接口
EventTranslator<V>
與
EventTranslatorOneArg<V v, Object data>
,前者不能動态傳參,後者可以動态傳遞一個參數
data
,
V
為需要建立的資料對象,
data
為實際資料,實作
translateTo(V v, long sequeue, Object data)
方法,其中
v
就是下一個可用事件槽裡面的對象,
data
為傳進來的真實資料,調用
ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);
來釋出資料到RingBuffer中。
package com.zyh.study.disruptor.helloworld;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
public class LongEventMain {
public static void main(String[] args) throws Exception {
//建立緩沖池
ExecutorService executor = Executors.newCachedThreadPool();
//建立bufferSize ,也就是RingBuffer大小,必須是的N次方
int ringBufferSize = * ; //
/**
//BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現
WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
//SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景
WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
//YieldingWaitStrategy 的性能是最好的,适合用于低延遲的系統。在要求極高性能且事件處理線數小于CPU邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性
WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
*/
//建立disruptor
Disruptor<Student> disruptor = new Disruptor<Student>(()->{
return new Student();
}, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
// 連接配接消費事件方法
disruptor.handleEventsWith(new EventHandler<Student>(){
@Override
public void onEvent(Student student, long l, boolean b) throws Exception {
System.out.println(student.toString());
}
});
// 啟動
disruptor.start();
//Disruptor 的事件釋出過程是一個兩階段送出的過程:
//釋出事件
RingBuffer<Student> ringBuffer = disruptor.getRingBuffer();
List<Student> stus = Arrays.asList(
new Student("xiaohuihui",,"男"),
new Student("mingming",,"男"),
new Student("fangfang",,"女"),
new Student("ganggang",,"男"),
new Student("chunchun",,"女")
);
//生産者釋出資料
ringBuffer.publishEvent(new EventTranslatorOneArg<Student, List<Student>>() {
@Override
public void translateTo(Student student, long l, List<Student> students) {
student.setName(students.get().getName());
student.setAge(students.get().getAge());
student.setSex(students.get().getSex());
}
},stus);
// for (Student stu : stus) {
// //生産者釋出資料
// ringBuffer.publishEvent(new EventTranslator<Student>() {
// @Override
// public void translateTo(Student student, long l) {
// student.setAge(stu.getAge());
// student.setName(stu.getName());
// student.setSex(stu.getSex());
//
// }
// });
// }
disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;
}
}
API提供的消費者接口
- WorkerPool :
其中RingBuffer為資料緩沖區,sequenceBarrier是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)
RingBuffer.newBarrier(Sequence... sequencesToTrack);
來擷取,exceptionHandler為異常處理函數,當handler發生異常時回調該函數,workHandlers為實作了EventHandler接口的消息業務處理類,可以有多個。
WorkerPool啟動的方法是
WorkerPool.start(Executor executor)
- BatchEventProcessor :
其中RingBuffer為資料緩沖區,sequenceBarrier是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler)
RingBuffer.newBarrier(Sequence... sequencesToTrack);
來擷取,eventHandler為實作了EventHandler接口的消息業務處理類。
BatchEventProcessor啟動的方法是
Executor.submit(BatchEventProcessor batchEventProcessor)
注意
SequenceBarrier是用來協調消費者和生成者之間效率的政策類,是以要想Barrier生效,必須要将消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:
ringBuffer.addGatingSequences(BatchEventProcessor.getSequence());
多消費者時使用
BatchEventProcessor.getWorkerSequences()
(這兩個方法WorkerPool同樣适用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行調用,在調用handleEventsWith注冊消費者方法時會自動添加該處理。
Disruptor注冊消費者的方法是:
Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)
Disruptor提供了一些複雜的并行運作方式。
1、生産者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實作EventHandler接口)
EventHandlerGroup<Trade> handlerGroup =
disruptor.handleEventsWith(A, B);
//聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3
handlerGroup.then(C);
2、生産者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是并行執行的。
disruptor.handleEventsWith(B1, C1);
disruptor.after(B1).handleEventsWith(B2);
disruptor.after(C1).handleEventsWith(C2);
disruptor.after(B2, C2).handleEventsWith(h3);
3、生産者A生成的資料依次被A,B,C三個消費者消費
并行運作方式demo:
Main方法:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) throws InterruptedException {
long beginTime=System.currentTimeMillis();
int bufferSize=;
ExecutorService executor=Executors.newFixedThreadPool();
Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {
@Override
public Trade newInstance() {
return new Trade();
}
}, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());
//菱形操作
//使用disruptor建立消費者組C1,C2
EventHandlerGroup<Trade> handlerGroup =
disruptor.handleEventsWith(new Handler1(), new Handler2());
//聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3
handlerGroup.then(new Handler3());
//順序操作
/**
disruptor.handleEventsWith(new Handler1()).
handleEventsWith(new Handler2()).
handleEventsWith(new Handler3());
*/
//六邊形操作.
/**
Handler1 h1 = new Handler1();
Handler2 h2 = new Handler2();
Handler3 h3 = new Handler3();
Handler4 h4 = new Handler4();
Handler5 h5 = new Handler5();
disruptor.handleEventsWith(h1, h2);
disruptor.after(h1).handleEventsWith(h4);
disruptor.after(h2).handleEventsWith(h5);
disruptor.after(h4, h5).handleEventsWith(h3);
*/
disruptor.start();//啟動
CountDownLatch latch=new CountDownLatch();
//生産者準備
executor.submit(new TradePublisher(latch, disruptor));
latch.await();//等待生産者完事.
disruptor.shutdown();
executor.shutdown();
System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));
}
}
TradePublisher:
public class TradePublisher implements Runnable {
Disruptor<Trade> disruptor;
private CountDownLatch latch;
private static int LOOP=;//模拟百萬次交易的發生
public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {
this.disruptor=disruptor;
this.latch=latch;
}
@Override
public void run() {
TradeEventTranslator tradeTransloator = new TradeEventTranslator();
for(int i=;i<LOOP;i++){
disruptor.publishEvent(tradeTransloator);
}
latch.countDown();
}
}
TradeEventTranslator:
class TradeEventTranslator implements EventTranslator<Trade>{
private Random random=new Random();
@Override
public void translateTo(Trade event, long sequence) {
this.generateTrade(event);
}
private Trade generateTrade(Trade trade){
trade.setPrice(random.nextDouble()*);
return trade;
}
}
Trade:
@Data
public class Trade {
private String id;//ID
private String name;
private double price;//金額
private AtomicInteger count = new AtomicInteger();
}
Handler1:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.EventHandler;
public class Handler1 implements EventHandler<Trade>{
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler1: set name");
event.setName("h1");
Thread.sleep();
}
}
Handler2:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.EventHandler;
public class Handler2 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler2: set price");
event.setPrice();
Thread.sleep();
}
}
Handler3:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.EventHandler;
public class Handler3 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + "; instance: " + event.toString());
}
}
Handler4:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.EventHandler;
public class Handler4 implements EventHandler<Trade> {
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler4: get name : " + event.getName());
event.setName(event.getName() + "h4");
}
}
Handler5:
package com.zyh.study.disruptor.base;
import com.lmax.disruptor.EventHandler;
public class Handler5 implements EventHandler<Trade>{
@Override
public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("handler5: get price : " + event.getPrice());
event.setPrice(event.getPrice() + );
}
}