天天看點

叢集通信元件tribes之通道攔截器

攔截器應該可以說是一個很經典的設計模式,它有點類似于過濾器,當某資訊從一個地方流向目的地的過程中,可能需要統一對資訊進行處理,如果考慮到系統的可擴充性和靈活性通常就會使用攔截器模式,它就像一個個關卡被設定在資訊流動的通道中,并且可以按照實際需要添加和減少關卡。Tribes為了在應用層提供對源消息統一處理的管道引入通道攔截器,使用者在應用層隻需要根據自己需要添加攔截器即可,例如,壓縮解壓攔截器、消息輸出輸入統計攔截器、異步消息發送器等等。

攔截器的資料流向示意圖可以參考前面的tribes簡介章節,資料從IO層流向應用層,中間就會經過一個攔截器棧,應用層處理完就會傳回一個ack給發送端表示已經接收并處理完畢(消息可靠級别為SYNC_ACK),下面嘗試用最簡單一些代碼和僞代碼說明tribes的攔截器實作,旨在領會攔截器如何設計而并非具體的實作。最終實作的功能如圖所示,最底層的協調者ChannelCoordinator永遠作為第一個加入攔截器棧的攔截器,往上則是按照添加順序排列,且每個攔截器的previous、next分别指向前一個攔截器和下一個攔截器。

叢集通信元件tribes之通道攔截器

① 定義攔截器接口

public interface ChannelInterceptor{

    public void setNext(ChannelInterceptor next) ;

    public ChannelInterceptor getPrevious();

    public void sendMessage(ChannelMessage msg);

    public void messageReceived(ChannelMessage msg);

}

② 定義一個基礎攔截器,提供一些公共的操作,由于攔截器執行完後要觸發下個攔截器,是以把觸發工作統一抽離到基礎類裡面完成,當然裡面必須包含前一個和後一個攔截器的引用。

public class ChannelInterceptorBase implements ChannelInterceptor {

    private ChannelInterceptor next;

    private ChannelInterceptor previous;

    public ChannelInterceptorBase() {

    }

    public final void setNext(ChannelInterceptor next) {

        this.next = next;

    public final ChannelInterceptor getNext() {

        return next;

    public final void setPrevious(ChannelInterceptor previous) {

        this.previous = previous;

    public final ChannelInterceptor getPrevious() {

        return previous;

    public void sendMessage(ChannelMessage msg) {

        if (getNext() != null) getNext().sendMessage(msg,);

    public void messageReceived(ChannelMessage msg) {

        if (getPrevious() != null) getPrevious().messageReceived(msg);

③ 壓縮解壓攔截器,此攔截器負責按一定算法壓縮和解壓處理。

public class GzipInterceptor extends ChannelInterceptorBase {

    public void sendMessage(ChannelMessage msg){

            compress the msg;

            getNext().sendMessage(msg);

            decompress the msg;

            getPrevious().messageReceived(msg);

④ 最底層的協調器,直接與網絡IO做互動

public class ChannelCoordinator extends ChannelInterceptorBase{

    public ChannelCoordinator() {

    public void sendMessage(ChannelMessage msg) throws ChannelException {

        Network IO Send

public void messageReceived(ChannelMessage msg) {

    Network IO READ

        super.messageReceived(msg);

⑤ 測試類

public class Test{

public void main(String[] args){

ChannelCoordinator coordinator = new ChannelCoordinator();

GzipInterceptor gzipInterceptor = new GzipInterceptor();

coordinator.setNext(null);

coordinator.setPrevious(gzipInterceptor);

gzipInterceptor.setPrevious(null);

gzipInterceptor .setNext(coordinator);

gzipInterceptor.sendMessage(msg);

coordinator.messageReceived(msg);

    Tribes的攔截器整體設計就如上面,整個攔截器的執行順序如下,當執行寫操作時,資料流向GzipInterceptor -> ChannelCoordinator -> Network IO;當執行讀操作時,資料流向則為Network IO -> ChannelCoordinator -> GzipInterceptor。了解了整個設計原理後對于tribes的整體把握将會更加深入。

<a target="_blank" href="https://item.jd.com/12185360.html">點選訂購作者《Tomcat核心設計剖析》</a>

繼續閱讀