天天看點

Disruptor并發架構--學習筆記Disruptor并發架構簡介

Disruptor并發架構簡介

Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平台,它能夠以很低的延遲産生大量交易。這個系統是建立在JVM平台上,其核心是一個業務邏輯處理器,它能夠在一個線程裡每秒處理6百萬訂單。業務邏輯處理器完全是運作在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。

Disruptor它是一個開源的并發架構,并獲得2011 Duke’s 程式架構創新獎,能夠在無鎖的情況下實作網絡的Queue并發操作。

Disruptor是一個高性能的異步處理架構,或者可以認為是最快的消息架構(輕量的JMS),也可以認為是一個觀察者模式的實作,或者事件監聽模式的實作。

目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本性能更加的優秀,提供更多的API使用方式。

下載下傳disruptor-3.3.2.jar引入我們的項目既可以開始disruptor之旅。

在使用之前,首先說明disruptor主要功能加以說明,你可以了解為他是一種高效的”生産者-消費者”模型。也就性能遠遠高于傳統的BlockingQueue容器。

官方學習網站:http://ifeve.com/disruptor-getting-started/

(1)使用Disruptor

  • 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,是以再Disruptor中是以事件綁定資料進行傳遞的
  • 第二:建立一個工廠Event類,用于建立Event類執行個體對象
  • 第三:需要有一個監聽事件類,用于處理資料(Event類)
  • 第四:我們需要進行測試代碼編寫。執行個體化Disruptor執行個體,配置一系列參數。然後我們對Disruptor執行個體綁定監聽事件類,接受并處理資料。
  • 第五:在Disruptor中,真正存儲資料的核心叫做RingBuffer,我們通過Disruptor執行個體拿到它,然後把資料生産出來,把資料加入到RingBuffer的執行個體對象中即可。
package com.zyh.study.disruptor.helloworld;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class Test {

    public static void main(String[] args) throws Exception {
        //建立緩沖池
        ExecutorService  executor = Executors.newCachedThreadPool();
        //建立bufferSize ,也就是RingBuffer大小,必須是2的N次方
        int ringBufferSize =  * ; // 

        /**
        //BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現
        WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
        //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景
        WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
        //YieldingWaitStrategy 的性能是最好的,适合用于低延遲的系統。在要求極高性能且事件處理線數小于CPU邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        */

        //建立disruptor
        Disruptor<Student> disruptor = new Disruptor<Student>(
                new EventFactory<Student>() {
                    @Override
                    public Student newInstance() {
                        return new Student();
                    }
                }, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());
        // 連接配接消費事件方法
        disruptor.handleEventsWith(new EventHandler<Student>(){
            @Override
            public void onEvent(Student student, long l, boolean b) throws Exception {
                System.out.println(student.toString());
            }
        });

        // 啟動
        disruptor.start();

        //Disruptor 的事件釋出過程是一個兩階段送出的過程:
        //釋出事件
        RingBuffer<Student> ringBuffer = disruptor.getRingBuffer();

        List<Student> stus = Arrays.asList(
                new Student("xiaohuihui",,"男"),
                new Student("mingming",,"男"),
                new Student("fangfang",,"女"),
                new Student("ganggang",,"男"),
                new Student("chunchun",,"女")

        );
        for (Student stu : stus) {
            //生産者釋出資料
            ringBuffer.publishEvent(new EventTranslator<Student>() {
                @Override
                public void translateTo(Student student, long l) {
                    student.setAge(stu.getAge());
                    student.setName(stu.getName());
                    student.setSex(stu.getSex());

                }
            });
        }

        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
        executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;      


    }
}
           

(2)Disruptor術語

  • RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責存儲和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。
  • Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序号。和Disruptor一樣,每一個消費者(EventProcessor)都維持着一個Sequence。大部分的并發代碼依賴這些Sequence值得運轉,是以Sequence支援多種目前為AtomicLong類的特性。
  • Sequencer:這是Disruptor真正的核心。實作了這個接口的兩種生産者(單生産者和多生産者)均實作了所有的并發算法,為了在生産者和消費者之間進行準确快速的資料傳遞。
  • SequenceBarrier:由Sequencer生成,并且包含了已經釋出的Sequence的引用,這些Sequence源于Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面擷取事件時的處理政策。(例如:當生産者太慢,消費者太快,會導緻消費者擷取不到新的事件會根據該政策進行處理,預設會堵塞)
  • WaitStrategy:決定一個消費者将如何等待生産者将Event置入Disruptor的政策。用來權衡當生産者無法将新的事件放進RingBuffer時的處理政策。(例如:當生産者太快,消費者太慢,會導緻生成者擷取不到新的事件槽來插入新事件,則會根據該政策進行處理,預設會堵塞)
  • Event:從生産者到消費者過程中所處理的資料單元。Disruptor中沒有代碼表示Event,因為它完全是由使用者定義的。
  • EventProcessor:主要事件循環,處理Disruptor中的Event,并且擁有消費者的Sequence。它有一個實作類是BatchEventProcessor,包含了event loop有效的實作,并且将回調到一個EventHandler接口的實作對象。
  • EventHandler:由使用者實作并且代表了Disruptor中的一個消費者的接口。
  • Producer:由使用者實作,它調用RingBuffer來插入事件(Event),在Disruptor中沒有相應的實作代碼,由使用者實作。
  • WorkProcessor:確定每個sequence隻被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
  • WorkerPool:一個WorkProcessor池,其中WorkProcessor将消費Sequence,是以任務可以在實作WorkHandler接口的worker之間移交
  • LifecycleAware:當BatchEventProcessor啟動和停止時,于實作這個接口用于接收通知。

(3)Disruptor應用

Disruptor實際上是對RingBuffer的封裝,是以我們也可以直接使用RingBuffer類

API提供的生産者接口

EventTranslator<V>

EventTranslatorOneArg<V v, Object data>

,前者不能動态傳參,後者可以動态傳遞一個參數

data

V

為需要建立的資料對象,

data

為實際資料,實作

translateTo(V v, long sequeue, Object data)

方法,其中

v

就是下一個可用事件槽裡面的對象,

data

為傳進來的真實資料,調用

ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);

來釋出資料到RingBuffer中。

package com.zyh.study.disruptor.helloworld;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventMain {

    public static void main(String[] args) throws Exception {
        //建立緩沖池
        ExecutorService  executor = Executors.newCachedThreadPool();
        //建立bufferSize ,也就是RingBuffer大小,必須是的N次方
        int ringBufferSize =  * ; // 

        /**
        //BlockingWaitStrategy 是最低效的政策,但其對CPU的消耗最小并且在各種不同部署環境中能提供更加一緻的性能表現
        WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy();
        //SleepingWaitStrategy 的性能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生産者線程的影響最小,适合用于異步日志類似的場景
        WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy();
        //YieldingWaitStrategy 的性能是最好的,适合用于低延遲的系統。在要求極高性能且事件處理線數小于CPU邏輯核心數的場景中,推薦使用此政策;例如,CPU開啟超線程的特性
        WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy();
        */

        //建立disruptor
        Disruptor<Student> disruptor = new Disruptor<Student>(()->{
            return new Student();
        }, ringBufferSize, executor, ProducerType.SINGLE, new YieldingWaitStrategy());

        // 連接配接消費事件方法
        disruptor.handleEventsWith(new EventHandler<Student>(){
            @Override
            public void onEvent(Student student, long l, boolean b) throws Exception {
                System.out.println(student.toString());
            }
        });

        // 啟動
        disruptor.start();

        //Disruptor 的事件釋出過程是一個兩階段送出的過程:
        //釋出事件
        RingBuffer<Student> ringBuffer = disruptor.getRingBuffer();

        List<Student> stus = Arrays.asList(
                new Student("xiaohuihui",,"男"),
                new Student("mingming",,"男"),
                new Student("fangfang",,"女"),
                new Student("ganggang",,"男"),
                new Student("chunchun",,"女")
        );

        //生産者釋出資料
        ringBuffer.publishEvent(new EventTranslatorOneArg<Student, List<Student>>() {
            @Override
            public void translateTo(Student student, long l, List<Student> students) {
                    student.setName(students.get().getName());
                    student.setAge(students.get().getAge());
                    student.setSex(students.get().getSex());
            }
        },stus);

//      for (Student stu : stus) {
//          //生産者釋出資料
//          ringBuffer.publishEvent(new EventTranslator<Student>() {
//              @Override
//              public void translateTo(Student student, long l) {
//                  student.setAge(stu.getAge());
//                  student.setName(stu.getName());
//                  student.setSex(stu.getSex());
//
//              }
//          });
//      }

        disruptor.shutdown();//關閉 disruptor,方法會堵塞,直至所有的事件都得到處理;
        executor.shutdown();//關閉 disruptor 使用的線程池;如果需要的話,必須手動關閉, disruptor 在 shutdown 時不會自動關閉;      

    }
}

           

API提供的消費者接口

  1. WorkerPool :

    WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)

    其中RingBuffer為資料緩沖區,sequenceBarrier是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過

    RingBuffer.newBarrier(Sequence... sequencesToTrack);

    來擷取,exceptionHandler為異常處理函數,當handler發生異常時回調該函數,workHandlers為實作了EventHandler接口的消息業務處理類,可以有多個。

    WorkerPool啟動的方法是

    WorkerPool.start(Executor executor)

  2. BatchEventProcessor :

    BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler)

    其中RingBuffer為資料緩沖區,sequenceBarrier是消費者與生産者之間的協調政策,API預設提供了一個實作類ProcessingSequenceBarrier,可以通過

    RingBuffer.newBarrier(Sequence... sequencesToTrack);

    來擷取,eventHandler為實作了EventHandler接口的消息業務處理類。

    BatchEventProcessor啟動的方法是

    Executor.submit(BatchEventProcessor batchEventProcessor)

注意

SequenceBarrier是用來協調消費者和生成者之間效率的政策類,是以要想Barrier生效,必須要将消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:

ringBuffer.addGatingSequences(BatchEventProcessor.getSequence());

多消費者時使用

BatchEventProcessor.getWorkerSequences()

(這兩個方法WorkerPool同樣适用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行調用,在調用handleEventsWith注冊消費者方法時會自動添加該處理。

Disruptor注冊消費者的方法是:

Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)

Disruptor提供了一些複雜的并行運作方式。

1、生産者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的消息處理類必須要實作EventHandler接口)

EventHandlerGroup<Trade> handlerGroup = 
        disruptor.handleEventsWith(A, B);
//聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3 
handlerGroup.then(C);
           

2、生産者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是并行執行的。

disruptor.handleEventsWith(B1, C1);
disruptor.after(B1).handleEventsWith(B2);
disruptor.after(C1).handleEventsWith(C2);
disruptor.after(B2, C2).handleEventsWith(h3);
           

3、生産者A生成的資料依次被A,B,C三個消費者消費

并行運作方式demo:

Main方法:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.BusySpinWaitStrategy;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.EventHandlerGroup;
import com.lmax.disruptor.dsl.ProducerType;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {  
    public static void main(String[] args) throws InterruptedException {  

        long beginTime=System.currentTimeMillis();  
        int bufferSize=;  
        ExecutorService executor=Executors.newFixedThreadPool();  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  

        //菱形操作
        //使用disruptor建立消費者組C1,C2
        EventHandlerGroup<Trade> handlerGroup =
                disruptor.handleEventsWith(new Handler1(), new Handler2());
        //聲明在C1,C2完事之後執行JMS消息發送操作 也就是流程走到C3
        handlerGroup.then(new Handler3());

        //順序操作
        /**
                disruptor.handleEventsWith(new Handler1()).
                handleEventsWith(new Handler2()).
                handleEventsWith(new Handler3());
                */

        //六邊形操作. 
        /**
                Handler1 h1 = new Handler1();
                Handler2 h2 = new Handler2();
                Handler3 h3 = new Handler3();
                Handler4 h4 = new Handler4();
                Handler5 h5 = new Handler5();
                disruptor.handleEventsWith(h1, h2);
                disruptor.after(h1).handleEventsWith(h4);
                disruptor.after(h2).handleEventsWith(h5);
                disruptor.after(h4, h5).handleEventsWith(h3);
                */

        disruptor.start();//啟動  
        CountDownLatch latch=new CountDownLatch();  
        //生産者準備  
        executor.submit(new TradePublisher(latch, disruptor));

        latch.await();//等待生産者完事. 

        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime));  
    }  
}  
           

TradePublisher:

public class TradePublisher implements Runnable {  

    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  

    private static int LOOP=;//模拟百萬次交易的發生  

    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  

    @Override  
    public void run() {
        TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
        for(int i=;i<LOOP;i++){  
            disruptor.publishEvent(tradeTransloator);  
        }  
        latch.countDown();  
    }  

}  
           

TradeEventTranslator:

class TradeEventTranslator implements EventTranslator<Trade>{  

    private Random random=new Random();  

    @Override  
    public void translateTo(Trade event, long sequence) {  
        this.generateTrade(event);  
    }  

    private Trade generateTrade(Trade trade){  
        trade.setPrice(random.nextDouble()*);  
        return trade;  
    }  

}  
           

Trade:

@Data
public class Trade {  

    private String id;//ID  
    private String name;
    private double price;//金額  
    private AtomicInteger count = new AtomicInteger();

}  
           

Handler1:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.EventHandler;

public class Handler1 implements EventHandler<Trade>{

    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler1: set name");
        event.setName("h1");
        Thread.sleep();    
    }
}  
           

Handler2:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.EventHandler;

public class Handler2 implements EventHandler<Trade> {  

    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler2: set price");
        event.setPrice();
        Thread.sleep();
    }  

}  
           

Handler3:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.EventHandler;

public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
        System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.toString());
    }  
}
           

Handler4:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.EventHandler;

public class Handler4 implements EventHandler<Trade> {

    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler4: get name : " + event.getName());
        event.setName(event.getName() + "h4");
    }

}  
           

Handler5:

package com.zyh.study.disruptor.base;

import com.lmax.disruptor.EventHandler;

public class Handler5 implements EventHandler<Trade>{

    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("handler5: get price : " + event.getPrice());
        event.setPrice(event.getPrice() + );
    }
}